
Spark Streaming error: Required attribute 'value' not found


I'm trying to implement an Avro-to-Protobuf transformation inside Spark Structured Streaming code that reads data from a Kafka topic.

The incoming DataFrame, read like this:

    readStreamFromKafka(config).writeStream
      .foreachBatch { (kafkaBatch: DataFrame, batchId: Long) =>
        kafkaBatch.printSchema()

has the following schema:

  root
   |-- key: binary (nullable = true)
   |-- value: binary (nullable = true)
   |-- topic: string (nullable = true)
   |-- partition: integer (nullable = true)
   |-- offset: long (nullable = true)
   |-- timestamp: timestamp (nullable = true)
   |-- timestampType: integer (nullable = true)
   |-- headers: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- key: string (nullable = true)
   |    |    |-- value: binary (nullable = true)
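For context, readStreamFromKafka itself isn't shown, but the schema above is exactly what Spark's Kafka source produces (the headers column appears when includeHeaders is set), so it is presumably a thin wrapper along these lines. A minimal sketch only; the KafkaConfig type and the broker address are hypothetical, and the topic name is taken from the logical plan further down:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Hypothetical config carrier; the real `config` type isn't shown in the question.
    final case class KafkaConfig(bootstrapServers: String)

    def readStreamFromKafka(config: KafkaConfig)(implicit spark: SparkSession): DataFrame =
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", config.bootstrapServers) // e.g. "host:9092"
        .option("subscribe", "bg-intermediate")   // topic name from the logical plan below
        .option("includeHeaders", "true")         // the headers column above implies this
        .load()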

kafkaBatch.show(truncate = false) returns a non-null "value":

    +----+---------------------+
    |key |value                |
    +----+---------------------+
    |null|[C0 E2 4F .... E2 4F]|
    +----+---------------------+
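(For anyone reproducing this: a quick way to eyeball the raw payload inside foreachBatch is to render the binary column as a hex string; hex comes from org.apache.spark.sql.functions. A debugging-only sketch:)

    import org.apache.spark.sql.functions.{col, hex}

    // Render the binary Kafka payload as hex, just to confirm the bytes are there.
    kafkaBatch.select(hex(col("value")).as("value_hex")).show(truncate = false)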

The logical plan is:

    StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ad7b8e9, KafkaV2[Subscribe[bg-intermediate]]

But my code:

    kafkaBatch
      .select(
        from_avro(
          col("value"),
          readSchemaFromResource("/schemas/avro/schema1.avsc")
        ).as("value")
      )
      .select(
        to_protobuf(
          col("value"),
          "extractor.api.grpc.face.v1.BestShotSource",
          "/schemas/proto/schema1.proto"
        ).as("new_value")
      )
      .write
      .format("kafka")
      ....

fails with:

AnalysisException: Required attribute 'value' not found
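The only lead I have: as far as I understand, Spark's Kafka sink resolves output columns by name and requires one literally called value (key and topic are optional), while my final projection aliases it as new_value. If that is what the analyzer is rejecting, keeping the alias as value should satisfy it; a sketch of that variant, otherwise unchanged:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.avro.functions.from_avro
    import org.apache.spark.sql.protobuf.functions.to_protobuf

    kafkaBatch
      .select(
        from_avro(
          col("value"),
          readSchemaFromResource("/schemas/avro/schema1.avsc")
        ).as("value")
      )
      .select(
        to_protobuf(
          col("value"),
          "extractor.api.grpc.face.v1.BestShotSource",
          "/schemas/proto/schema1.proto"
        ).as("value") // was .as("new_value"); the Kafka sink looks up "value" by name
      )
      .write
      .format("kafka")
      ....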

What am I doing wrong?
