
python - How to efficiently upsert (update+insert) large datasets with Polars - Stack Overflow


I am working with large datasets stored in Parquet files and need to perform an upsert (update + insert) operation using Polars. If the files grow to a couple of GBs, I run into memory issues and the update operation fails. My system has 16 GB of RAM.

Here’s a simplified example where I generate a large dataset and a smaller dataset for updating:

import polars as pl

def generate_data(groups, nids, ncols, f=1.0):
    symbols = pl.LazyFrame({'group': groups})
    ids = pl.LazyFrame({'id': pl.arange(nids, dtype=pl.Int64, eager=True)})
    cols_expr = [pl.lit(i*f, dtype=pl.Float64).alias(f'val_{i}') for i in range(1, ncols+1)]
    return symbols.join(ids, how='cross').with_columns(cols_expr).collect()


# Generate large dataset
df_old = generate_data(groups=list('ABCDEFGHIJKLMNOPQRSTUVWXYZ'), nids=10**7, ncols=4)
print(f'df_old: {round(df_old.estimated_size()/10**9, 3)} GB')
# df_old: 10.66 GB

# Generate relatively small dataset update
df_new = generate_data(groups=['A', 'D', 'XYZ'], nids=10**4, ncols=4, f=10.)
print(f'df_new: {round(df_new.estimated_size()/10**9, 3)} GB')
# df_new: 0.001 GB

# Update fails probably due to memory issues
df = df_old.update(df_new, on=['group', 'id'], how='full').sort(['group', 'id'])
print(df)
# The kernel died, restarting...

# Polars version 1.17.1

The above code works with smaller datasets, but when the data size increases (e.g., df_old being 10 GB), I encounter kernel crashes.

What is the most memory-efficient way to perform an upsert on large datasets using Polars? Are there strategies to avoid memory issues while updating large datasets?


asked Jan 18 at 0:25 by Olibarer
  • Polars is going to soon replace its streaming engine with new-streaming - You could try the latest 1.20.0 release with POLARS_AUTO_NEW_STREAMING=1 set in your environment to test it out. (It's still a WIP though.) – jqurious Commented Jan 18 at 18:54
  • @jqurious thank you very much. sink_parquet together with the new streaming engine (still in alpha) should do the job in future. I'm using Polars in a conda environment where Polars is still v.1.17.1. I assume it will take a couple of weeks until this feature is available for me. – Olibarer Commented Jan 18 at 22:24
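
For reference, here is a minimal sketch of the streaming approach suggested in the comments above. It assumes df_old and df_new are already stored as Parquet files (the file names below are hypothetical) and a Polars release where sink_parquet and the new streaming engine are available; the final sort from the question is omitted:

import polars as pl

# Run with POLARS_AUTO_NEW_STREAMING=1 set in the environment to opt into the
# new (still experimental) streaming engine, as suggested in the comments.
lf_old = pl.scan_parquet('df_old.parquet')  # lazy scan, nothing is read into memory yet
lf_new = pl.scan_parquet('df_new.parquet')

(
    lf_old
    .update(lf_new, on=['group', 'id'], how='full')
    .sink_parquet('df_upserted.parquet')  # stream the result to disk instead of collecting it
)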

1 Answer


There are different ways of replicating "update" functionality with joins; I've found that sometimes splitting the job into three parts works best:

# Rows present in both frames: keep the values from df_new
df1 = (
    df_new
    .join(df_old, on=["group", "id"], how="inner")
    .select(df_new.columns)
)

# Rows only in df_new: these are the plain inserts
df2 = (
    df_new
    .join(df_old, on=["group", "id"], how="anti")
)

# Rows in df_old that are not touched by the update
df3 = (
    df_old
    .join(df_new, on=["group", "id"], how="anti")
)

df_all = pl.concat([df1, df2, df3])
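
If even the intermediate frames are too large to hold in memory, the same three-way split can be kept lazy end to end and streamed from and back to Parquet. This is only a rough sketch under assumed (hypothetical) file names and a Polars version with streaming sinks:

import polars as pl

keys = ["group", "id"]
lf_old = pl.scan_parquet("df_old.parquet")
lf_new = pl.scan_parquet("df_new.parquet")
new_cols = lf_new.collect_schema().names()

# Rows present in both: keep the new values
updated = lf_new.join(lf_old, on=keys, how="inner").select(new_cols)
# Rows only in the update: plain inserts
inserted = lf_new.join(lf_old, on=keys, how="anti")
# Rows in the old data untouched by the update
unchanged = lf_old.join(lf_new, on=keys, how="anti")

pl.concat([updated, inserted, unchanged]).sink_parquet("df_upserted.parquet")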

I've also found that DuckDB sometimes handles large dataframes better than Polars, unless you put some effort into rewriting the Polars code. You can use it like this:

import duckdb

df_both = duckdb.sql("""
    select
        -- keys: take whichever side is present
        coalesce(o.group, n.group) as group,
        coalesce(o.id, n.id) as id,
        -- values: let the new data win for rows present in both
        coalesce(n.val_1, o.val_1) as val_1,
        coalesce(n.val_2, o.val_2) as val_2,
        coalesce(n.val_3, o.val_3) as val_3,
        coalesce(n.val_4, o.val_4) as val_4
    from df_old as o
        full join df_new as n on
            n.group = o.group and
            n.id = o.id
""").pl()
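
DuckDB can also scan the Parquet files directly and write the result straight back to Parquet, so neither the inputs nor the roughly 10 GB output ever has to be materialised in Python. A rough sketch along those lines, with hypothetical file names:

import duckdb

# Same upsert expressed against the Parquet files themselves; the result is
# copied to a new Parquet file instead of being pulled into a DataFrame.
duckdb.execute("""
    copy (
        select
            coalesce(o.group, n.group) as group,
            coalesce(o.id, n.id) as id,
            coalesce(n.val_1, o.val_1) as val_1,
            coalesce(n.val_2, o.val_2) as val_2,
            coalesce(n.val_3, o.val_3) as val_3,
            coalesce(n.val_4, o.val_4) as val_4
        from read_parquet('df_old.parquet') as o
            full join read_parquet('df_new.parquet') as n on
                n.group = o.group and
                n.id = o.id
    ) to 'df_upserted.parquet' (format parquet)
""")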