The Python script below reads JSON data from a Kafka topic using Spark Structured Streaming. Any action on this dataframe after writeStream fails, and spark3-connect.log shows:
ERROR MicroBatchExecution: Query test [id = d3856d89-78f2-41e7-84f1-635c1ad1b831, runId = 4e301d2e-56de-47ab-afa2-1c031de5fe53] terminated with error java.lang.NoClassDefFoundError: scala/Serializable
What could be the reason for this error?
- Spark 3.5.2
- openjdk version "1.8.0_442"
- Scala is not installed separately (a way to check the Scala version bundled with Spark is sketched below)
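For reference, a minimal sketch of one way to check which Scala version the Spark distribution was built with. It assumes a classic local PySpark session is available (not Spark Connect, which exposes no JVM gateway) and relies on the private _jvm attribute, so it is a diagnostic hack rather than a supported API:

from pyspark.sql import SparkSession

# Throwaway local (non-Connect) session, used only to reach the driver JVM.
spark = SparkSession.builder.master("local[1]").getOrCreate()
# Prints e.g. "version 2.12.18" for a Spark build compiled against Scala 2.12.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
spark.stop()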
Here is the example:
from pyspark.sql.types import StructType, StringType, StructField, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
if __name__ == "__main__":
    spark = SparkSession.builder.remote("sc://hadoop:15002") \
        .appName('SparkKafkaJsonToAvro') \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.2") \
        .getOrCreate()
    json_schema = StructType([
        StructField("airline", StringType()),
        StructField("destination", StructType([
            StructField("airport", StringType()),
            StructField("city", StringType()),
            StructField("country", StringType()),
            StructField("iata", StringType()),
            StructField("icao", StringType()),
            StructField("state", StringType())
        ])),
        StructField("origin", StructType([
            StructField("airport", StringType()),
            StructField("city", StringType()),
            StructField("country", StringType()),
            StructField("iata", StringType()),
            StructField("icao", StringType()),
            StructField("state", StringType())
        ])),
        StructField("price", IntegerType()),
        StructField("stops", IntegerType())
    ])
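    # Hypothetical sample of a message on the topic, matching the schema above
    # (field values are illustrative only):
    # {"airline": "KL", "price": 199, "stops": 0,
    #  "origin": {"airport": "Schiphol", "city": "Amsterdam", "country": "Netherlands",
    #             "iata": "AMS", "icao": "EHAM", "state": ""},
    #  "destination": {"airport": "John F. Kennedy", "city": "New York", "country": "United States",
    #                  "iata": "JFK", "icao": "KJFK", "state": "NY"}}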
    df = spark.readStream.format('kafka') \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("startingOffsets", "earliest") \
        .option("subscribe", "flights-json") \
        .load()
    df_json = df.select(from_json(col("value").cast("string"), json_schema).alias("value"))
    query = df_json.writeStream.format("console").outputMode("append").start()
    print(query.status)
    query.awaitTermination()
And here is its output in PyCharm:
{'message': 'Terminated with exception: scala/Serializable', 'isDataAvailable': False, 'isTriggerActive': False}
pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 9b9938a6-90b9-44f8-9bb8-de9b0ebe433e, runId = c8a7128e-a770-40f2-9801-ef635795bb39] terminated with exception: scala/Serializable
I tried changing the writeStream format to kafka and to memory, but query.status and awaitTermination() return the same error. The kafka variant continued from the script above and looked roughly like the sketch below (the output topic name and checkpoint path are placeholders):
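query = df_json.selectExpr("to_json(value) AS value") \
    .writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "flights-out") \
    .option("checkpointLocation", "/tmp/checkpoints/flights") \
    .start()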
I expect the data from the source topic to be written to the console output or to another Kafka topic.