Kafka Streams: the number of sink messages keeps growing the longer the program runs - Stack Overflow

Code:

// Split the stream into size buckets; branches are evaluated in order and the first match wins.
Map<String, KStream<String, MqttMessageDto>> splits = sizeFilter
        .split(Named.as("size-"))
        .branch((key, value) -> value.getMessageSize() <= 1024, Branched.as("1kb"))
        .branch((key, value) -> value.getMessageSize() > 1024 && value.getMessageSize() <= 1024 << 3, Branched.as("8kb"))
        .branch((key, value) -> value.getMessageSize() > 1024 << 3 && value.getMessageSize() <= 1024 << 4, Branched.as("16kb"))
        .branch((key, value) -> value.getMessageSize() > 1024 << 4 && value.getMessageSize() <= 1024 << 5, Branched.as("32kb"))
        .branch((key, value) -> value.getMessageSize() > 1024 << 5 && value.getMessageSize() <= 1024 << 6, Branched.as("64kb"))
        .branch((key, value) -> value.getMessageSize() > 1024 << 6, Branched.as("other"))
        .noDefaultBranch();

// For each size bucket, count messages per productId over hopping windows and sink the counts.
splits.forEach((branchedName, stream) -> {
    KStream<Windowed<String>, Long> sizeStream = stream
            .groupBy((key, value) -> value.getProductId())
            .windowedBy(
                    TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(TIME_WINDOW_SECONDS))
                            .advanceBy(Duration.ofSeconds(ADVANCED_BY_SECONDS)))
            .count(Named.as(branchedName + "-count"))
            .toStream();
    mqttMessageSizeSink.sink(sizeStream, branchedName);
});

Sink rate: mqtt-message-size message count

There are only a single-digit number of distinct productId values, so I expect the sink message rate to stay roughly constant. How can I achieve that?
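
One thing that may be worth checking when reasoning about the sink rate is how many hopping windows each record falls into, because the windowed count is updated once per window that contains the record. The plain-Java sketch below (no Kafka dependency) computes the epoch-aligned window starts for a single timestamp. The class name, the method name, and the 60 s / 10 s values are assumptions for illustration only; the real TIME_WINDOW_SECONDS and ADVANCED_BY_SECONDS are not shown in the question.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowFanOut {

    // Returns the start times of all hopping windows [start, start + sizeMs)
    // that contain timestampMs, with window starts aligned to multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for TIME_WINDOW_SECONDS and ADVANCED_BY_SECONDS.
        long sizeMs = 60_000;
        long advanceMs = 10_000;

        List<Long> starts = windowStartsFor(65_000, sizeMs, advanceMs);
        System.out.println("Window starts containing t=65000: " + starts);
        System.out.println("Windows updated by one record: " + starts.size());
    }
}
```

With these assumed values a single record updates 6 windows, so one input record can produce up to 6 count updates per branch (fewer if record caching merges them before they are forwarded). This fan-out is a constant multiplier rather than growth over time, so it is only one factor to rule out. If a single final count per window is what the sink should receive, Kafka Streams' `suppress(Suppressed.untilWindowCloses(...))` operator, applied between `count(...)` and `toStream()`, is one option to look at.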
