I'm trying to run this simplified example program:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)
// main creates a uniquely named, single-partition topic on the broker at
// localhost:9092, writes the messages "one" and "two" to it, and prints the
// ListOffsets result for that topic as JSON on standard output.
//
// Any failing step terminates the program through log.Fatalf.
func main() {
	ctx := context.Background()
	broker := kafka.TCP("localhost:9092")
	// A fresh UUID suffix gives every run its own topic name.
	topic := "topic-" + uuid.New().String()

	client := &kafka.Client{Addr: broker, Timeout: 10 * time.Second}

	createReq := &kafka.CreateTopicsRequest{
		Addr: broker,
		Topics: []kafka.TopicConfig{{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		}},
	}
	if _, err := client.CreateTopics(ctx, createReq); err != nil {
		log.Fatalf("create topic: %v", err)
	}
	log.Println("Created topic", topic)

	// NOTE(review): only Addr and Topic are set, so every other Writer option
	// (including RequiredAcks) keeps its default — confirm that the default
	// acknowledgement level is what this example relies on.
	writer := &kafka.Writer{Addr: broker, Topic: topic}
	for _, text := range []string{"one", "two"} {
		record := kafka.Message{Value: []byte(text)}
		if err := writer.WriteMessages(ctx, record); err != nil {
			log.Fatalf("write messages: %v", err)
		}
	}

	// NOTE(review): the offset request sets only Partition; any other
	// OffsetRequest field is left at its zero value — verify which offset
	// this actually asks the broker for.
	offsetsReq := &kafka.ListOffsetsRequest{
		Addr: broker,
		Topics: map[string][]kafka.OffsetRequest{
			topic: {{Partition: 0}},
		},
	}
	offsets, err := client.ListOffsets(ctx, offsetsReq)
	if err != nil {
		log.Fatalf("list offsets: %v", err)
	}
	fmt.Println(jsonMarshal(offsets.Topics[topic]))
}
// jsonMarshal encodes v with json.Marshal and returns the encoding as a
// string. If encoding fails, it reports the value's type and the error via
// log.Fatalf, which ends the program.
func jsonMarshal(v any) string {
	encoded, err := json.Marshal(v)
	if err == nil {
		return string(encoded)
	}
	log.Fatalf("marshal %T to JSON: %v", v, err)
	// Not reached: log.Fatalf exits. The compiler still requires a return.
	return string(encoded)
}
where I'm running Bitnami's Kafka container in KRaft mode like so,
docker run -p 9092:9092 \
-e KAFKA_CFG_NODE_ID=1 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
bitnami/kafka:latest
What the program prints is:
> go run . | jq
2025/03/14 09:54:05 Created topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
[
{
"Partition": 0,
"FirstOffset": -1,
"LastOffset": -1,
"Offsets": {
"0": "0001-01-01T00:00:00Z"
},
"Error": null
}
]
In short, the Offsets field shows that partition 0 has a value that is Go's zero time. It's not immediately clear to me from the source code, https://github.com/segmentio/kafka-go/blob/a558bb8629a8f5a920a8e1bb640064ad7efb7cd8/listoffset.go#L37-L45, what the Offsets field should represent, but if I've produced two messages to the partition, shouldn't the offset be 2 and show a non-zero time of when the message was produced?
Update: some time later, I ran kafka-get-offsets (from /opt/homebrew/Cellar/kafka/3.9.0/bin, after running brew install kafka on my macOS Apple Silicon laptop), and it does seem to indicate that the offset is 2:
> ./kafka-get-offsets --bootstrap-server localhost:9092 --topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
topic-9f6f0f60-3dfa-4d24-ba44-674c16165798:0:2
I'm trying to run this simplified example program:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)
// main creates a uniquely named, single-partition topic on the broker at
// localhost:9092, writes the messages "one" and "two" to it, and prints the
// ListOffsets result for that topic as JSON on standard output.
//
// Any failing step terminates the program through log.Fatalf.
func main() {
	ctx := context.Background()
	broker := kafka.TCP("localhost:9092")
	// A fresh UUID suffix gives every run its own topic name.
	topic := "topic-" + uuid.New().String()

	client := &kafka.Client{Addr: broker, Timeout: 10 * time.Second}

	createReq := &kafka.CreateTopicsRequest{
		Addr: broker,
		Topics: []kafka.TopicConfig{{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		}},
	}
	if _, err := client.CreateTopics(ctx, createReq); err != nil {
		log.Fatalf("create topic: %v", err)
	}
	log.Println("Created topic", topic)

	// NOTE(review): only Addr and Topic are set, so every other Writer option
	// (including RequiredAcks) keeps its default — confirm that the default
	// acknowledgement level is what this example relies on.
	writer := &kafka.Writer{Addr: broker, Topic: topic}
	for _, text := range []string{"one", "two"} {
		record := kafka.Message{Value: []byte(text)}
		if err := writer.WriteMessages(ctx, record); err != nil {
			log.Fatalf("write messages: %v", err)
		}
	}

	// NOTE(review): the offset request sets only Partition; any other
	// OffsetRequest field is left at its zero value — verify which offset
	// this actually asks the broker for.
	offsetsReq := &kafka.ListOffsetsRequest{
		Addr: broker,
		Topics: map[string][]kafka.OffsetRequest{
			topic: {{Partition: 0}},
		},
	}
	offsets, err := client.ListOffsets(ctx, offsetsReq)
	if err != nil {
		log.Fatalf("list offsets: %v", err)
	}
	fmt.Println(jsonMarshal(offsets.Topics[topic]))
}
// jsonMarshal encodes v with json.Marshal and returns the encoding as a
// string. If encoding fails, it reports the value's type and the error via
// log.Fatalf, which ends the program.
func jsonMarshal(v any) string {
	encoded, err := json.Marshal(v)
	if err == nil {
		return string(encoded)
	}
	log.Fatalf("marshal %T to JSON: %v", v, err)
	// Not reached: log.Fatalf exits. The compiler still requires a return.
	return string(encoded)
}
where I'm running Bitnami's Kafka container in KRaft mode like so,
docker run -p 9092:9092 \
-e KAFKA_CFG_NODE_ID=1 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
bitnami/kafka:latest
What the program prints is:
> go run . | jq
2025/03/14 09:54:05 Created topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
[
{
"Partition": 0,
"FirstOffset": -1,
"LastOffset": -1,
"Offsets": {
"0": "0001-01-01T00:00:00Z"
},
"Error": null
}
]
In short, the Offsets field shows that partition 0 has a value that is Go's zero time. It's not immediately clear to me from the source code, https://github.com/segmentio/kafka-go/blob/a558bb8629a8f5a920a8e1bb640064ad7efb7cd8/listoffset.go#L37-L45, what the Offsets field should represent, but if I've produced two messages to the partition, shouldn't the offset be 2 and show a non-zero time of when the message was produced?
Update: some time later, I ran kafka-get-offsets (from /opt/homebrew/Cellar/kafka/3.9.0/bin, after running brew install kafka on my macOS Apple Silicon laptop), and it does seem to indicate that the offset is 2:
> ./kafka-get-offsets --bootstrap-server localhost:9092 --topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
topic-9f6f0f60-3dfa-4d24-ba44-674c16165798:0:2
Share
Improve this question
edited Mar 14 at 17:32
Kurt Peek
asked Mar 14 at 17:19
Kurt PeekKurt Peek
57.9k104 gold badges346 silver badges564 bronze badges
1 Answer
Reset to default
0
The default for Writer.RequiredAcks is RequireNone, so your writes might still be in progress when you call Client.ListOffsets.
Try adding a delay or setting RequiredAcks to RequireAll to observe the effect on the offsets.