
docker - Debezium Kafka Connect: Missing schema-changes Topic and API Parameter Optimization - Stack Overflow


I have a Postgres database named test_kafkaconnect, which contains two tables: aa and bb. I want to stream all database changes to Kafka.

I figured out that I need to use Debezium Kafka Connect for this. Here is my docker-compose configuration for Postgres, Kafka, and Debezium Kafka Connect:

version: '3.8'

services:
  postgres:
    image: postgres:17.2
    container_name: postgres
    restart: always
    shm_size: 128mb
    environment:
      POSTGRES_PASSWORD: pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

    ports:
      - 5432:5432



  # Kafka broker in KRaft mode
  kafka:
    image: apache/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # KRaft-specific settings
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.25.33.214:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  
  kafka-connect:
    image: debezium/connect:3.0.0.Final
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: "172.25.33.214:9092"
      GROUP_ID: "postgres_to_kafka_groupid"
      CONFIG_STORAGE_TOPIC: "connect-configs"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      STATUS_STORAGE_REPLICATION_FACTOR: "1"

  
volumes:
  postgres_data:
  

When I run this Docker setup, the following topics are created in Kafka:

connect-configs

connect-offsets

connect-status

Once the containers are up, I send the following request to the Kafka Connect REST API:

curl -X POST "http://172.25.33.214:8083/connectors" -H "Content-Type: application/json" -d '{
      "name": "postgres-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "172.25.33.214",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "pass",
        "database.dbname": "test_kafkaconnect",
        "database.server.name": "pgserver",
        "plugin.name": "pgoutput",
        "slot.name": "debezium_slot",
        "publication.name": "dbz_publication",

        "database.history.kafka.bootstrap.servers": "172.25.33.214:9092",
        "schema.history.internal.kafka.topic": "testkafkaconnect-schema",
        "topic.prefix": "dbz_",
        "snapshot.mode": "initial"
      }
    }'

Response:

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "172.25.33.214",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "pass",
    "database.dbname": "test_kafkaconnect",
    "database.server.name": "pgserver",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "database.history.kafka.bootstrap.servers": "172.25.33.214:9092",
    "schema.history.internal.kafka.topic": "testkafkaconnect-schema",
    "topic.prefix": "dbz_",
    "snapshot.mode": "initial",
    "name": "postgres-connector"
  },
  "tasks": [],
  "type": "source"
}

Which parameters in this API request can I remove, and which ones are mandatory?
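My current understanding (unverified, from skimming the Debezium PostgreSQL connector reference) is that a minimal config would look like the sketch below. The required/optional split is my own assumption, so please correct me if it is wrong; as far as I can tell, database.port, plugin.name, slot.name, publication.name, and snapshot.mode all have documented defaults:

```python
# Unverified reading of which keys are strictly required for the Debezium
# PostgreSQL connector; every other key in my request appears to have a
# documented default.
minimal_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "172.25.33.214",
    "database.user": "postgres",
    "database.password": "pass",
    "database.dbname": "test_kafkaconnect",
    "topic.prefix": "dbz_",  # mandatory in Debezium 2.0+ (replaces database.server.name)
}
print(sorted(minimal_config))
```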

After executing this API request, the following topics are created in Kafka:

dbz_.public.aa

dbz_.public.bb

However, the topic testkafkaconnect-schema was not created.

The messages in the topics dbz_.public.aa and dbz_.public.bb embed the table schema in their schema section: the field names, their Kafka Connect data types, and whether each field is optional. They do not, however, include the full database schema definition, such as primary or foreign keys, indexes, or other constraints and settings.

As a result, I cannot determine the relationships between database tables solely from the data in Kafka.
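To illustrate what I mean, here is a small sketch. The event below is a hand-written approximation of the messages I see in dbz_.public.aa (field names are illustrative, not a real captured message); it shows that only names, types, and optionality can be read from the schema section:

```python
# Trimmed Debezium change-event envelope, modeled on the messages in
# dbz_.public.aa (the column names "id" and "name" are illustrative).
event = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "before", "type": "struct", "optional": True},
            {"field": "after", "type": "struct", "optional": True,
             "fields": [
                 {"field": "id", "type": "int32", "optional": False},
                 {"field": "name", "type": "string", "optional": True},
             ]},
        ],
    },
    "payload": {"before": None, "after": {"id": 1, "name": "foo"}},
}

def describe_after_fields(evt):
    """Return (name, type, optional) for each column in the 'after' struct."""
    for field in evt["schema"]["fields"]:
        if field["field"] == "after":
            return [(f["field"], f["type"], f["optional"])
                    for f in field.get("fields", [])]
    return []

print(describe_after_fields(event))
# prints [('id', 'int32', False), ('name', 'string', True)]
# Nothing here marks 'id' as a primary key or as referencing another table.
```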

Did I configure everything correctly?

Is the data correctly written to the topics? Or did I make a mistake somewhere that I need to fix?
