I have an async service that processes data. My current approach is to process the folds created by TimeSeriesSplit in parallel. Since this is a CPU-heavy task, I decided to use concurrent.futures.ProcessPoolExecutor, where each worker loads its own fold (using the list of corresponding indices passed as arguments to the blocking function process_single_fold) and then performs CPU-heavy operations on that data. Whenever a fold is processed, I store it in the database, iterating over asyncio.as_completed(futures).
However, I noticed that when I increase max_workers in the ProcessPoolExecutor (e.g. to 5), my code sometimes hangs, I guess depending on the load on my machine. In that case, even though I see log messages saying the processing finished, I still see that some workers are 'running'.
with ProcessPoolExecutor(max_workers=min(config.n_splits, os.cpu_count())) as pool:
    loop = asyncio.get_running_loop()
    table_created_flag = asyncio.Event()
    futures = [
        loop.run_in_executor(
            pool,
            process_single_fold,
            ctx._settings.SRCDB_URL,
            fold,
            train_index,
            test_index,
            config,
        )
        for fold, (train_index, test_index) in enumerate(splits)
    ]
    table_cache = {}
    for fold, future in enumerate(asyncio.as_completed(futures)):
        await store_dataframes_incrementally(
            ctx.dst_db.engine,
            future,
            config,
            fold,
            table_created_flag,
            table_cache,
        )
process_single_fold performs I/O: it connects to the database (creating its own SQLAlchemy engine with NullPool) and loads the data for a specific fold. The table flag in the code indicates whether the destination table has been created yet (which happens when the first fold is processed and ready).
My questions: first, is there a flaw in the design of this solution, and why? How do I handle a process getting randomly terminated (possibly by the OS) or hanging — can I kill the process and retry that fold, for example? And second, is this an expected outcome, so that the main thing I need to do is set max_workers wisely?
I am using WSL.
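For context, based on the description above, process_single_fold presumably looks something like this. This is a minimal sketch, not the actual function: the table name, the query, and the config handling are all assumptions; only the per-worker NullPool engine and the (db_url, fold, train_index, test_index, config) signature come from the question.

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def process_single_fold(db_url, fold, train_index, test_index, config=None):
    """Hypothetical sketch of the worker function described in the question."""
    # Each worker process builds its own engine; NullPool avoids carrying
    # pooled connections across the process boundary.
    engine = create_engine(db_url, poolclass=NullPool)
    try:
        # "source_table" is an assumed name; the real query is not shown.
        df = pd.read_sql("SELECT * FROM source_table", engine)
        train, test = df.iloc[train_index], df.iloc[test_index]
        # ... CPU-heavy feature engineering on train/test would go here ...
        return fold, train, test
    finally:
        engine.dispose()
```

The important detail is that the engine is created and disposed inside the worker, so no SQLAlchemy connection state is shared with the parent process.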
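On the "kill and retry" part of the question: one possible pattern is to wrap each run_in_executor call in asyncio.wait_for and retry on timeout. This is a hedged sketch, not the question's actual code — run_fold_with_timeout and slow_square are hypothetical names, and note the caveat in the comments: cancelling the asyncio future does not kill the worker process, so a genuinely stuck worker still needs the pool to be shut down (or a library like pebble that supports per-task termination).

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def slow_square(x):
    # Stand-in for a CPU-heavy per-fold task.
    return x * x

async def run_fold_with_timeout(pool, fn, arg, timeout, retries=1):
    # Retries a fold if it exceeds `timeout`. Caveat: asyncio.wait_for only
    # cancels the asyncio-side future; the worker process keeps running, so
    # this protects the event loop, not the pool itself.
    loop = asyncio.get_running_loop()
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(
                loop.run_in_executor(pool, fn, arg), timeout
            )
        except asyncio.TimeoutError:
            if attempt == retries:
                raise

async def main():
    # "fork" context assumed (Linux/WSL).
    ctx = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        return await asyncio.gather(
            *(run_fold_with_timeout(pool, slow_square, i, 5.0) for i in range(4))
        )

print(asyncio.run(main()))  # → [0, 1, 4, 9]
```

A timed-out fold could then be resubmitted with its original indices, since each fold is fully described by its (train_index, test_index) pair.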
asked Feb 4 at 19:21, edited Feb 4 at 19:41 by Ahmed Troudi

Comments:
- It can be tricky to mix multiprocessing and asyncio. Do you really need asyncio here? Because of your ProcessPoolExecutor, each member of the pool is going to be in a different process with its own memory space, so they won't impede each other. – Tim Roberts Commented Feb 4 at 19:33
- My service listens to a RabbitMQ queue, consumes task messages as they come, and should preferably stay available for ping requests; that is the reason I am making it async. – Ahmed Troudi Commented Feb 4 at 19:37
- I am open to other suggestions though, and if making it fully synchronous also makes sense with RabbitMQ in a production setting, then I would like to know more about that. – Ahmed Troudi Commented Feb 4 at 20:52
- Where is the rabbit listener? Is that in the main process? I don't think you can share a listener across multiple processes, and if you do, they don't share state. That's what I mean about it being "tricky". – Tim Roberts Commented Feb 4 at 22:40
- It is in the main process; when a message is received, the main process creates workers and assigns independent tasks to them. I don't want the workers to share state; that's why I'm using the ProcessPoolExecutor. – Ahmed Troudi Commented Feb 5 at 7:54
1 Answer
I had a silly bug: the ProcessPoolExecutor was being recreated for each request, and I believe that was causing the hanging, since a new request could disrupt one that was already running. Make sure the pool is created only once and reused across requests.
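The fix of creating the pool once and reusing it can be sketched roughly as follows. This is an illustrative sketch, not the service's actual code: Service, handle_request, and work are hypothetical names standing in for the RabbitMQ consumer, the message handler, and process_single_fold.

```python
import asyncio
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor

def work(x):
    # Placeholder for the real per-fold task (process_single_fold).
    return x + 1

class Service:
    def __init__(self, max_workers=None):
        # Create the executor ONCE, at service startup, instead of
        # inside each request handler. "fork" context assumed (Linux/WSL).
        ctx = multiprocessing.get_context("fork")
        self.pool = ProcessPoolExecutor(
            max_workers=max_workers or os.cpu_count(), mp_context=ctx
        )

    async def handle_request(self, items):
        # Every incoming message reuses the same long-lived pool.
        loop = asyncio.get_running_loop()
        futures = [loop.run_in_executor(self.pool, work, i) for i in items]
        return await asyncio.gather(*futures)

    def shutdown(self):
        # Shut the pool down once, when the service stops.
        self.pool.shutdown(wait=True)

async def main():
    svc = Service(max_workers=2)
    try:
        # Two "requests" safely share the same pool.
        r1 = await svc.handle_request([1, 2, 3])
        r2 = await svc.handle_request([10, 20])
        return r1, r2
    finally:
        svc.shutdown()

print(asyncio.run(main()))  # → ([2, 3, 4], [11, 21])
```

Tearing a pool down while its workers are mid-task (which is what recreating it per request can effectively do) is a known way to end up with orphaned or hanging workers, so tying the pool's lifetime to the service's lifetime avoids the problem.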