I'm trying to implement an Avro-to-protobuf transformation inside Spark Structured Streaming code that reads data from a Kafka topic.
The incoming DataFrame:

readStreamFromKafka(config).writeStream
  .foreachBatch { (kafkaBatch: DataFrame, batchId: Long) =>
    kafkaBatch.printSchema()

has the following schema:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
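For context, readStreamFromKafka is my own thin helper around the standard Kafka source. Its details shouldn't matter here; a minimal sketch (KafkaConfig and its fields are placeholders for my real config) looks like this:

import org.apache.spark.sql.{DataFrame, SparkSession}

case class KafkaConfig(bootstrapServers: String, topic: String)

def readStreamFromKafka(config: KafkaConfig)(implicit spark: SparkSession): DataFrame =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", config.bootstrapServers)
    .option("subscribe", config.topic) // "bg-intermediate" in my case
    .load()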
kafkaBatch.show(truncate = false) shows a non-null "value" column:
+----+---------------------+
|key |value                |
+----+---------------------+
|null|[C0 E2 4F ... E2 4F] |
+----+---------------------+
The logical plan is:

StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ad7b8e9, KafkaV2[Subscribe[bg-intermediate]]
But my code:

kafkaBatch
  .select(
    from_avro(
      col("value"),
      readSchemaFromResource("/schemas/avro/schema1.avsc")
    ).as("value")
  )
  .select(
    to_protobuf(
      col("value"),
      "extractor.api.grpc.face.v1.BestShotSource",
      "/schemas/proto/schema1.proto"
    ).as("new_value")
  )
  .write
  .format("kafka")
  ....
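For completeness, readSchemaFromResource loads the .avsc file from the classpath as a JSON string (the form from_avro expects); roughly:

def readSchemaFromResource(path: String): String = {
  // read the Avro schema bundled in resources as a plain string
  val stream = getClass.getResourceAsStream(path)
  try scala.io.Source.fromInputStream(stream, "UTF-8").mkString
  finally stream.close()
}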
fails with:
AnalysisException: Required attribute 'value' not found
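In case it helps with diagnosis, the schema right after the from_avro step can be inspected inside the same foreachBatch like this (col comes from org.apache.spark.sql.functions, from_avro from org.apache.spark.sql.avro.functions, to_protobuf from org.apache.spark.sql.protobuf.functions):

kafkaBatch
  .select(
    from_avro(
      col("value"),
      readSchemaFromResource("/schemas/avro/schema1.avsc")
    ).as("value")
  )
  .printSchema()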
What am I doing wrong?