I'm trying to use Flink CDC to capture data changes from MySQL and update a Hudi table in S3. My PyFlink job looks like this:
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

config = Configuration()  # stands in for the job configuration built earlier
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(1)
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
t_env.execute_sql(f"""
CREATE TABLE source (
...
) WITH (
'connector' = 'mysql-cdc',
...
)
""")
t_env.execute_sql(f"""
CREATE TABLE target (
...
) WITH (
'connector' = 'hudi',
'path' = 's3a://xxx/xx/xx',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field' = '',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.async.enabled' = 'true',
'hoodie.table.version' = 'SIX',
'hoodie.write.table.version' = '6'
)
""")
t_env.execute_sql(f"""
INSERT INTO target
SELECT * FROM source
""")
But when I submit this job to Flink, it returns an error:
org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation
What does this error mean? Does it mean S3 is not supported as the sink in this situation, i.e. that Flink cannot upsert data into S3?
Comments:

s3: instead of s3a:? – John Rotenstein

I tried s3:// instead of s3a://, but this raises another error saying that s3:// is an unknown scheme. – Rinze

1 Answer

Resolved by setting 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'.
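For illustration only, here is a minimal sketch of how that option could be added to the target table DDL; the column list and S3 path below are placeholders, not the asker's real values. The error appears to come from Hudi's filesystem-based lock provider, which relies on atomic file creation that s3a does not guarantee, so switching to the in-process lock provider sidesteps that check.

t_env.execute_sql("""
CREATE TABLE target (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'hudi',
    'path' = 's3a://bucket/prefix/table',   -- placeholder path
    'table.type' = 'COPY_ON_WRITE',
    'write.precombine.field' = 'id',
    'write.operation' = 'upsert',
    'hoodie.datasource.write.recordkey.field' = 'id',
    -- keep locking inside the Flink JVM instead of on the filesystem
    'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
)
""")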