Flink SQL LAG function with window_frame does not work - Stack Overflow

/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
   `pk` string,
   `id` string,
   `name` string,
   `headers` MAP<STRING, BYTES> METADATA ,
   `hard_deleted` boolean,
   `kafka_key` STRING,
   `ts` timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
   `procTime` AS PROCTIME(),
    WATERMARK FOR ts AS ts
) WITH (
   'connector' = 'kafka',
   'properties.bootstrap.servers' = 'kafka:29092',
   'properties.group.id' = 'group_id_1',
   'topic-pattern' = '^topic(_backfill)?$',
   'value.format' = 'json',
   'format' = 'json',
   'key.format' = 'raw',
   'key.fields' = 'kafka_key',
   'value.fields-include' = 'EXCEPT_KEY',
   'scan.startup.mode' = 'earliest-offset',
   'json.timestamp-format.standard' = 'ISO-8601',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
);

I have the Kafka table above and am trying to run this query:

SELECT LAG(pk) 
  OVER (
    PARTITION BY id ORDER BY procTime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
  ) 
AS prev_data_hash 
FROM eoj_table;

I get this error:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

How do I use the LAG function so that it looks at only the current row and the previous row?

Asked Feb 4 at 18:10 by user3822232; edited Feb 5 at 19:11 by President James K. Polk.

1 Answer

You can do it like this:

SELECT LAG(pk, 1) 
  OVER (
    PARTITION BY id ORDER BY procTime
  ) 
AS prev_data_hash 
FROM eoj_table;

This query doesn't specify a ROWS or RANGE frame, but that doesn't matter. The LAG function itself manages the state for this query, and its implementation keeps only as many previous values in its accumulator as it needs (in this case, 2).

If you're curious, here's the relevant code:

    public void accumulate(LagAcc<T> acc, T value) throws Exception {
        acc.buffer.add(value);
        while (acc.buffer.size() > acc.offset + 1) {
            acc.buffer.removeFirst();
        }
    }

In this case, the value of the offset is 1, so it keeps a buffer of 2 values.
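To make the buffer behavior concrete, here is a minimal standalone sketch that replays the same trimming rule over a plain list. It is not Flink's actual class: the name LagBufferSketch, the helper lagOf, and the rule for reading the result (return the oldest buffered value once offset + 1 values have arrived, otherwise null) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical standalone sketch of a bounded LAG buffer; not Flink's implementation. */
public class LagBufferSketch {

    /** For each value in arrival order, returns the value seen `offset` rows earlier, or null. */
    public static <T> List<T> lagOf(List<T> values, int offset) {
        // LinkedList is used because, unlike ArrayDeque, it accepts null elements.
        LinkedList<T> buffer = new LinkedList<>();
        List<T> out = new ArrayList<>();
        for (T value : values) {
            // Same trimming rule as the accumulate method quoted above:
            // never hold more than offset + 1 values.
            buffer.add(value);
            while (buffer.size() > offset + 1) {
                buffer.removeFirst();
            }
            // Assumed read rule: the oldest buffered value is the lagged one,
            // and there is nothing to return until the buffer is full.
            out.add(buffer.size() == offset + 1 ? buffer.getFirst() : null);
        }
        return out;
    }

    public static void main(String[] args) {
        // With offset 1 the buffer holds at most two values at any time.
        System.out.println(lagOf(Arrays.asList("a", "b", "c", "d"), 1)); // prints [null, a, b, c]
    }
}
```

With an offset of 1, each row sees only itself and the row before it, which is the effect the ROWS BETWEEN 1 PRECEDING AND CURRENT ROW frame in the question was meant to achieve.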
