
Use value of _id as key for MongoDB Kafka Source Connector - Stack Overflow


My collection looks like this (screenshot "Collection" not reproduced here).

And the source configuration looks like this:

curl -X POST "http://localhost:8083/connectors" \
     -H "Content-Type: application/json" \
     -d '{
           "name": "mongodb-source-connector-",
           "config": {
             "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
             "tasks.max": "1",
             "connection.uri": "mongodb://mongo:27017/?replicaSet=rs0",
             "copy.existing": "true",
             "database": "projectsDB",
             "collection": "projects",
             "topic.namespace.map": "{\"projectsDB.projects\": \"mongo.projects\"}",
             "publish.full.document.only": true
           }
         }'

The resulting Kafka message key has the format { "_id": "3" }, but I need it to be just the string value of the _id field.
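For context, the effect I'm after is the equivalent of this small sketch (a hypothetical Python helper, not connector code): take the JSON-encoded key the connector emits and keep only the _id value as a bare string.

```python
import json

def extract_id(raw_key: str) -> str:
    """Parse the JSON key emitted by the connector and return just the _id value."""
    return json.loads(raw_key)["_id"]

# The connector currently produces '{ "_id": "3" }'; the desired key is '3'.
print(extract_id('{ "_id": "3" }'))  # prints 3
```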

I have already tried transformations like this:

"transforms": "id",
"transforms.id.type": ".apache.kafka.connect.transforms.ExtractField$Key",
"transforms.id.field": "documentKey._id" 

but it did not help; the error was:

      "trace": ".apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat .apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat .apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)\n\tat .apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\tat .apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat .apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: .apache.kafka.connect.errors.DataException: Only Struct objects supported for [field extraction], found: java.lang.String\n\tat .apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)\n\tat .apache.kafka.connect.transforms.util.Requirements.requireStructOrNull(Requirements.java:61)\n\tat .apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)\n\tat .apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 11 more\n"