My collection looks like this Collection
And the source configuration looks like this
curl -X POST "http://localhost:8083/connectors" \
-H "Content-Type: application/json" \
-d '{
"name": "mongodb-source-connector-",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"connection.uri": "mongodb://mongo:27017/?replicaSet=rs0",
"copy.existing": "true",
"database": "projectsDB",
"collection": "projects",
"topic.namespace.map": "{\"projectsDB.projects\": \"mongo.projects\"}",
"publish.full.document.only": true
}
}'
The resulting kafka message key has the following format { "_id": "3" }
, but I need it to be just a string value of _id field.
I have already tried to use transformations like this
"transforms": "id",
"transforms.id.type": ".apache.kafka.connect.transforms.ExtractField$Key",
"transforms.id.field": "documentKey._id"
but it did not help and the error was
"trace": ".apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat .apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat .apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)\n\tat .apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\tat .apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat .apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: .apache.kafka.connect.errors.DataException: Only Struct objects supported for [field extraction], found: java.lang.String\n\tat .apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)\n\tat .apache.kafka.connect.transforms.util.Requirements.requireStructOrNull(Requirements.java:61)\n\tat .apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)\n\tat .apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat .apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 11 more\n"