I am working with large datasets stored in Parquet files and need to perform an upsert (update + insert) operation using Polars. If the files grow to a couple of GBs, I run into memory issues and the update operation fails. My system has 16 GB of RAM.
Here’s a simplified example where I generate a large dataset and a smaller dataset for updating:
import polars as pl

def generate_data(groups, nids, ncols, f=1.0):
    symbols = pl.LazyFrame({'group': groups})
    ids = pl.LazyFrame({'id': pl.arange(nids, dtype=pl.Int64, eager=True)})
    cols_expr = [pl.lit(i * f, dtype=pl.Float64).alias(f'val_{i}') for i in range(1, ncols + 1)]
    return symbols.join(ids, how='cross').with_columns(cols_expr).collect()
# Generate large dataset
df_old = generate_data(groups=list('ABCDEFGHIJKLMNOPQRSTUVWXYZ'), nids=10**7, ncols=4)
print(f'df_old: {round(df_old.estimated_size()/10**9, 3)} GB')
# df_old: 10.66 GB
# Generate relatively small dataset update
df_new = generate_data(groups=['A', 'D', 'XYZ'], nids=10**4, ncols=4, f=10.)
print(f'df_new: {round(df_new.estimated_size()/10**9, 3)} GB')
# df_new: 0.001 GB
# Update fails probably due to memory issues
df = df_old.update(df_new, on=['group', 'id'], how='full').sort(['group', 'id'])
print(df)
# The kernel died, restarting...
# Polars version 1.17.1
The above code works with smaller datasets, but when the data size increases (e.g., df_old being 10 GB), I encounter kernel crashes.
What is the most memory-efficient way to perform an upsert on large datasets using Polars? Are there strategies to avoid memory issues while updating large datasets?
There are different ways of replicating "update" functionality with joins. I've found that sometimes splitting the job into three parts works best:
# Rows whose keys exist in both frames, keeping the updated values from df_new
df1 = (
    df_new
    .join(df_old, on=["group", "id"], how="inner")
    .select(df_new.columns)
)

# Rows that only exist in df_new (the inserts)
df2 = df_new.join(df_old, on=["group", "id"], how="anti")

# Rows that only exist in df_old (left untouched)
df3 = df_old.join(df_new, on=["group", "id"], how="anti")

df_all = pl.concat([df1, df2, df3])
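Since the data ultimately lives in Parquet files, one way to keep memory in check is to keep the whole pipeline lazy, so Polars can stream from disk and write the result back without materializing either frame. Below is a minimal sketch of the same idea (df1 and df2 collapse into "all rows of the new data"). The file names old.parquet, new.parquet and upserted.parquet are placeholders, both files are assumed to share the same schema, and, unlike .update (which skips nulls by default), new rows here replace matching old rows wholesale:

import polars as pl

keys = ['group', 'id']

# Lazily scan both Parquet files; nothing is read into memory yet.
lf_old = pl.scan_parquet('old.parquet')
lf_new = pl.scan_parquet('new.parquet')

upsert = pl.concat([
    # all rows from the new data: the updates plus the inserts
    lf_new,
    # plus the old rows whose keys do not appear in the new data
    lf_old.join(lf_new, on=keys, how='anti'),
])

# Stream the result straight to disk instead of collecting it into RAM.
upsert.sink_parquet('upserted.parquet')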
I've also found that DuckDB sometimes handles large dataframes better than Polars, unless you put some effort into rewriting the Polars code. You can use it like this:
import duckdb

df_both = duckdb.sql("""
    select
        coalesce(n."group", o."group") as "group",
        coalesce(n.id, o.id) as id,
        -- new values take precedence; fall back to the old value when the key
        -- only exists in df_old (or the new value is null)
        coalesce(n.val_1, o.val_1) as val_1,
        coalesce(n.val_2, o.val_2) as val_2,
        coalesce(n.val_3, o.val_3) as val_3,
        coalesce(n.val_4, o.val_4) as val_4
    from df_old as o
    full join df_new as n
        on n."group" = o."group"
        and n.id = o.id
""").pl()
Comment from jqurious: the new streaming engine is in the 1.20.0 release; set POLARS_AUTO_NEW_STREAMING=1 in your environment to test it out. (It's still a WIP though.)
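Building on that comment, here is a minimal sketch of how one might try the new streaming engine, assuming Polars >= 1.20.0 and the same placeholder Parquet files as above:

import os
# Set the flag early, before polars is imported, so it is picked up for sure.
os.environ['POLARS_AUTO_NEW_STREAMING'] = '1'

import polars as pl

keys = ['group', 'id']
lf_old = pl.scan_parquet('old.parquet')
lf_new = pl.scan_parquet('new.parquet')

# Same lazy upsert as in the sketch above; with the flag set, Polars should
# run the query on the new streaming engine when it is executed.
pl.concat([
    lf_new,
    lf_old.join(lf_new, on=keys, how='anti'),
]).sink_parquet('upserted.parquet')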