
java - Kafka Streams send custom headers with Transformer when multiple messages output - Stack Overflow


I want to send multiple messages downstream using a Transformer (Kafka Streams DSL):

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, Map<String, String>> transform(String s, String msg) {
        Headers headers = context.headers();
        for (int i = 0; i < 2; i++) {
            headers.add("test", "test".getBytes());
            headers.add("meta", "test".getBytes());

            context.forward("new key", msg);
        }
        return null;
    }

Downstream there is a TopicNameExtractor:

.transform(TestTransformer::new)
                .to(orderTopicNameExtractor, Produced.with(Serdes.serdeFrom(String.class),
                        Serdes.serdeFrom(String.class)));

There I need to read data from a header (it holds the topic name), and the other headers must also be written to Kafka.
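For context, a minimal sketch of what such an extractor might look like, assuming the topic name rides in a header called "meta" (the header name and the fallback topic are illustrative, not from the original code):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class OrderTopicNameExtractor implements TopicNameExtractor<String, String> {

    @Override
    public String extract(String key, String value, RecordContext recordContext) {
        // lastHeader() returns the most recently added header with this key,
        // which matters if duplicates have accumulated upstream.
        Header topicHeader = recordContext.headers().lastHeader("meta");
        if (topicHeader == null) {
            return "fallback-topic"; // assumption: route somewhere safe if the header is missing
        }
        return new String(topicHeader.value(), StandardCharsets.UTF_8);
    }
}
```

Note that `lastHeader()` quietly masks the duplicate-key problem described below, while `headers().headers("meta")` would expose all accumulated copies.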

The problem is that for the second message the downstream step receives duplicate headers (there would be two "test" keys); the only fix I found is to remove them before re-adding them inside the transform loop. That works locally, but what about thread safety under high throughput? The context appears to be shared — could remove() strip a header before the downstream extractor has processed a message?

headers.remove("meta");
headers.add("meta", "test".getBytes());
context.forward("new key", msg);
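To make the accumulation concrete, here is a tiny stand-alone simulation (the `List` is only a stand-in for Kafka's shared, mutable `Headers`, whose `add()` appends rather than replaces):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SharedHeadersDemo {

    // Simulates forwarding two messages while mutating one shared headers object,
    // as in the transform() loop above. Returns how many "test" entries the
    // second forwarded record would carry.
    static int forwardLoop() {
        // Stand-in for context.headers(): one mutable collection shared by
        // every record forwarded from this transform() call.
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            headers.add(new SimpleEntry<>("test", "test".getBytes()));
            // context.forward(...) would happen here; the SAME headers object
            // is attached to each forwarded record.
        }
        return headers.size();
    }

    public static void main(String[] args) {
        System.out.println(forwardLoop()); // prints 2: duplicate "test" keys
    }
}
```

The second iteration sees the entry left behind by the first, which is exactly why the downstream extractor observes two "test" keys.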

Is there a more correct way to solve this? The only alternative I can think of that might work is the Processor interface, sending each message with withHeaders(new RecordHeaders()), or simply adding the values to the message payload directly.


Asked Nov 20, 2024 at 9:18 by xmm_581

1 Answer

Header support is somewhat limited in the old API. I would recommend moving to the new KStream.process(...), which takes an api.Processor instead of the old (and already deprecated) transform(). It handles headers better.
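A minimal sketch of that migration, assuming Kafka Streams 3.3+ (where process() returns a KStream and can be chained to .to()); the class name and header values are illustrative:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class TestProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        for (int i = 0; i < 2; i++) {
            // Build fresh headers for each forwarded record -- nothing is
            // shared or mutated across iterations, so no remove() is needed.
            RecordHeaders headers = new RecordHeaders();
            headers.add("test", "test".getBytes(StandardCharsets.UTF_8));
            headers.add("meta", "target-topic".getBytes(StandardCharsets.UTF_8)); // illustrative value
            context.forward(record.withKey("new key").withHeaders(headers));
        }
    }
}
```

In the topology this would replace the transform() call, e.g. `.process(TestProcessor::new).to(orderTopicNameExtractor, ...)`. Because Record is immutable and withHeaders() returns a new Record, each forwarded message owns its own header set.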
