I get the following error when trying create a stream in KSQL that has a STRUCT as the key:
Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`topic` STRUCT<`topic` STRING, `ID` INTEGER> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'
The output (from Lenses MQTT Source Connector using ByteArrayConverter):
kcat -C -b broker:29092 -t t10 -f 'Key is %k, and message payload is: %s \n'
Key is Struct{topic=m/bgmdev/af1/t5/test,id=0}, and message payload is: {"val":39}
Key is Struct{topic=m/bgmdev/af1/t5/test,id=0}, and message payload is: {"val":40}
tried creating stream with ksql: Note that topic is reserved word so had to use back ticks:
CREATE STREAM TEST50 (`topic` STRUCT<`topic` VARCHAR, id INT> KEY, val INT)
WITH (KAFKA_TOPIC='t10', PARTITIONS=1, VALUE_FORMAT='JSON');
error:
Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`topic` STRUCT<`topic` STRING, `ID` INTEGER> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'
I followed format from:
/
Create a stream with a struct key in KSQL
Also noticed this from a much previous version:
I also tried to force the KEY format as JSON:
CREATE STREAM TEST50 (`topic` STRUCT<`topic` VARCHAR, id INT> KEY, val INT)
WITH (KAFKA_TOPIC='t10', PARTITIONS=1, KEY_FORMAT='JSON', VALUE_FORMAT='JSON');
ksql> describe TEST51;
Name : TEST51
Field | Type
---------------------------------------------------------
topic | STRUCT<topic VARCHAR(STRING), ID INTEGER> (key)
VAL | INTEGER
---------------------------------------------------------
But I don't get any messages coming in and no errors in the logs. Even turned up logging levels. I'm sure I'm doing something wrong with creating the STRUCT.