I have a Postgres database named `test_kafkaconnect`, which contains two tables: `aa` and `bb`.
I want to stream all database changes to Kafka. I figured out that I need to use Debezium with Kafka Connect for this.
Here is my docker-compose configuration for Postgres, Kafka, and Debezium Kafka Connect:
```yaml
version: '3.8'
services:
  postgres:
    image: postgres:17.2
    container_name: postgres
    restart: always
    shm_size: 128mb
    environment:
      POSTGRES_PASSWORD: pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - 5432:5432

  # Kafka broker in KRaft mode
  kafka:
    image: apache/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # KRaft-specific settings
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.25.33.214:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  kafka-connect:
    image: debezium/connect:3.0.0.Final
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: "172.25.33.214:9092"
      GROUP_ID: "postgres_to_kafka_groupid"
      CONFIG_STORAGE_TOPIC: "connect-configs"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      STATUS_STORAGE_REPLICATION_FACTOR: "1"

volumes:
  postgres_data:
```
When I run this Docker setup, the following topics are created in Kafka:

- `connect-configs`
- `connect-offsets`
- `connect-status`
After running this Docker setup, I send an API request to Kafka Connect:
```shell
curl -X POST "http://172.25.33.214:8083/connectors" -H "Content-Type: application/json" -d '{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "172.25.33.214",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "pass",
    "database.dbname": "test_kafkaconnect",
    "database.server.name": "pgserver",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "database.history.kafka.bootstrap.servers": "172.25.33.214:9092",
    "schema.history.internal.kafka.topic": "testkafkaconnect-schema",
    "topic.prefix": "dbz_",
    "snapshot.mode": "initial"
  }
}'
```
Response:

```json
{"name":"postgres-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"172.25.33.214","database.port":"5432","database.user":"postgres","database.password":"pass","database.dbname":"test_kafkaconnect","database.server.name":"pgserver","plugin.name":"pgoutput","slot.name":"debezium_slot","publication.name":"dbz_publication","database.history.kafka.bootstrap.servers":"172.25.33.214:9092","schema.history.internal.kafka.topic":"testkafkaconnect-schema","topic.prefix":"dbz_","snapshot.mode":"initial","name":"postgres-connector"},"tasks":[],"type":"source"}
```
Which parameters in this API request can I remove, and which ones are mandatory?
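One way to narrow down which keys may be stale is to check the config for property names that, according to the Debezium 2.0 release notes, were renamed: `database.server.name` was replaced by `topic.prefix`, and the `database.history.*` prefix became `schema.history.internal.*`. Below is a minimal Python sketch of such a check. The function name `find_legacy_keys` and the `LEGACY_RENAMES` mapping are hypothetical helpers, and the mapping should be verified against the documentation for the Debezium version in use (here 3.0).

```python
# Hypothetical mapping of pre-2.0 Debezium property names (or prefixes ending
# in ".") to their replacements; verify against the docs for your version.
LEGACY_RENAMES = {
    "database.server.name": "topic.prefix",
    "database.history.": "schema.history.internal.",
}


def find_legacy_keys(config: dict) -> dict:
    """Return {old_key: suggested_new_key} for keys that use a legacy name."""
    found = {}
    for key in config:
        for old, new in LEGACY_RENAMES.items():
            if old.endswith("."):
                # Prefix rename: keep whatever follows the old prefix.
                if key.startswith(old):
                    found[key] = new + key[len(old):]
            elif key == old:
                # Exact-name rename.
                found[key] = new
    return found


if __name__ == "__main__":
    # Excerpt of the "config" object from the request above.
    config = {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.server.name": "pgserver",
        "database.history.kafka.bootstrap.servers": "172.25.33.214:9092",
        "schema.history.internal.kafka.topic": "testkafkaconnect-schema",
        "topic.prefix": "dbz_",
    }
    for old, new in find_legacy_keys(config).items():
        print(f"{old} -> {new}")
```

On this excerpt the check flags `database.server.name` and `database.history.kafka.bootstrap.servers`. It only detects renamed names; it does not tell whether a given key is used by the PostgreSQL connector at all.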
After executing this API request, the following topics are created in Kafka:

- `dbz_.public.aa`
- `dbz_.public.bb`

However, the topic `testkafkaconnect-schema` was not created.
The data in the topics `dbz_.public.aa` and `dbz_.public.bb` includes the table schema in the `schema` section. This section describes each field of the table: its name, data type, and whether it is optional. However, it does not include the full database schema definition, such as keys (primary or foreign), indexes, or other constraints and settings.
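To make the point concrete, here is a minimal Python sketch that walks the `schema` section of one change event and lists the name, type, and optional flag of each column in the `after` struct. It assumes the JSON converter with schemas enabled, which produces the `schema`/`payload` envelope described above. The sample event, its columns `id` and `name`, and the helper `column_info` are hypothetical and trimmed for illustration.

```python
import json

# Hypothetical, trimmed Debezium change event for table "aa" (JSON converter
# with schemas enabled). Real events also carry "source", "ts_ms", etc.
SAMPLE_EVENT = json.loads("""
{
  "schema": {
    "type": "struct",
    "name": "dbz_.public.aa.Envelope",
    "fields": [
      {"type": "struct", "optional": true, "field": "before",
       "name": "dbz_.public.aa.Value",
       "fields": [
         {"type": "int32", "optional": false, "field": "id"},
         {"type": "string", "optional": true, "field": "name"}
       ]},
      {"type": "struct", "optional": true, "field": "after",
       "name": "dbz_.public.aa.Value",
       "fields": [
         {"type": "int32", "optional": false, "field": "id"},
         {"type": "string", "optional": true, "field": "name"}
       ]},
      {"type": "string", "optional": false, "field": "op"}
    ]
  },
  "payload": {"before": null, "after": {"id": 1, "name": "x"}, "op": "c"}
}
""")


def column_info(event: dict, section: str = "after") -> list:
    """Return (column, type, optional) tuples from the row struct's schema."""
    for field in event["schema"]["fields"]:
        if field["field"] == section:
            return [(f["field"], f["type"], f["optional"]) for f in field["fields"]]
    raise KeyError(f"no '{section}' field in schema")


if __name__ == "__main__":
    for name, typ, optional in column_info(SAMPLE_EVENT):
        print(f"{name}: {typ}, optional={optional}")
```

The walk only ever yields names, types, and optional flags; the value envelope has no slot for foreign keys or indexes.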
As a result, I cannot determine the relationships between database tables solely from the data in Kafka.
Did I configure everything correctly? Is the data correctly written to the topics, or did I make a mistake somewhere that I need to fix?
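The creation response above shows `"tasks":[]`, which on its own is not conclusive, since tasks are typically assigned after the connector is created. The Kafka Connect REST endpoint `GET /connectors/<name>/status` reports the connector and task states, including a `trace` for failed tasks. Below is a minimal Python sketch using only the standard library; the host and connector name come from the request above, while the helpers `fetch_status` and `summarize` are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://172.25.33.214:8083"  # Kafka Connect REST endpoint from the setup above


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """GET /connectors/<name>/status and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as resp:
        return json.load(resp)


def summarize(status: dict) -> list:
    """Flatten connector and task states into readable lines."""
    lines = [f"connector {status['name']}: {status['connector']['state']}"]
    for task in status.get("tasks", []):
        line = f"task {task['id']}: {task['state']}"
        if "trace" in task:
            # Failed tasks carry a stack trace; keep only its first line.
            line += " - " + task["trace"].strip().split("\n", 1)[0]
        lines.append(line)
    return lines


if __name__ == "__main__":
    for line in summarize(fetch_status("postgres-connector")):
        print(line)
```

Separating the HTTP call from the formatting keeps `summarize` usable on a status body saved from `curl` as well.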