I'm ingesting a large zip file in Azure Databricks (a 345 GB zip file containing a single 1.5 TB CSV with ~3 billion rows). The intention is to convert the CSV into a Delta table for faster ingestion in a data pipeline. Both are stored in Azure Blob Storage.
I've included the whole code block below, but the gist of my process is:
- instantiate a Spark session
- open the CSV inside the zip file as an iterator using pd.read_csv(chunksize=CHUNKSIZE)
- go through each chunk in the iterator; for each chunk:
  - convert the chunk's pandas DataFrame into a Spark DataFrame
  - write/append to the Delta table in Azure Blob Storage
  - clear memory
With CHUNKSIZE=5_000_000, I get this error at iteration 83 (415 million rows processed):
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 42776:450 was 282494392 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
After reducing the chunk size to CHUNKSIZE=3_000_000, I still get the same error, but at a later iteration (139, i.e. ~417 million rows processed, so it seems to happen after a certain number of rows):
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 43293:250 was 282494392 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
I understand that reading directly into a Spark DataFrame would be ideal, but I haven't been able to find a way to read a zip file in Azure Blob Storage directly into one, so I'm definitely open to suggestions on this front too.
My cluster config:
- 2 workers: Standard_DS3_v2 (14 GB memory, 4 cores each)
- Driver: Standard_DS13_v2 (56 GB memory, 8 cores)
Things I've tried:
- Setting spark.rpc.message.maxSize, as the error message suggests, using SparkSession.builder.config("spark.rpc.message.maxSize", "512"), assuming the unit is MiB (which the Spark config docs seem to confirm). Databricks also suggested SparkSession.builder.config("spark.rpc.message.maxSize", "536870912") in case the unit is somehow bytes. Both still give me the same error, even though printing spark.conf.get("spark.rpc.message.maxSize") shows the new setting (a minimal sketch of how I set and check it follows this list).
- Setting it after the Spark session has been instantiated, using spark.conf.set("spark.rpc.message.maxSize", "512"), which gives an error saying the parameter can't be changed after Spark has been instantiated.
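For reference, this is roughly how I set and verify the value (a minimal sketch; I also check it through the SparkContext's SparkConf in case the session-level value isn't what the cluster is actually using):

from pyspark.sql import SparkSession

# Attempt: set the limit before getOrCreate(), assuming the unit is MiB
spark = (
    SparkSession.builder
    .config("spark.rpc.message.maxSize", "512")
    .getOrCreate()
)

# Sanity checks: the first line prints the new value for me,
# yet the job still fails with the default 268435456-byte limit.
print(spark.conf.get("spark.rpc.message.maxSize"))
print(spark.sparkContext.getConf().get("spark.rpc.message.maxSize", "not set"))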
Entire code block:
def convert_zip_to_delta(snapshot_date: str, start_chunk: int = 0):
    # File paths
    zip_file = f"{snapshot_date}.zip"
    delta_file = f"{snapshot_date}_delta"
    delta_table_path = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{delta_file}/"

    spark = (
        SparkSession.builder.config("spark.sql.shuffle.partitions", "100")
        .config("spark.hadoop.fs.azure.retries", "10")
        .config("spark.rpc.message.maxSize", "536870912")  # 512 MiB in bytes
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )
    spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", BLOB_CREDENTIAL)
    print("**** spark.rpc.message.maxSize = ", spark.conf.get("spark.rpc.message.maxSize"))

    if start_chunk == 0:
        # Delete existing delta table if it exists
        print("**** Deleting existing delta table")
        if fs.exists(f"{CONTAINER_NAME}/{delta_file}"):
            fs.rm(f"{CONTAINER_NAME}/{delta_file}", recursive=True)

    chunksize = 3_000_000
    with fs.open(f"{CONTAINER_NAME}/{zip_file}", "rb") as file:
        with zipfile.ZipFile(file, "r") as zip_ref:
            file_name = zip_ref.namelist()[0]
            with zip_ref.open(file_name) as csv_file:
                csv_io = TextIOWrapper(csv_file, "utf-8")
                headers = pd.read_csv(csv_io, sep="\t", nrows=0).columns.tolist()
                chunk_iter = pd.read_csv(
                    csv_io,
                    sep="\t",
                    header=None,
                    names=headers,
                    usecols=["col1", "col2", "col3"],
                    dtype=str,  # Read all as strings to avoid errors
                    chunksize=chunksize,
                    skiprows=start_chunk * chunksize,
                )
                for chunk in tqdm(chunk_iter, desc="Processing chunks"):
                    # Convert pandas DataFrame to Spark DataFrame
                    spark_df = spark.createDataFrame(chunk)
                    (
                        spark_df.repartition(8)
                        .write.format("delta")
                        .mode("append")
                        .option("mergeSchema", "true")
                        .save(delta_table_path)
                    )
                    # Clear memory after each iteration
                    spark_df.unpersist(blocking=True)
                    del chunk
                    del spark_df
                    gc.collect()
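For completeness, the function assumes some module-level setup that isn't shown above. Roughly, it looks like this (the names are placeholders for my actual values, and fs is an fsspec filesystem for the blob container, sketched here with adlfs):

import gc
import zipfile
from io import TextIOWrapper

import pandas as pd
from adlfs import AzureBlobFileSystem  # assumption: fs is an adlfs/fsspec filesystem
from pyspark.sql import SparkSession
from tqdm import tqdm

CONTAINER_NAME = "my-container"            # placeholder
STORAGE_ACCOUNT_NAME = "mystorageaccount"  # placeholder
BLOB_CREDENTIAL = "<storage-account-key>"  # placeholder

fs = AzureBlobFileSystem(
    account_name=STORAGE_ACCOUNT_NAME,
    account_key=BLOB_CREDENTIAL,
)

convert_zip_to_delta("2024-01-01", start_chunk=0)  # example invocation with a placeholder snapshot date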
1 Answer
If this is a one-time operation, why not unzip into a temporary location and try Spark's read.csv method? Another advantage is that you could then split it into smaller chunks.
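A rough sketch of what I have in mind (the paths are placeholders, and I'm assuming you have somewhere with enough space, such as DBFS or the same blob container, to hold the uncompressed CSV):

# Sketch only: assumes the CSV has already been unzipped to a location Spark can
# read directly (e.g. DBFS or the same blob container); all paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

csv_path = "dbfs:/tmp/snapshot/unzipped.csv"  # placeholder for the extracted file
delta_path = "wasbs://<container>@<account>.blob.core.windows.net/snapshot_delta/"

df = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv(csv_path)
    .select("col1", "col2", "col3")
)

# Executors read the uncompressed file in parallel splits, so nothing large is
# serialized from the driver and the RPC message size limit shouldn't be hit.
df.write.format("delta").mode("overwrite").save(delta_path)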