最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

ksqldb - The 'KAFKA' format does not support type 'STRUCT' when trying to create STREAM with STR

programmeradmin1浏览0评论

I get the following error when trying create a stream in KSQL that has a STRUCT as the key:

Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`topic` STRUCT<`topic` STRING, `ID` INTEGER> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'

The output (from Lenses MQTT Source Connector using ByteArrayConverter):

kcat -C -b broker:29092 -t t10 -f 'Key is %k, and message payload is: %s \n'

Key is Struct{topic=m/bgmdev/af1/t5/test,id=0}, and message payload is: {"val":39}
Key is Struct{topic=m/bgmdev/af1/t5/test,id=0}, and message payload is: {"val":40}

tried creating stream with ksql: Note that topic is reserved word so had to use back ticks:

 CREATE STREAM TEST50 (`topic` STRUCT<`topic` VARCHAR, id INT> KEY, val INT)
  WITH (KAFKA_TOPIC='t10', PARTITIONS=1, VALUE_FORMAT='JSON');

error:

Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`topic` STRUCT<`topic` STRING, `ID` INTEGER> KEY], features=[]}
reason: The 'KAFKA' format does not support type 'STRUCT'

I followed format from:

/

Create a stream with a struct key in KSQL

Also noticed this from a much previous version:

I also tried to force the KEY format as JSON:

 CREATE STREAM TEST50 (`topic` STRUCT<`topic` VARCHAR, id INT> KEY, val INT)
  WITH (KAFKA_TOPIC='t10', PARTITIONS=1, KEY_FORMAT='JSON', VALUE_FORMAT='JSON');
ksql> describe TEST51;

Name                 : TEST51
 Field | Type
---------------------------------------------------------
 topic | STRUCT<topic VARCHAR(STRING), ID INTEGER> (key)
 VAL   | INTEGER
---------------------------------------------------------

But I don't get any messages coming in and no errors in the logs. Even turned up logging levels. I'm sure I'm doing something wrong with creating the STRUCT.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论