I want to send multiple messages downstream using a Transformer (Kafka Streams DSL):
private ProcessorContext context;

@Override
public void init(ProcessorContext context) {
    this.context = context;
}

@Override
public KeyValue<String, Map<String, String>> transform(String s, String msg) {
    Headers headers = context.headers();
    for (int i = 0; i < 2; i++) {
        headers.add("test", "test".getBytes());
        headers.add("meta", "test".getBytes());
        context.forward("new key", msg);
    }
    return null;
}
Downstream there is a TopicNameExtractor:
.transform(TestTransformer::new)
.to(orderTopicNameExtractor, Produced.with(Serdes.serdeFrom(String.class),
        Serdes.serdeFrom(String.class)));
The extractor needs data from a header (it contains the topic name), and the other headers must also be written to Kafka.
The problem is that on the second message the next step receives duplicated headers (two "test" keys). The only fix I found is to remove each header before re-adding it inside the transform loop. This works locally, but what about thread safety under high throughput? The context appears to be shared: can there be a situation where remove() strips a header before the downstream extractor has processed a message?
headers.remove("meta");
headers.add("meta", "test".getBytes());
context.forward("new key", msg);
Is there a more correct way to solve this? The only approach I think might work is to use the Processor interface and forward records with withHeaders(new RecordHeaders()), or to add the values to the message payload directly.
asked Nov 20, 2024 by xmm_581

1 Answer
Header support is somewhat limited in the old API. I would recommend moving to the new KStream.process(...), which takes an api.Processor, instead of the old (and already deprecated) transform(). It handles headers better.
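With the new Processor API, headers travel on each Record rather than on the shared ProcessorContext, so every forwarded record can carry its own fresh Headers instance and nothing accumulates across loop iterations. A minimal sketch of that approach (untested; the class name and the constant header values mirror the question and are illustrative):

```java
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class TestProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        for (int i = 0; i < 2; i++) {
            // Fresh headers per forwarded record: no remove() needed and no
            // mutation of shared state, so the duplication problem disappears.
            RecordHeaders headers = new RecordHeaders();
            headers.add("test", "test".getBytes());
            headers.add("meta", "test".getBytes());
            context.forward(new Record<>("new key", record.value(),
                    record.timestamp(), headers));
        }
        // No return value; records are emitted only via forward().
    }
}
```

In the topology, `.transform(TestTransformer::new)` would then become `.process(TestProcessor::new)` before the `.to(...)` call, and the TopicNameExtractor downstream reads each record's own headers.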