
mongodb - The Confluent Kafka Connector is not creating a topic to produce the payload - Stack Overflow


I'm using Confluent’s Kafka Connect MongoDB Source Connector in a Docker Compose environment. I expect the connector to create an output topic based on my configuration, but it isn’t creating any new topic nor publishing any data.

Environment:

- Kafka Version: Confluent Platform 7.6.1 (running in Docker)
- Connector: MongoDB Source Connector (version 1.15.0)
- Broker: accessible via broker:29092 within the Docker network
- Schema Registry: running and accessible

Connector Configuration:

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "3",
    "connection.uri": "${MONGODB_CONNECTION_URL}",
    "database": "test",
    "collection": "testCollection",
    "topic.prefix": "aaaaaaaa",
    "pipeline": "[{\"$match\": {\"operationType\": { \"$in\": [\"insert\", \"update\", \"replace\"] }}}]"
  }
}
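For reference, the config above is registered with the Connect REST API roughly like this (a sketch assuming the worker is reachable on localhost:8083 and the JSON above is saved as mongo-source.json):

```shell
# Register (or re-register) the connector from the saved JSON config.
curl -s -X POST -H "Content-Type: application/json" \
  --data @mongo-source.json \
  http://localhost:8083/connectors

# Verify that the connector and all of its tasks report RUNNING.
curl -s http://localhost:8083/connectors/mongo-source-connector/status
```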

docker-compose:

  broker:
    image: confluentinc/cp-kafka:7.6.1
    hostname: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CLUSTER_ID: ''

  connect:
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status-compact
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      MONGODB_CONNECTION_URL: mongodb://host.docker.internal:27017
      CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: "15000"
      CONNECT_WORKER_SHUTDOWN_GRACEFUL_TIMEOUT_MS: "15000"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'


networks:
  localnet:
    attachable: true

I expect the connector to create an output topic named aaaaaaaa.test.testCollection (i.e. topic.prefix.database.collection), but the topic is never created. I have verified that the internal topics (such as docker-connect-offsets, docker-connect-configs, and docker-connect-status) are present, yet there is no topic corresponding to the expected output.
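For completeness, this is how the topic check is done from inside the Docker network (a sketch assuming the compose service names above):

```shell
# List all topics as seen by the broker itself; the expected
# output topic should appear here alongside the internal topics.
docker compose exec broker kafka-topics \
  --bootstrap-server broker:29092 --list
```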

The connector logs indicate that it is running, but there are repeated warnings about failing to resume the change stream with an “Unrecognized pipeline stage name: ''”. It appears the connector may be attempting to resume using an old offset or resume token.
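Since the warning complains about an empty pipeline stage name, I also verified that the pipeline value survives the JSON escaping intact; a quick local check (using python3's json.tool on the unescaped value copied from the config above):

```shell
# The pipeline value as the connector should see it after Connect
# unescapes the config JSON; json.tool fails loudly on invalid JSON.
PIPELINE='[{"$match": {"operationType": { "$in": ["insert", "update", "replace"] }}}]'
echo "$PIPELINE" | python3 -m json.tool
```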

I attempted to reset the offsets, both by changing the offset.partition.name in the connector configuration and by manually clearing the offsets via kafka-topics commands, but even then the output topic is not created.
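One more thing I am considering: Connect workers based on Kafka 3.6+ (which Confluent Platform 7.6 is, if I understand the version mapping correctly) expose an offsets REST API, which should be a cleaner way to discard a stale MongoDB resume token than editing offset.partition.name. A sketch, assuming the connector name from the config above:

```shell
# The connector must be in the STOPPED state before its offsets can be altered.
curl -s -X PUT http://localhost:8083/connectors/mongo-source-connector/stop

# Delete all stored source offsets for this connector,
# including the MongoDB change-stream resume token.
curl -s -X DELETE http://localhost:8083/connectors/mongo-source-connector/offsets

# Resume; the connector should now open a fresh change stream.
curl -s -X PUT http://localhost:8083/connectors/mongo-source-connector/resume
```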
