We are trying to consume Protobuf-encoded messages from a Kafka topic using PyFlink. However, we are encountering message parsing errors when attempting to decode the messages later in our pipeline.
Kafka Connector Configuration: We are using the following KafkaSource configuration:
return (
    KafkaSource.builder()
    .set_topics("frames")
    .set_properties(properties)
    .set_starting_offsets(offset)
    .set_value_only_deserializer(SimpleStringSchema())  # Issue?
    .build()
)
The issue appears to be that SimpleStringSchema() decodes the raw message bytes as a UTF-8 string. Protobuf payloads are arbitrary binary data, so byte sequences that are not valid UTF-8 get mangled during this decode and cannot be recovered afterwards.
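This can be demonstrated independently of Flink: decoding arbitrary binary data as UTF-8 is lossy (invalid sequences become U+FFFD replacement characters), whereas ISO-8859-1 (latin-1) maps each byte 0–255 to exactly one character and round-trips losslessly. A minimal sketch with a made-up payload:

```python
# Arbitrary binary bytes standing in for a Protobuf-encoded message.
raw = b"\x08\x96\x01\x12\x04\xff\xfe\x00\x01"

# What a UTF-8 string schema effectively does: invalid sequences are
# replaced with U+FFFD, so the original bytes are destroyed.
as_utf8 = raw.decode("utf-8", errors="replace")
print(as_utf8.encode("utf-8") == raw)      # False: payload corrupted

# ISO-8859-1 maps bytes one-to-one onto characters, so it round-trips.
as_latin1 = raw.decode("latin-1")
print(as_latin1.encode("latin-1") == raw)  # True: payload recovered
```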
Attempted Fix (Converting String Back to Bytes): We attempted to convert the received string back into bytes before parsing:
if isinstance(data, str):
    # SimpleStringSchema already decoded the payload as UTF-8; re-encoding
    # produces bytes again, but not necessarily the original ones.
    data_bytes = data.encode("utf-8")
else:
    data_bytes = data

frame_message = FrameChunk()
frame_message.ParseFromString(data_bytes)  # erroring out here
frame_dict = MessageToDict(frame_message,
                           preserving_proto_field_name=True)
However, ParseFromString(data_bytes) raises google.protobuf.message.DecodeError, which suggests the UTF-8 decode/encode round trip does not reproduce the original message bytes.
- Is SimpleStringSchema() the correct choice for consuming Protobuf messages in PyFlink?
- How can we properly deserialize Protobuf messages directly from Kafka without unwanted string conversion?
- What is the best way to ensure the byte format is preserved during message consumption?
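For context, one workaround we are evaluating (assuming PyFlink's SimpleStringSchema accepts a charset argument, mirroring the Java SimpleStringSchema(Charset) constructor): configure the schema with ISO-8859-1 so the byte-to-string mapping is bijective, then encode back to bytes before parsing. A sketch:

```python
# Workaround sketch (untested in our pipeline): configure the string schema
# with a charset that maps bytes one-to-one, e.g.
#
#   .set_value_only_deserializer(SimpleStringSchema(charset="ISO-8859-1"))
#
# then undo the mapping downstream before calling ParseFromString.
def recover_payload(value):
    """Undo the latin-1 byte->str mapping applied by the string schema."""
    return value.encode("latin-1") if isinstance(value, str) else value

# Sanity check with an arbitrary binary payload covering every byte value:
payload = bytes(range(256))
assert recover_payload(payload.decode("latin-1")) == payload
```

Would this preserve the bytes reliably, or is there a cleaner way to get raw bytes out of KafkaSource?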
Would appreciate any insights or suggestions!