
python - pyspark structured streaming: awaitTermination() causes java.lang.NoClassDefFoundError: scala/Serializable - Stack Overflow


The Python script below reads JSON data from a Kafka topic using Spark Structured Streaming. Any action on this dataframe after writeStream causes the following error in spark3-connect.log:

ERROR MicroBatchExecution: Query test [id = d3856d89-78f2-41e7-84f1-635c1ad1b831, runId = 4e301d2e-56de-47ab-afa2-1c031de5fe53] terminated with error java.lang.NoClassDefFoundError: scala/Serializable

What can be the reason for this error?

  • Spark 3.5.2
  • openjdk version "1.8.0_442"
  • Scala is not installed separately
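For context, java.lang.NoClassDefFoundError: scala/Serializable typically appears when jars built for different Scala binary versions are mixed: scala.Serializable is a real class in Scala 2.12 but only a deprecated type alias in 2.13, so a connector built for one binary version cannot find it on a runtime built for the other. A tiny helper, purely illustrative (kafka_package is a made-up name, not part of any API), for building the connector coordinate that matches a given Scala binary version:

```python
# Illustrative helper (not part of the question's code): build the Maven
# coordinate of the Kafka connector for a given Scala binary version.
# Note: the pre-built Spark 3.5.x distributions ship with Scala 2.12 by default.
def kafka_package(spark_version: str, scala_binary: str) -> str:
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary}:{spark_version}"

print(kafka_package("3.5.2", "2.12"))
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2
```

The coordinate printed here differs from the one in the script below only in the Scala suffix (2.12 vs 2.13), which is exactly the kind of detail worth double-checking against the Spark build in use.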

Here is the example:

from pyspark.sql.types import StructType, StringType, StructField, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.connect.functions import col, from_json

if __name__ == "__main__":
    spark = SparkSession.builder.remote("sc://hadoop:15002") \
        .appName('SparkKafkaJsonToAvro') \
        .config("packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.2") \
        .getOrCreate()

    json_schema = StructType([
        StructField("airline", StringType()),
        StructField("destination", StructType([
            StructField("airport", StringType()),
            StructField("city", StringType()),
            StructField("country", StringType()),
            StructField("iata", StringType()),
            StructField("icao", StringType()),
            StructField("state", StringType())
        ])),
        StructField("origin", StructType([
            StructField("airport", StringType()),
            StructField("city", StringType()),
            StructField("country", StringType()),
            StructField("iata", StringType()),
            StructField("icao", StringType()),
            StructField("state", StringType())
        ])),
        StructField("price", IntegerType()),
        StructField("stops", IntegerType())
        ])

    df = spark.readStream.format('kafka') \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("startingOffsets", "earliest") \
        .option("subscribe", "flights-json") \
        .load()

    df_json = df.select(from_json(col("value").cast("string"), json_schema).alias("value"))

    query = df_json.writeStream.format("console").outputMode("append").start()
    print(query.status)
    query.awaitTermination()

And this is its output in PyCharm:

{'message': 'Terminated with exception: scala/Serializable', 'isDataAvailable': False, 'isTriggerActive': False}
pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 9b9938a6-90b9-44f8-9bb8-de9b0ebe433e, runId = c8a7128e-a770-40f2-9801-ef635795bb39] terminated with exception: scala/Serializable

I tried changing the writeStream format to kafka and to memory, but status and the awaitTermination() method still return the error.

I expect the data from the source topic to be written to the console output or to another Kafka topic.
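For reference, this is roughly what one parsed record should look like under json_schema above, using only the standard json module (the field values are invented for illustration):

```python
# Illustrative only: a hand-made sample record matching json_schema,
# parsed with the stdlib json module to show the expected row shape.
import json

sample = '''{"airline": "BA", "price": 320, "stops": 1,
 "origin": {"airport": "Heathrow", "city": "London", "country": "UK",
            "iata": "LHR", "icao": "EGLL", "state": ""},
 "destination": {"airport": "JFK", "city": "New York", "country": "US",
                 "iata": "JFK", "icao": "KJFK", "state": "NY"}}'''

rec = json.loads(sample)
print(rec["airline"], rec["origin"]["iata"], "->", rec["destination"]["iata"], rec["price"])
# BA LHR -> JFK 320
```

Records shaped like this are what the console sink should print once the query runs without the classpath error.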
