
Efficient (and Safe) Way of Accessing Larger-than-Memory Datasets in Parallel


I am trying to use polars~=1.24.0 on Python 3.13 to process larger-than-memory datasets. Specifically, I am loading many (35 of them) parquet files via the polars.scan_parquet('base-name-*.parquet') API to create a LazyFrame.

In my use case, I need to batch-process this dataset and ingest it into a remote database backend. I am using concurrent.futures.ThreadPoolExecutor to spawn multiple threads that do the batch processing.

What is the best way to cover the whole dataset while making sure that the workers do not cause OOM issues on my local machine?

I have thought of a kernel (to be passed to the ThreadPoolExecutor) of the form:

def parse(df: pl.LazyFrame, rank: int, size: int):
    # Partition the rows evenly across `size` workers; this worker handles
    # partition `rank`, and the first `remain` workers get one extra row each.
    n_rows = df.select(pl.len()).collect().item()
    split, remain = divmod(n_rows, size)
    start = rank * split + min(rank, remain)
    end = start + split + (rank < remain)

    for row in df.slice(start, end - start).collect(new_streaming=True).iter_rows():
        # do something with the row
        pass
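
For completeness, this is roughly how I dispatch the kernel to the pool (a sketch -- num_workers here is just a placeholder for whatever thread count I end up choosing):

from concurrent.futures import ThreadPoolExecutor

import polars as pl

num_workers = 8  # placeholder thread count
lf = pl.scan_parquet("base-name-*.parquet")

with ThreadPoolExecutor(max_workers=num_workers) as pool:
    futures = [pool.submit(parse, lf, rank, num_workers) for rank in range(num_workers)]
    for fut in futures:
        fut.result()  # surface any exception raised inside a worker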

but I have the following questions:

  1. Is it safe to pass df: pl.LazyFrame to other threads like this?
  2. Is it safe to access the dataframe/LazyFrame (read-only) via .slice and still cover all the rows?
  3. .collect still loads (or seems to load, judging by htop) the whole chunk into memory -- is there a more efficient/correct way to stream the rows while keeping the "iterator" interface? Or should I further slice / mini-batch the range [start, end)?

I am new to polars and would love to get some suggestions/help with the above use case.

asked Mar 13 at 12:48 by Arda Aytekin · edited Mar 17 at 18:32 by jqurious

1 Answer


This is probably not a good job for polars until/unless they incorporate this feature. I think you'd be better off using pyarrow and iterating over files and row groups. If you want to parallelize in Python, use multiprocessing with the spawn start method; don't use fork.

Here's a serial version of iterating with pyarrow. You could multiprocess over the files but not the inner row groups.

from pathlib import Path

from pyarrow import parquet as pq
import polars as pl

rootpath = Path(".")
files = rootpath.glob("*.parquet")  # only pick up parquet files

for fl in files:
    pqfile = pq.ParquetFile(fl)
    for rg_i in range(pqfile.num_row_groups):
        # Read one row group at a time so only that chunk is held in memory.
        rg_table = pqfile.read_row_group(rg_i)
        df = pl.from_arrow(rg_table)  # optional conversion to a polars DataFrame
        # do whatever with the pyarrow table or df
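
To parallelize over files as suggested, a minimal sketch using multiprocessing with the spawn start method could look like the following; process_file and the worker count of 4 are illustrative choices, not part of the code above.

from multiprocessing import get_context
from pathlib import Path

from pyarrow import parquet as pq
import polars as pl


def process_file(path: Path) -> None:
    # Each worker handles one file, one row group at a time.
    pqfile = pq.ParquetFile(path)
    for rg_i in range(pqfile.num_row_groups):
        df = pl.from_arrow(pqfile.read_row_group(rg_i))
        # ingest df into the remote database here


if __name__ == "__main__":
    files = sorted(Path(".").glob("*.parquet"))
    ctx = get_context("spawn")  # spawn, not fork, as recommended above
    with ctx.Pool(processes=4) as pool:
        pool.map(process_file, files)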
        