I'm trying to submit multiple DELETE statements to the TableEnvironment like this:
val settings = EnvironmentSettings.newInstance.inBatchMode.build()
val env = TableEnvironment.create(settings)
createSchema(env)
val queries = getDeleteQueries
queries.foreach(q => env.executeSql(q).await())
According to the documentation, DELETE statements should just work. But in reality I get the following exception after the first query finishes:
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
When I checked the code, I saw that there is indeed a validation that no more than one job can be submitted:
private void validateAllowedExecution() {
    if (enforceSingleJobExecution && jobCounter > 0) {
        throw new FlinkRuntimeException(
                "Cannot have more than one execute() or executeAsync() call in a single environment.");
    }
    jobCounter++;
}
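To illustrate why my second `executeSql` call fails, here is a self-contained sketch that mirrors the guard's logic in isolation (my own reproduction, not Flink code): once the counter has been incremented by the first submission, every further call throws.

```java
// Sketch mirroring StreamContextEnvironment's guard logic (not Flink code):
// when enforceSingleJobExecution is true, only the first call passes.
public class SingleJobGuard {
    private final boolean enforceSingleJobExecution;
    private int jobCounter = 0;

    public SingleJobGuard(boolean enforceSingleJobExecution) {
        this.enforceSingleJobExecution = enforceSingleJobExecution;
    }

    public void validateAllowedExecution() {
        if (enforceSingleJobExecution && jobCounter > 0) {
            throw new RuntimeException(
                    "Cannot have more than one execute() or executeAsync() call in a single environment.");
        }
        jobCounter++; // first call passes and bumps the counter
    }
}
```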
When I tried to submit multiple statements separated by `;`, I got the following exception:
Caused by: java.lang.IllegalArgumentException: only single statement supported
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:104) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
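For completeness, I can work around the parser's single-statement limitation by splitting the script client-side before calling `executeSql` (a naive hypothetical helper of my own; it does not handle semicolons inside string literals), but each statement then still triggers its own job submission and hits the first exception:

```java
import java.util.ArrayList;
import java.util.List;

public class SqlSplitter {
    // Split a multi-statement script into single statements, since
    // ParserImpl accepts only one statement per parse() call.
    // Naive split on ';' -- semicolons inside literals are not handled.
    public static List<String> splitStatements(String script) {
        List<String> out = new ArrayList<>();
        for (String part : script.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                out.add(trimmed);
            }
        }
        return out;
    }
}
```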
My job is submitted to Kubernetes via the flink-kubernetes-operator.
Flink version is 1.19.1
Flink Kubernetes operator version is 1.10.0
My queries look like:
DELETE FROM hdfs_table
WHERE id IN (
SELECT DISTINCT a.id
FROM jdbc_table_a a
JOIN jdbc_table_b b ON a.b_id = b.id
WHERE b.should_be_deleted
)
Two questions I would appreciate help with:
- What am I missing in the configuration that's preventing me from submitting multiple DELETE statements?
- Is it possible to submit multiple DELETE statements in parallel, so that the JDBC tables can be reused rather than loaded again for every query?