I am getting the following exception in Flink:
The window function requires the timecol is a time attribute type, but is TIMESTAMP(3)
A bit of research on the internet tells me that this happens when the column named in the DESCRIPTOR of the TUMBLE has no WATERMARK defined, i.e. the field is missing the ROWTIME property.
My source table does have a watermark, but as an added complexity I am creating a VIEW on top of it. According to the documentation the view should, in theory, inherit the WATERMARK and the ROWTIME attribute, and I can indeed see that when printing the schema.
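For context, the transactions source is assumed to be declared with a watermark on ts, roughly along these lines (the column types, the watermark delay, and the connector options are placeholders, not the actual definition):
CREATE TABLE transactions (
  kafka_key_transaction_id STRING NOT NULL,
  credit_card_number BIGINT,
  amount INT,
  location STRING,
  ts TIMESTAMP(3),
  -- this declaration is what turns ts into a ROWTIME attribute
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  -- remaining connector options omitted
)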
My view looks like the following (it is just a test, it is not meant to produce anything meaningful):
CREATE VIEW aggregated_transactions AS
SELECT
t.kafka_key_transaction_id,
t.credit_card_number,
c.customer_email,
t.amount AS total_amount,
c.average_spending_amount,
LocationScore(c.country, t.location) as location_score,
t.ts
FROM transactions t
INNER JOIN credit_cards cc ON t.credit_card_number = cc.credit_card_number
INNER JOIN customers c ON c.customer_id = cc.customer_id
And this is the schema of the view:
(
`kafka_key_transaction_id` STRING NOT NULL,
`credit_card_number` BIGINT,
`customer_email` STRING,
`total_amount` INT,
`average_spending_amount` INT,
`location_score` DOUBLE,
`ts` TIMESTAMP(3) *ROWTIME*
)
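For what it's worth, the same information can also be inspected from the SQL client, assuming one is available:
-- describes the resolved schema of the view
DESCRIBE aggregated_transactions;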
As you can see, the ts column carries the ROWTIME attribute. But when I create the following sink table,
CREATE TABLE feature_set (
credit_card_number BIGINT,
total_amount INT,
transaction_count BIGINT,
average_spending_amount INT,
locationScore DOUBLE,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'print'
)
and run the TUMBLE query below,
INSERT INTO feature_set
SELECT
credit_card_number,
SUM(total_amount) AS total_amount,
COUNT(kafka_key_transaction_id) AS transaction_count,
SUM(average_spending_amount) AS average_spending_amount,
AVG(location_score),
window_start,
window_end
FROM TABLE(TUMBLE(TABLE aggregated_transactions, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
GROUP BY credit_card_number, window_start, window_end
it fails with the exception mentioned above, even though the exception itself shows that the ts column carries the ROWTIME tag:
Error while applying rule StreamPhysicalWindowTableFunctionRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#13516:FlinkLogicalTableFunctionScan.LOGICAL.any.None: 0.[NONE].[NONE](input#0=RelSubset#13514,invocation=TUMBLE(DESCRIPTOR($4), 60000:INTERVAL MINUTE),rowType=RecordType(BIGINT credit_card_number, INTEGER total_amount, INTEGER average_spending_amount, DOUBLE location_score, TIMESTAMP(3) ROWTIME ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) ROWTIME window_time))]
As you can see, ROWTIME is there: 'TIMESTAMP(3) ROWTIME ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) ROWTIME window_time'.
Can anybody tell me what is going wrong here?
This is a known problem with Flink SQL: time attributes are lost for views. As a workaround, I can suggest that you use a CTE instead:
INSERT INTO feature_set
WITH aggregated_transactions AS (
SELECT ...
)
SELECT ...
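Spelled out with the SQL from the question, the workaround would look roughly like the sketch below. It is only a sketch: the CTE simply mirrors the original view definition, LocationScore is the poster's own UDF, and it assumes the CTE name can be passed to the window TVF exactly as the view name was. The WITH clause is placed after INSERT INTO here, on the assumption that the CTE has to be part of the query being inserted.
INSERT INTO feature_set
WITH aggregated_transactions AS (
  -- same projection as the original view
  SELECT
    t.kafka_key_transaction_id,
    t.credit_card_number,
    c.customer_email,
    t.amount AS total_amount,
    c.average_spending_amount,
    LocationScore(c.country, t.location) AS location_score,
    t.ts
  FROM transactions t
  INNER JOIN credit_cards cc ON t.credit_card_number = cc.credit_card_number
  INNER JOIN customers c ON c.customer_id = cc.customer_id
)
SELECT
  credit_card_number,
  SUM(total_amount) AS total_amount,
  COUNT(kafka_key_transaction_id) AS transaction_count,
  SUM(average_spending_amount) AS average_spending_amount,
  AVG(location_score) AS location_score,
  window_start,
  window_end
FROM TABLE(TUMBLE(TABLE aggregated_transactions, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
GROUP BY credit_card_number, window_start, window_end;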