
Debezium MySQL connector is capturing schema changes for tables that are not in the table include list


I am using Debezium CDC with the outbox transformer, and we need it to start up as fast as possible. Since we are using the outbox pattern, there is only ever one table we are interested in (and that table's structure is unlikely to change).

When we use "snapshot.mode": "none", any binlog change we capture throws an error because we have not recorded the schema. When we use "snapshot.mode": "schema_only", the schema for every table in the database is written to "schema.history.internal.kafka.topic", despite our limiting capture with "table.include.list", which contains only the one table we are interested in.
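To make the interaction explicit, these are the two properties at play: the include list names a single table, but the schema-only snapshot records DDL for every table in the database anyway:

{
    "table.include.list": "xxxxxx.outbox",
    "snapshot.mode": "schema_only"
}

With "snapshot.mode": "none" instead, the schema snapshot is skipped entirely, and the first binlog event for xxxxxx.outbox fails because its schema was never recorded.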

There are many tables in the schema and adding them all to "table.exclude.list" feels impractical.

Is there a fault / something missing in our properties file? We only want to capture the schema for the one table we are interested in.

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.include.list": "xxxxxxxxxx",
    "database.hostname": "xxxxxxxxxxxx",
    "database.port": "3306",
    "database.user": "xxxxxx",
    "database.password": "xxxxxxxxxxxxxx",
    "database.server.id": "184054",
    "table.include.list": "xxxxxx.outbox",
    "snapshot.mode": "schema_only",
    "snapshot.locking.mode": "none",
    "schema.history.internal.kafka.bootstrap.servers": "xxxxxxxxxxxxxxxxxxxx",
    "schema.history.internal.kafka.topic": "operations.integration.schema.history",
    "include.schema.changes": "false",
    "tombstones.on.delete": "false",
    "topic.prefix": "operations.integration.feeds",
    "poll.interval.ms": 100,
    "skipped.operations": "u,d,t",
    "value.converter": ".apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "producer.overridepression.type": "lz4",
    "transforms": "outbox,routing,insertMessageSystemHeader,dropHeaders",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.expand.json.payload": "true",
    "transforms.outbox.table.fields.additional.placement": "message_id:header:messaging.message.message.id,idempotency_key:header:xxxxxxx.message.idempotency_key,is_canary_request:header:xxxxxxx.message.canary,message_name:header:xxxxxxx.message.name,message_version:header:xxxxxxx.message.version,message_owner:header:xxxxxxx.message.owner,correlation_id:header:xxxxxxx.message.conversation_id,message_format:header:xxxxxxx.message.format",
    "transforms.outbox.table.field.event.key": "event_id",
    "transforms.outbox.table.field.event.timestamp": "date_updated",
    "transforms.outbox.table.field.event.payload": "details",
    "transforms.outbox.table.field.event.id": "message_id",
    "transforms.outbox.tracing.span.context.field": "correlation_id",
    "transforms.outbox.route.topic.replacement": "xxxxxxxxxxxxxxxxxxxx.${routedByValue}",
    "transforms.outbox.route.by.field": "target",
    "transforms.routing.type": "io.debezium.transforms.partitions.PartitionRouting",
    "transforms.routing.partition.payload.fields": "id",
    "transforms.routing.partition.topic.num": "10",
    "transforms.insertMessageSystemHeader.type": ".apache.kafka.connect.transforms.InsertHeader",
    "transforms.insertMessageSystemHeader.header": "messaging.system",
    "transforms.insertMessageSystemHeader.value.literal": "kafka",
    "transforms.dropHeaders.type": ".apache.kafka.connect.transforms.DropHeaders",
    "transforms.dropHeaders.headers": "id"
}
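One possibly relevant setting, for reference: Debezium can be told to store schema history only for the tables it actually captures, instead of for every table in the monitored databases. A minimal, hedged sketch — the property name below assumes Debezium 2.x naming (in 1.x it was "database.history.store.only.captured.tables.ddl"), so it should be verified against the version in use:

{
    "schema.history.internal.store.only.captured.tables.ddl": "true"
}

If it applies, this could be merged into the configuration above rather than enumerating every unwanted table in "table.exclude.list".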