Submitting multiple DELETE statements in Flink SQL - Stack Overflow

I'm trying to submit multiple DELETE statements to a TableEnvironment in the following way:

val settings = EnvironmentSettings.newInstance.inBatchMode.build()
val env = TableEnvironment.create(settings)
createSchema(env)
val queries = getDeleteQueries
queries.foreach(q => env.executeSql(q).await())

According to the documentation (delete-statements), this should just work. In practice, I get the following exception once the first query has finished:

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]

When I checked the code, I saw that there is indeed a validation that prevents more than one job from being submitted:

private void validateAllowedExecution() {
    if (enforceSingleJobExecution && jobCounter > 0) {
        throw new FlinkRuntimeException(
                "Cannot have more than one execute() or executeAsync() call in a single environment.");
    }
    jobCounter++;
}
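
To make the effect of that check concrete, here is a minimal standalone sketch of the same counter logic. It is not Flink code: SingleJobGuard and GuardDemo are hypothetical names, and IllegalStateException stands in for FlinkRuntimeException so that the snippet runs without Flink on the classpath.

```scala
import scala.util.Try

// Hypothetical stand-in for the guard quoted above; this is not Flink's class.
final class SingleJobGuard(enforceSingleJobExecution: Boolean) {
  private var jobCounter = 0

  // Same logic as validateAllowedExecution(): reject every call after the first
  // when single-job enforcement is switched on.
  def validateAllowedExecution(): Unit = {
    if (enforceSingleJobExecution && jobCounter > 0) {
      throw new IllegalStateException(
        "Cannot have more than one execute() or executeAsync() call in a single environment.")
    }
    jobCounter += 1
  }
}

object GuardDemo {
  // For each query, report whether the guard would have let it through.
  def submitAll(guard: SingleJobGuard, queries: Seq[String]): Seq[Boolean] =
    queries.map(_ => Try(guard.validateAllowedExecution()).isSuccess)
}
```

With enforcement on, only the first of several queries passes the guard, which matches what I see: the first DELETE runs and the second one fails.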

When I tried to submit multiple statements separated by ; in a single executeSql call, I got the following exception:

Caused by: java.lang.IllegalArgumentException: only single statement supported
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:104) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
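
Since the parser accepts only one statement per call, a ;-separated script has to be split before it reaches executeSql. Below is a rough sketch of such a split; SqlScript and splitStatements are hypothetical helpers, and the split is naive in that it assumes no semicolons appear inside string literals or comments. Each resulting statement would still go through its own executeSql call, so this only avoids the parser error and not the single-job check shown earlier.

```scala
object SqlScript {
  // Naive split on ';'. Assumes no semicolons inside string literals or comments.
  // Trims whitespace and drops empty fragments such as the one after a trailing ';'.
  def splitStatements(script: String): Seq[String] =
    script.split(';').toSeq.map(_.trim).filter(_.nonEmpty)
}
```

For example, SqlScript.splitStatements("DELETE FROM t1 WHERE id = 1; DELETE FROM t2 WHERE id = 2;") yields the two DELETE statements as separate strings (t1 and t2 are placeholder table names).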

My job is submitted to Kubernetes via the flink-kubernetes-operator.

Flink version is 1.19.1

Flink Kubernetes operator version is 1.10.0

My queries look like:

DELETE FROM hdfs_table
WHERE id IN (
  SELECT DISTINCT a.id
  FROM jdbc_table_a a
  JOIN jdbc_table_b b ON a.b_id = b.id
  WHERE b.should_be_deleted
)

Two questions I would appreciate help with:

  1. What am I missing in the configuration that prevents me from submitting multiple DELETE statements?
  2. Is it possible to submit multiple DELETE statements in parallel, so that the JDBC tables can be reused rather than loaded again for every query?