Code:
Map<String, KStream<String, MqttMessageDto>> splits = sizeFilter
.split(Named.as("size-"))
.branch((key, value) -> value.getMessageSize() <= 1024, Branched.as("1kb"))
.branch((key, value) -> value.getMessageSize() > 1024 && value.getMessageSize() <= 1024 << 3, Branched.as("8kb"))
.branch((key, value) -> value.getMessageSize() > 1024 << 3 && value.getMessageSize() <= 1024 << 4, Branched.as("16kb"))
.branch((key, value) -> value.getMessageSize() > 1024 << 4 && value.getMessageSize() <= 1024 << 5, Branched.as("32kb"))
.branch((key, value) -> value.getMessageSize() > 1024 << 5 && value.getMessageSize() <= 1024 << 6, Branched.as("64kb"))
.branch((key, value) -> value.getMessageSize() > 1024 << 6, Branched.as("other"))
.noDefaultBranch();
splits.forEach((branchedName, stream) -> {
KStream<Windowed<String>, Long> sizeStream = stream
.groupBy((key, value) -> value.getProductId())
.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(TIME_WINDOW_SECONDS))
.advanceBy(Duration.ofSeconds(ADVANCED_BY_SECONDS)))
.count(Named.as(branchedName + "-count")).toStream();
mqttMessageSizeSink.sink(sizeStream, branchedName);
});
Sink rate: mqtt-message-size message count
The productId is single-digit, I expect sink message rate remain basically unchanged, How can i do?