I want to read messages published to a Kafka topic (in this case `first_topic`) and feed them into a neural network with TensorFlow. To achieve this I am using `tfio.experimental.streaming.KafkaGroupIODataset` with the following configuration:
```python
import tensorflow_io as tfio

online_train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["first_topic"],
    group_id="",
    servers="kafka:9092",
    stream_timeout=30000,  # ms to keep waiting for new messages
    message_poll_timeout=1000,  # ms allowed for each individual poll
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
    ],
)
```
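The `configuration` argument takes librdkafka consumer properties as a list of `"key=value"` strings. The helper below, `to_consumer_config`, is hypothetical (it is not part of tensorflow-io); it is a minimal sketch that builds that list from a dict and fails early on one constraint that, as far as I know, librdkafka enforces when the consumer is created: `max.poll.interval.ms` must be at least `session.timeout.ms`. The values used above (8000 and 7000) satisfy it.

```python
def to_consumer_config(options):
    """Render librdkafka properties as a ["key=value", ...] list.

    Hypothetical helper for illustration; not a tensorflow-io API.
    """
    session = options.get("session.timeout.ms")
    max_poll = options.get("max.poll.interval.ms")
    # Assumed librdkafka rule: max.poll.interval.ms >= session.timeout.ms.
    if session is not None and max_poll is not None and int(max_poll) < int(session):
        raise ValueError("max.poll.interval.ms must be >= session.timeout.ms")
    # Dicts keep insertion order, so the list mirrors the order given here.
    return [f"{key}={value}" for key, value in options.items()]


configuration = to_consumer_config({
    "session.timeout.ms": 7000,
    "max.poll.interval.ms": 8000,
    "auto.offset.reset": "earliest",
})
print(configuration)
```

The resulting list can be passed unchanged as `configuration=configuration`.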
When testing the configuration, I always receive the following output in Python:
```
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels:1001] Local: Timed out
...
```
Only at the end, once the timeout period is reached, are the messages sent to Kafka processed.
What I want is to process all messages from the Kafka topic as they arrive, until no message has been sent for 30 s, similar to this article. At the moment I receive the timeout messages for about 30 s, then all messages are processed at once and the script exits.
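As I understand the intended semantics, `message_poll_timeout` bounds each individual poll, while `stream_timeout` bounds how long the stream keeps waiting without any new message. The pure-Python sketch below models the desired behaviour with a `queue.Queue` standing in for the Kafka consumer; this is an assumption for illustration, not tensorflow-io's implementation, and `stream_until_idle` and `produce` are hypothetical names. Each message is handed to the caller as soon as it is polled, and an empty poll (presumably what each `Local: Timed out` line reports) only ends the stream once the idle time reaches the stream timeout.

```python
import queue
import threading
import time


def stream_until_idle(source, stream_timeout_s, poll_timeout_s):
    """Yield messages from `source` until none arrive for stream_timeout_s.

    `source` is a queue.Queue standing in for the Kafka consumer.
    """
    last_message_at = time.monotonic()
    while True:
        try:
            message = source.get(timeout=poll_timeout_s)
        except queue.Empty:
            # Empty poll: keep waiting unless the stream has been idle too long.
            if time.monotonic() - last_message_at >= stream_timeout_s:
                return
            continue
        last_message_at = time.monotonic()  # every message resets the idle clock
        yield message  # hand the message to the caller immediately


def produce(target):
    # Simulated producer: three messages, 50 ms apart.
    for i in range(3):
        target.put(f"msg-{i}")
        time.sleep(0.05)


messages = queue.Queue()
threading.Thread(target=produce, args=(messages,), daemon=True).start()

processed = []
# Scaled-down timeouts in seconds, standing in for 30000 ms and 1000 ms.
for message in stream_until_idle(messages, stream_timeout_s=0.3, poll_timeout_s=0.1):
    processed.append(message)  # e.g. run a training step on this message
print(processed)
```

Because the generator yields per message, work happens while the stream is still open; collecting everything first and processing afterwards would produce the "all at once after the timeout" pattern instead.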
My Kafka broker runs as a Docker container with this configuration:
```yaml
services:
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: kafka
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_CONTROLLER_BROKER_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    ports:
      - '9092:9092'
      - '9093:9093'
```
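Since `servers="kafka:9092"` relies on the hostname `kafka`, which Docker's embedded DNS resolves for containers attached to the same Compose network, one quick sanity check is whether that address is reachable from wherever the Python script runs. `broker_reachable` below is a hypothetical standard-library helper that only tests a plain TCP connection. A Kafka client additionally connects to the listener addresses the broker advertises in its metadata, so a successful check does not rule out a listener misconfiguration.

```python
import socket


def broker_reachable(host, port, timeout_s=3.0):
    """Return True if a plain TCP connection to host:port succeeds.

    Hypothetical diagnostic helper; it does not speak the Kafka protocol.
    """
    try:
        # create_connection resolves the hostname, then opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers DNS failures (gaierror), refused connections and timeouts.
        return False
```

For example, `broker_reachable("kafka", 9092)` run on the host rather than inside a container on the Compose network may return `False` simply because the name `kafka` does not resolve there.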
What am I doing wrong?