最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

docker compose - org.apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_key' cann

programmeradmin2浏览0评论

I am working on a Proof of Concept (PoC) where I am streaming data from a source database into a Kafka topic using Change Data Capture (CDC). The data is successfully being captured in the Kafka topic.

I am using the Kafka JDBC Sink Connector to write this data from Kafka to a target database. However, the connection to the target database is failing, and I am getting an error. here is my jdbc sink connector config:

{
    "name": "jdbc-sink-connector",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:sqlserver://192.168.0.102:1433;databaseName=targetdatabase",
        "connection.username": "test_user",
        "connection.password": "******",
        "hibernate.dialect": ".hibernate.dialect.SQLServerDialect",
        "dialect.name": "SqlServerDatabaseDialect",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "primary.key.mode": "record_key",
        "primary.key.fields":"id",
        "schema.evolution": "basic",
        "topics": "sqlserver-cdc.testdatabase.dbo.employees",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields": "op,source.ts_ms",
        "decimal.handling.mode": "double"


        
    }
}

".apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at .apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614) at .apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336) at .apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at .apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at .apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202) at .apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257) at .apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: .apache.kafka.connect.errors.ConnectException: JDBC sink connector failure at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83) at .apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583) ... 11 more Caused by: .apache.kafka.connect.errors.ConnectException: Failed to process a sink record at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72) at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:93) ... 12 more Caused by: .apache.kafka.connect.errors.ConnectException: Configured primary key mode 'record_key' cannot have null schema at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyRecordKeyAsPrimaryKey(SinkRecordDescriptor.java:315) at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordKeyData(SinkRecordDescriptor.java:288) at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:261) at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:66) ... 13 more"

发布评论

评论列表(0)

  1. 暂无评论