I am trying to use polars~=1.24.0 on Python 3.13 to process larger-than-memory datasets. Specifically, I am loading many (35) parquet files via the polars.scan_parquet('base-name-*.parquet') API to create a LazyFrame.
In my use case, I need to batch process this dataset and ingest it into a remote database backend. I am using concurrent.futures.ThreadPoolExecutor to spawn multiple threads that do the batch processing.
What is the best way to cover the whole dataset while making sure the workers do not cause OOM issues on my local machine?
I have thought of a kernel (to be passed to the ThreadPoolExecutor) of the form:
def parse(df: pl.LazyFrame, rank: int, size: int):
    # evenly partition the rows across `size` workers; this worker handles chunk `rank`
    n_rows = df.select(pl.len()).collect().item()
    split, remain = divmod(n_rows, size)
    start = rank * split + min(rank, remain)
    end = start + split + (rank < remain)
    for row in df.slice(start, end - start).collect(new_streaming=True).iter_rows():
        # do something with the row
        pass
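For context, the driver I have in mind looks roughly like this (n_workers and the file pattern are placeholders):

from concurrent.futures import ThreadPoolExecutor
import polars as pl

n_workers = 8  # placeholder
lf = pl.scan_parquet("base-name-*.parquet")

with ThreadPoolExecutor(max_workers=n_workers) as pool:
    # fan the kernel out over the ranks; each worker covers one contiguous chunk
    futures = [pool.submit(parse, lf, rank, n_workers) for rank in range(n_workers)]
    for fut in futures:
        fut.result()  # re-raise any worker exception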
but I have the following questions:
- Is it safe to pass df: pl.LazyFrame to other threads like this?
- Is it safe to (read-only) access the dataframe/lazyframe via .slice and cover all the rows?
- .collect still loads (or seems to load, in htop) the whole chunk into memory -- is there a more efficient/correct way to stream the rows while still being able to use the "iterator" interface? Or should I further slice / mini-batch the range [start, end)? (A rough sketch of what I mean is after this list.)
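To make that last point concrete, this is the kind of further mini-batching I am considering (batch_size is an arbitrary placeholder), though I am not sure it avoids re-reading the parquet files for every slice:

def parse_minibatched(df: pl.LazyFrame, rank: int, size: int, batch_size: int = 10_000):
    # same even partition as above, but collect the worker's range in smaller slices
    n_rows = df.select(pl.len()).collect().item()
    split, remain = divmod(n_rows, size)
    start = rank * split + min(rank, remain)
    end = start + split + (rank < remain)
    for offset in range(start, end, batch_size):
        batch = df.slice(offset, min(batch_size, end - offset)).collect(new_streaming=True)
        for row in batch.iter_rows():
            # do something with the row
            pass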
I am new to polars and would love to get some suggestions/help with the above use case.
1 Answer
This is probably not a good job for polars until/unless they incorporate this feature. I think you'd be better off using pyarrow and iterating over files and row groups. If you want to use Python to parallelize, use multiprocessing with spawn mode, don't use fork.
Here's a serial version of iterating with pyarrow. You could multiprocess over the files but not the inner row groups.
from pathlib import Path
from pyarrow import parquet as pq
import polars as pl
rootpath = Path(".")
files = rootpath.iterdir()
for fl in files:
    pqfile = pq.ParquetFile(fl)
    num_rgs = pqfile.num_row_groups
    for rg_i in range(num_rgs):
        rg_table = pqfile.read_row_group(rg_i)
        df = pl.from_arrow(rg_table)  # optional
        ## do whatever with pyarrow table or df
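If you do want to parallelize this, something along these lines should work: spawn-mode multiprocessing over the files, one worker per file (process_file and the worker count are just placeholders).

from multiprocessing import get_context
from pathlib import Path
from pyarrow import parquet as pq
import polars as pl

def process_file(fl: Path) -> None:
    # each worker handles one whole file, iterating its row groups serially
    pqfile = pq.ParquetFile(fl)
    for rg_i in range(pqfile.num_row_groups):
        df = pl.from_arrow(pqfile.read_row_group(rg_i))  # optional
        ## do whatever with df

if __name__ == "__main__":
    files = list(Path(".").glob("*.parquet"))
    ctx = get_context("spawn")  # spawn mode, not fork
    with ctx.Pool(processes=4) as pool:
        pool.map(process_file, files)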