最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Why does Kafka Go's ListOffsets method show the zero time for the Offsets after producing messages to the topic? - Stack

programmeradmin1浏览0评论

I'm trying to run this simplified example program:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github/google/uuid"
    "github/segmentio/kafka-go"
)

func main() {
    addr := kafka.TCP("localhost:9092")
    topic := "topic-" + uuid.New().String()

    client := &kafka.Client{
        Addr:    addr,
        Timeout: 10 * time.Second,
    }

    if _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
        Addr: addr,
        Topics: []kafka.TopicConfig{
            {
                Topic:             topic,
                NumPartitions:     1,
                ReplicationFactor: 1,
            },
        },
    }); err != nil {
        log.Fatalf("create topic: %v", err)
    }
    log.Println("Created topic", topic)

    writer := &kafka.Writer{
        Addr:  addr,
        Topic: topic,
    }

    writeMessage := func(msg string) {
        if err := writer.WriteMessages(context.Background(), kafka.Message{
            Value: []byte(msg),
        }); err != nil {
            log.Fatalf("write messages: %v", err)
        }
    }
    writeMessage("one")
    writeMessage("two")

    listOffsetsResponse, err := client.ListOffsets(context.Background(), &kafka.ListOffsetsRequest{
        Addr: addr,
        Topics: map[string][]kafka.OffsetRequest{
            topic: {
                {Partition: 0},
            },
        },
    })
    if err != nil {
        log.Fatalf("list offsets: %v", err)
    }

    fmt.Println(jsonMarshal(listOffsetsResponse.Topics[topic]))
}

func jsonMarshal(v any) string {
    b, err := json.Marshal(v)
    if err != nil {
        log.Fatalf("marshal %T to JSON: %v", v, err)
    }
    return string(b)
}

where I'm running Bitnami's Kafka container in KRaft mode like so,

docker run -p 9092:9092 \
    -e KAFKA_CFG_NODE_ID=1 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
    bitnami/kafka:latest

What the program prints is:

> go run . | jq
2025/03/14 09:54:05 Created topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
[
  {
    "Partition": 0,
    "FirstOffset": -1,
    "LastOffset": -1,
    "Offsets": {
      "0": "0001-01-01T00:00:00Z"
    },
    "Error": null
  }
]

In short, the Offsets shows that partition 0 has a value that is Go's zero time. It's not immediately clear to me from the source code, .go#L37-L45, what the Offsets field should represent, but if I've produced two messages to the partition, shouldn't the offset be 2 and show a non-zero time of when the message was produced?

Update: some time later, I ran kafka-get-offsets (from /opt/homebrew/Cellar/kafka/3.9.0/bin after running brew install kafka on my macOS Apple Silicon laptop) and it does seem to indicate the offset being 2:

> ./kafka-get-offsets --bootstrap-server localhost:9092 --topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
topic-9f6f0f60-3dfa-4d24-ba44-674c16165798:0:2

I'm trying to run this simplified example program:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github/google/uuid"
    "github/segmentio/kafka-go"
)

func main() {
    addr := kafka.TCP("localhost:9092")
    topic := "topic-" + uuid.New().String()

    client := &kafka.Client{
        Addr:    addr,
        Timeout: 10 * time.Second,
    }

    if _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
        Addr: addr,
        Topics: []kafka.TopicConfig{
            {
                Topic:             topic,
                NumPartitions:     1,
                ReplicationFactor: 1,
            },
        },
    }); err != nil {
        log.Fatalf("create topic: %v", err)
    }
    log.Println("Created topic", topic)

    writer := &kafka.Writer{
        Addr:  addr,
        Topic: topic,
    }

    writeMessage := func(msg string) {
        if err := writer.WriteMessages(context.Background(), kafka.Message{
            Value: []byte(msg),
        }); err != nil {
            log.Fatalf("write messages: %v", err)
        }
    }
    writeMessage("one")
    writeMessage("two")

    listOffsetsResponse, err := client.ListOffsets(context.Background(), &kafka.ListOffsetsRequest{
        Addr: addr,
        Topics: map[string][]kafka.OffsetRequest{
            topic: {
                {Partition: 0},
            },
        },
    })
    if err != nil {
        log.Fatalf("list offsets: %v", err)
    }

    fmt.Println(jsonMarshal(listOffsetsResponse.Topics[topic]))
}

func jsonMarshal(v any) string {
    b, err := json.Marshal(v)
    if err != nil {
        log.Fatalf("marshal %T to JSON: %v", v, err)
    }
    return string(b)
}

where I'm running Bitnami's Kafka container in KRaft mode like so,

docker run -p 9092:9092 \
    -e KAFKA_CFG_NODE_ID=1 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
    bitnami/kafka:latest

What the program prints is:

> go run . | jq
2025/03/14 09:54:05 Created topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
[
  {
    "Partition": 0,
    "FirstOffset": -1,
    "LastOffset": -1,
    "Offsets": {
      "0": "0001-01-01T00:00:00Z"
    },
    "Error": null
  }
]

In short, the Offsets shows that partition 0 has a value that is Go's zero time. It's not immediately clear to me from the source code, https://github/segmentio/kafka-go/blob/a558bb8629a8f5a920a8e1bb640064ad7efb7cd8/listoffset.go#L37-L45, what the Offsets field should represent, but if I've produced two messages to the partition, shouldn't the offset be 2 and show a non-zero time of when the message was produced?

Update: some time later, I ran kafka-get-offsets (from /opt/homebrew/Cellar/kafka/3.9.0/bin after running brew install kafka on my macOS Apple Silicon laptop) and it does seem to indicate the offset being 2:

> ./kafka-get-offsets --bootstrap-server localhost:9092 --topic topic-9f6f0f60-3dfa-4d24-ba44-674c16165798
topic-9f6f0f60-3dfa-4d24-ba44-674c16165798:0:2
Share Improve this question edited Mar 14 at 17:32 Kurt Peek asked Mar 14 at 17:19 Kurt PeekKurt Peek 57.9k104 gold badges346 silver badges564 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

The default for Writer.RequiredAcks is RequireNone, so your writes might still be in progress when you call Client.ListOffsets.

Try adding a delay or setting RequiredAcks to RequireAll to observe the effect on the offsets.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论