apache kafka - Stream processing with flink using BroadcastStream is providing inconsistent results - Stack Overflow

// Define the incoming raw data stream sourcing from kafka topic
DataStream<MainMessage> mainStream = env.addSource(...);

// Define the reference data stream sourcing from kafka topic
DataStream<ReferenceData> referenceDataStream = env.addSource(...);

// Define the broadcast state descriptor
MapStateDescriptor<String, ReferenceData> broadcastStateDescriptor = new MapStateDescriptor<>(
        "BroadcastReferenceState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(ReferenceData.class)
);

// Broadcast the reference data stream
BroadcastStream<ReferenceData> broadcastStream = referenceDataStream.broadcast(broadcastStateDescriptor);

// Connect the raw data stream with the broadcast stream
mainStream
    .keyBy(raw -> raw.getKey()) // Key by the same key used for reference
    .connect(broadcastStream)
    .process(new KeyedBroadcastProcessFunction<String, MainMessage, ReferenceData, EnrichedMainMessage>() {

        // Keyed state to store the reference data
        private transient MapState<String, ReferenceData> keyedReferenceState;
        // Keyed state that buffers main messages until reference data arrives
        private transient ListState<MainMessage> bufferedMessages;

        @Override
        public void open(Configuration parameters) {
            // Initialize the keyed state
            MapStateDescriptor<String, ReferenceData> keyedStateDescriptor = new MapStateDescriptor<>(
                "KeyedReferenceState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(ReferenceData.class)
            );
            keyedReferenceState = getRuntimeContext().getMapState(keyedStateDescriptor);
            // The descriptor's element type must match the ListState's element type
            ListStateDescriptor<MainMessage> bufferedMessagesDescriptor = new ListStateDescriptor<>("bufferedMessages", MainMessage.class);
            bufferedMessages = getRuntimeContext().getListState(bufferedMessagesDescriptor);
        }

        @Override
        public void processElement(MainMessage mainMessage, ReadOnlyContext ctx, Collector<EnrichedMainMessage> out) throws Exception {
            // Lookup the reference data in the keyed state
            ReferenceData referenceData = keyedReferenceState.get(mainMessage.getKey());

            if (referenceData != null) {
                // Enrich the main message with the reference data
                EnrichedMainMessage enrichedMainMessage = new EnrichedMainMessage(mainMessage, referenceData);
                // Emit the enriched message
                out.collect(enrichedMainMessage);
            } else {
                // Reference data is not available yet: buffer the main message
                // in the ListState so that it can be re-processed once the
                // reference data for this key is no longer null
                bufferedMessages.add(mainMessage);
            }
        }

        @Override
        public void processBroadcastElement(ReferenceData referenceData, Context ctx, Collector<EnrichedMainMessage> out) throws Exception {
            // Update the keyed state with the latest reference data
            keyedReferenceState.put(referenceData.getKey(), referenceData);
        }
    });

The above is pseudocode for how we currently process our main stream together with reference data that arrives once every hour. We source the reference data from five different Kafka topics and broadcast it using the BroadcastStream API, in order to replicate the GlobalKTable functionality of Kafka Streams. Processing happens in a KeyedBroadcastProcessFunction: processBroadcastElement puts each key and its reference data into a MapState, and processElement looks up the key of every main-stream element in that MapState and uses the retrieved reference data to enrich (join with) the main stream. We are currently facing two issues with this approach:

  1. Reference data is not readily available during startup, so some messages are dropped or not joined.
  2. To work around this, we implemented our own buffering mechanism that uses ListState to hold the initial messages from the main stream, hoping that the reference data would catch up and be ready for the join. Instead, the ListState keeps growing with main-stream messages until the job crashes. The job restarts at irregular intervals, typically after a couple of minutes, with the following exceptions:

java.lang.OutOfMemoryError: Java heap space (this happens even after setting a 2-hour TTL to clear the buffer)

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id 172.XX.XX.XX:6XXX-8XXXX (ip) is no longer reachable.

Can you advise whether broadcasting the reference data using the BroadcastStream API is the right approach for this use case? Also, is there an equivalent of the Kafka Streams GlobalKTable in Flink?

We tried buffering the main stream in ListState so that the reference data in the broadcast stream's MapState could catch up. However, the join still fails because the reference data has not caught up, and the exceptions above appear a couple of minutes after startup. As a result, the application crashes and is unstable.

Asked by Hariharan Janakiraman; edited by Ajeet Verma.

1 Answer

Since the reference data being broadcast is being accessed using the same key used to key partition the main stream (via keyBy), I don't see any compelling reason to use broadcast.

For an alternative implementation based on a KeyedCoProcessFunction, see https://github.com/confluentinc/flink-cookbook/tree/master/enrichment-join-with-buffering.
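
To make the buffering idea concrete, here is a minimal, dependency-free sketch of the per-key logic that such a join typically follows. It is not taken from the linked recipe and is not Flink code: the class BufferingEnrichmentJoin, its method names, and the String payloads are all hypothetical. The two maps stand in for what would be keyed ValueState and ListState inside a KeyedCoProcessFunction, with both streams keyed by the same key. The point to notice is that the buffer for a key is flushed and cleared as soon as reference data for that key arrives, so it does not keep growing the way the ListState in the question does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of an enrichment join with buffering (hypothetical names).
 * In a Flink job, `reference` would be keyed ValueState and `buffered` keyed
 * ListState; the explicit `key` argument would be implied by keyBy.
 */
public class BufferingEnrichmentJoin {

    // Latest reference value per key (stands in for keyed ValueState).
    private final Map<String, String> reference = new HashMap<>();

    // Main-stream payloads still waiting for reference data (stands in for keyed ListState).
    private final Map<String, List<String>> buffered = new HashMap<>();

    /** Main-stream event: emit an enriched result if reference data is known, otherwise buffer. */
    public List<String> onMainEvent(String key, String payload) {
        List<String> out = new ArrayList<>();
        String ref = reference.get(key);
        if (ref != null) {
            out.add(enrich(payload, ref));
        } else {
            buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
        }
        return out;
    }

    /** Reference event: store the value, then flush and clear everything buffered for that key. */
    public List<String> onReferenceEvent(String key, String ref) {
        reference.put(key, ref);
        List<String> out = new ArrayList<>();
        List<String> waiting = buffered.remove(key); // clearing here keeps the buffer bounded
        if (waiting != null) {
            for (String payload : waiting) {
                out.add(enrich(payload, ref));
            }
        }
        return out;
    }

    /** Number of main-stream payloads currently buffered across all keys. */
    public int bufferedCount() {
        int total = 0;
        for (List<String> waiting : buffered.values()) {
            total += waiting.size();
        }
        return total;
    }

    // Illustrative enrichment: concatenate payload and reference value.
    private static String enrich(String payload, String ref) {
        return payload + "|" + ref;
    }
}
```

In this model, a main-stream event that arrives before its reference data produces no output and is buffered; the first reference event for that key then emits every buffered payload and empties the buffer, and later main-stream events for the key are enriched immediately. Keys whose reference data never arrives would still accumulate, so a real job would likely need some expiry for those.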
