
Time Attribute Type for a TUMBLE with Apache Flink - Stack Overflow


I am getting the following exception in Flink.

The window function requires the timecol is a time attribute type, but is TIMESTAMP(3)

A bit of research on the internet tells me that the column referenced by the DESCRIPTOR of the TUMBLE must have a WATERMARK defined and carry the ROWTIME property, otherwise this problem occurs.

My table has both, but as an added complexity I am creating a VIEW on top of it. The documentation says that, in theory, the view inherits the WATERMARK and ROWTIME, and indeed I can see that when printing the schema.
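For reference, the WATERMARK declaration on the source table is what turns ts into a ROWTIME attribute. My transactions table declares it roughly like this (simplified sketch; the exact columns and connector options are assumed here, not shown in full):

  CREATE TABLE transactions (
    kafka_key_transaction_id STRING NOT NULL,
    credit_card_number BIGINT,
    amount INT,
    location STRING,
    ts TIMESTAMP(3),
    -- this declaration is what gives ts the ROWTIME time attribute
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka'  -- connector options assumed/omitted
  )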

So my view looks like the following (it is just a test, I am not trying to produce anything that makes sense):

CREATE VIEW aggregated_transactions AS
  SELECT
    t.kafka_key_transaction_id,
    t.credit_card_number,
    c.customer_email,
    t.amount AS total_amount,
    c.average_spending_amount,
    LocationScore(c.country, t.location) as location_score,
    t.ts
    FROM transactions t
    INNER JOIN credit_cards cc ON t.credit_card_number = cc.credit_card_number
    INNER JOIN customers c ON c.customer_id = cc.customer_id

And the schema:

 (
   `kafka_key_transaction_id` STRING NOT NULL,
   `credit_card_number` BIGINT,
   `customer_email` STRING,
   `total_amount` INT,
   `average_spending_amount` INT,
   `location_score` DOUBLE,
   `ts` TIMESTAMP(3) *ROWTIME*
 )

So as you can see, the ts timestamp column has 'ROWTIME', but when I create the following table and try to insert into it

  CREATE TABLE feature_set (
    credit_card_number BIGINT,
    total_amount INT,
    transaction_count BIGINT,
    average_spending_amount INT,
    locationScore DOUBLE,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
  ) WITH (
    'connector' = 'print'
  )

and the TUMBLE...

INSERT INTO feature_set
  SELECT
    credit_card_number,
    customer_email,
    SUM(total_amount) AS total_amount,
    COUNT(kafka_key_transaction_id) AS transaction_count,
    SUM(average_spending_amount) AS average_spending_amount,
    AVG(location_score) AS location_score,
    window_start,
    window_end
  FROM TABLE(TUMBLE(TABLE aggregated_transactions, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
  GROUP BY credit_card_number, window_start, window_end

this fails with the previously mentioned exception, even though the exception itself shows that the 'ts' column has the ROWTIME tag.

Error while applying rule StreamPhysicalWindowTableFunctionRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#13516:FlinkLogicalTableFunctionScan.LOGICAL.any.None: 0.[NONE].[NONE](input#0=RelSubset#13514,invocation=TUMBLE(DESCRIPTOR($4), 60000:INTERVAL MINUTE),rowType=RecordType(BIGINT credit_card_number, INTEGER total_amount, INTEGER average_spending_amount, DOUBLE location_score, TIMESTAMP(3) ROWTIME ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) ROWTIME window_time))]

As you can see, 'ROWTIME' is there: 'TIMESTAMP(3) ROWTIME ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) ROWTIME window_time'.

Can anybody tell me what is going wrong here?


asked Jan 31 at 13:17 by posthumecaver

1 Answer


This is a known problem with Flink SQL -- namely that time attributes are lost for views. As a workaround, I can suggest that you use a CTE instead:

WITH aggregated_transactions AS (
  SELECT ...
)
INSERT INTO feature_set
SELECT ...
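A sketch of how that could look with the view definition and insert from the question (same column names assumed; not verified against a running cluster):

WITH aggregated_transactions AS (
  SELECT
    t.kafka_key_transaction_id,
    t.credit_card_number,
    t.amount AS total_amount,
    c.average_spending_amount,
    LocationScore(c.country, t.location) AS location_score,
    t.ts
  FROM transactions t
  INNER JOIN credit_cards cc ON t.credit_card_number = cc.credit_card_number
  INNER JOIN customers c ON c.customer_id = cc.customer_id
)
INSERT INTO feature_set
SELECT
  credit_card_number,
  SUM(total_amount) AS total_amount,
  COUNT(kafka_key_transaction_id) AS transaction_count,
  SUM(average_spending_amount) AS average_spending_amount,
  AVG(location_score) AS location_score,
  window_start,
  window_end
FROM TABLE(TUMBLE(TABLE aggregated_transactions, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
GROUP BY credit_card_number, window_start, window_end

Because the CTE is inlined into the INSERT statement rather than going through a catalog view, the time attribute on ts should be preserved and the window TVF accepted.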