
python - Getting spark.rpc.message.maxSize error with large file - Stack Overflow


I'm ingesting a large zip file in Azure Databricks (a 345 GB zip file containing a single 1.5 TB CSV with ~3 billion rows). The intention is to convert the CSV into a Delta table for faster ingestion in a data pipeline. Both are stored in Azure Blob Storage.

I've included the whole code block below, but the gist of my process is:

  • instantiate a Spark session
  • open the CSV inside the zip file as an iterator using pd.read_csv(chunksize=CHUNKSIZE)
  • go through each chunk in the iterator
  • for each chunk:
    • convert the pandas DataFrame chunk into a Spark DataFrame
    • write/append it to a Delta table in Azure Blob Storage
    • clear memory

With CHUNKSIZE=5_000_000 I get this error at iteration 83 (415 million rows processed):

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 42776:450 was 282494392 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

After reducing the chunk size to CHUNKSIZE=3_000_000 I still get the same error, but at a later iteration (139, i.e. 417 million rows, so it seems to happen after a certain number of rows):

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 43293:250 was 282494392 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

I understand that reading directly into a Spark DataFrame would be ideal, but I haven't found a way to read a zip file in Azure Blob Storage directly into a Spark DataFrame, so I'm definitely open to suggestions on this front too.

My cluster config:

  • 2 workers: Standard_DS3_v2 (14 GB memory, 4 cores each)
  • driver: Standard_DS13_v2 (56 GB memory, 8 cores)

Things I've tried:

  • setting spark.rpc.message.maxSize as suggested by the error message, using SparkSession.builder.config("spark.rpc.message.maxSize", "512"), assuming the unit is MiB (which seems to be the case per the Spark configuration docs). This was also suggested by Databricks
  • SparkSession.builder.config("spark.rpc.message.maxSize", "536870912"), in case the unit is somehow bytes
  • both of these still give me the same error, even though printing spark.conf.get("spark.rpc.message.maxSize") shows the new setting
  • setting it after instantiating the Spark session using spark.conf.set("spark.rpc.message.maxSize", "512"), which gave an error saying the parameter can't be changed after Spark has been instantiated
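A note on why the builder override may not stick (a hedged guess, not confirmed by the poster's logs): in a Databricks notebook a SparkSession already exists, so SparkSession.builder...getOrCreate() returns the pre-existing session, and non-mutable settings passed to the builder can be silently ignored even though spark.conf.get later echoes a value back. A likely fix, assuming a standard Databricks cluster, is to set the option in the cluster's Spark config (Compute > your cluster > Advanced options > Spark) before the cluster starts; the unit of spark.rpc.message.maxSize is MiB:

```
spark.rpc.message.maxSize 512
```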

Entire code block:

def convert_zip_to_delta(snapshot_date: str, start_chunk: int = 0):

    # File paths
    zip_file = f"{snapshot_date}.zip"
    delta_file = f"{snapshot_date}_delta"
    delta_table_path = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{delta_file}/"

    spark = (
        SparkSession.builder.config("spark.sql.shuffle.partitions", "100")
        .config("spark.hadoop.fs.azure.retries", "10")
        .config("spark.rpc.message.maxSize", "536870912")  # 512 MiB in bytes
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

    spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", BLOB_CREDENTIAL)

    print("**** spark.rpc.message.maxSize = ", spark.conf.get("spark.rpc.message.maxSize"))

    if start_chunk == 0:
        # Delete file if exists
        print("**** Deleting existing delta table")
        if fs.exists(f"{CONTAINER_NAME}/{delta_file}"):
            fs.rm(f"{CONTAINER_NAME}/{delta_file}", recursive=True)

    chunksize = 3_000_000

    with fs.open(f"{CONTAINER_NAME}/{zip_file}", "rb") as file:
        with zipfile.ZipFile(file, "r") as zip_ref:
            file_name = zip_ref.namelist()[0]

            with zip_ref.open(file_name) as csv_file:
                csv_io = TextIOWrapper(csv_file, "utf-8")
                headers = pd.read_csv(csv_io, sep="\t", nrows=0).columns.tolist()
                chunk_iter = pd.read_csv(
                    csv_io,
                    sep="\t",
                    header=None,
                    names=headers,
                    usecols=["col1", "col2", "col3"],
                    dtype=str,  # Read all as strings to avoid errors
                    chunksize=chunksize,
                    skiprows=start_chunk*chunksize
                )

                for chunk in tqdm(chunk_iter, desc="Processing chunks"):
                    # Convert pd DataFrame to Spark DataFrame
                    spark_df = spark.createDataFrame(chunk)

                    (spark_df.repartition(8).write
                     .format("delta")
                     .mode("append")
                     .option("mergeSchema", "true")
                     .save(delta_table_path)
                    )

                    # Clear memory after each iteration
                    spark_df.unpersist(blocking=True)
                    del chunk
                    del spark_df
                    gc.collect()
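One possible mitigation inside the loop above (a sketch, not the poster's code): spark.createDataFrame on a local pandas frame embeds the data in the serialized tasks shipped to executors, so splitting each chunk into smaller frames before conversion shrinks each serialized partition. split_chunk and SUBCHUNKS are hypothetical names:

```python
import numpy as np
import pandas as pd

def split_chunk(chunk, n):
    """Split a pandas chunk into n roughly equal frames so that each
    serialized Spark partition stays well under the RPC message limit."""
    # np.array_split splits a DataFrame along rows; drop empty parts
    return [part for part in np.array_split(chunk, n) if len(part)]

# Hypothetical usage inside the ingestion loop:
# SUBCHUNKS = 4  # tune based on row width so parts stay under ~256 MiB
# for sub in split_chunk(chunk, SUBCHUNKS):
#     spark.createDataFrame(sub).write.format("delta") \
#          .mode("append").save(delta_table_path)
```

The right SUBCHUNKS value depends on row width, so it would need tuning against the actual data.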

asked Mar 16 at 23:57 by yellowsubmarine, edited Mar 17 at 0:00

1 Answer


If this is a one-time operation, why not unzip into a temporary location and use Spark's read.csv method?
Another advantage is that you could then split the file into smaller chunks.
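A minimal sketch of that approach (paths are illustrative, and the commented Spark lines assume an active Databricks session with enough local scratch space for the extracted file): the zip member is streamed to disk so the 1.5 TB CSV never has to fit in memory, and Spark then reads the extracted file directly, bypassing the driver-side createDataFrame path that triggers the spark.rpc.message.maxSize error.

```python
import shutil
import zipfile

def unzip_member(zip_path, dest_dir):
    """Stream the first member of a zip archive to dest_dir without
    loading it into memory, and return the extracted file's path."""
    with zipfile.ZipFile(zip_path) as zf:
        member = zf.namelist()[0]
        out_path = f"{dest_dir}/{member}"
        with zf.open(member) as src, open(out_path, "wb") as dst:
            # 16 MiB copy buffer keeps memory usage flat
            shutil.copyfileobj(src, dst, length=16 * 1024 * 1024)
    return out_path

# Hypothetical follow-up on Databricks, reading straight into Spark
# and writing Delta in one pass:
# csv_path = unzip_member("/tmp/snapshot.zip", "/tmp")
# df = spark.read.csv(f"file:{csv_path}", sep="\t", header=True)
# df.write.format("delta").mode("overwrite").save(delta_table_path)
```

Extraction is sequential, so it adds one full read/write of the archive, but the subsequent Spark read is fully parallel.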
