I am on Linux and have working multiprocessing code that uses fork. Here is a MWE version:
from multiprocessing import Pool
from time import perf_counter as now

import numpy as np


def make_func():
    n = 20000
    np.random.seed(6)
    start = now()
    M = np.random.rand(n, n)
    return lambda x, y: M[x, x] + M[y, y]


class ParallelProcessor:
    def __init__(self):
        pass

    def process_task(self, args):
        """Unpack arguments internally"""
        index, integer_arg = args
        print(f(index, integer_arg))

    def run_parallel(self, tasks, num_cores=None):
        """Simplified parallel execution without partial"""
        num_cores = num_cores
        task_args = [(idx, val) for idx, val in enumerate(tasks)]

        start = now()
        global f
        f = make_func()
        print(f"************** {now() - start} seconds to make f")

        start = now()
        with Pool(num_cores) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results


if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)
This uses the fact that the global f is shared with the workers and, since the numpy array is not modified by them, it is not copied. In 3.14, multiprocessing on Linux will no longer default to fork, so in anticipation I made a version using spawn (shown after the short check below).
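(Just to confirm which start method is actually in effect on a given setup, a small check, separate from the two versions:)

import multiprocessing as mp

if __name__ == "__main__":
    # The start method currently in effect (the platform default unless it
    # has been set explicitly), and all methods available on this platform.
    print(mp.get_start_method())         # 'fork' on Linux today
    print(mp.get_all_start_methods())    # e.g. ['fork', 'spawn', 'forkserver']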
from multiprocessing import Pool, RawArray, set_start_method
from time import perf_counter as now

import numpy as np


def init_worker(shared_array_base, shape):
    """Initializer function to set up shared memory for each worker"""
    global M
    M = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape)


def worker_task(args):
    """Worker function that reconstructs f using shared memory"""
    index, integer_arg = args
    result = M[index, index] + M[integer_arg, integer_arg]
    print(result)
    return result


class ParallelProcessor:
    def __init__(self):
        pass

    def run_parallel(self, tasks, num_cores=None):
        """Run tasks in parallel using spawn and shared memory"""
        set_start_method('spawn', force=True)  # Ensure 'spawn' is used
        n = 20000
        shape = (n, n)
        # Use 'd' for double-precision float (float64) instead of np.float64
        shared_array_base = RawArray('d', n * n)
        M_local = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape)

        # Initialize the array in the main process
        np.random.seed(7)
        start = now()
        M_local[:] = np.random.rand(n, n)
        print(f"************** {now() - start} seconds to make M")

        # Prepare arguments for worker tasks
        task_args = [(idx, val) for idx, val in enumerate(tasks)]

        start = now()
        with Pool(num_cores, initializer=init_worker, initargs=(shared_array_base, shape)) as pool:
            results = pool.map(worker_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results


if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)
Unfortunately, not only is this slower, it also seems to use much more memory. Maybe it is making a copy of the numpy array? Is there a way to make the spawn version as efficient as the fork version?
1 Answer
Update 1
My original code was relying on thread-local storage being cleaned up when the pool terminates. While this has been reliable for me when using a multithreading pool, with a multiprocessing pool not so much. In this version we create a wrapper class, SharedMemoryWrapper, that lazily creates the sharable array when its M property is accessed. The class has a cleanup method that closes the shared memory if a handle to it was created in this process, and we arrange for cleanup to be invoked when the pool process terminates.
Update 2
The code as modified in Update 1 still used twice as much memory as desirable, i.e. the memory taken up by the initial numpy array plus the shared-memory copy. I believe the problem can be solved by saving the original array to disk, allocating shared memory, and then filling the shared memory by reading the saved array back in bufsize bytes at a time. In the updated code below, if SAVE_MEMORY is set to False, we essentially get the original processing, which uses more memory but initializes shared memory more quickly.
from multiprocessing import Pool, set_start_method, shared_memory, current_process
import atexit
from time import perf_counter as now, sleep
import os

import numpy as np

SAVE_MEMORY = True


class SharedMemoryWrapper:
    def __init__(self, shm_name, shape, dtype):
        self._shm_name = shm_name
        self._shape = shape
        self._dtype = dtype
        self._arr = None
        self._shared_mem = None

    @property
    def M(self):
        # Create this lazily:
        if self._arr is None:
            self._shared_mem = shared_memory.SharedMemory(name=self._shm_name)
            self._arr = np.ndarray(self._shape, dtype=self._dtype, buffer=self._shared_mem.buf)
            print('Process', current_process().pid, 'accessed shared memory')
        return self._arr

    def cleanup(self):
        if self._shared_mem:
            self._shared_mem.close()
            print('Process', current_process().pid, 'cleaned up shared memory')


def init_worker(shm_name, shape, dtype):
    """Initializer function to set up shared memory for each worker"""
    global shared_memory_wrapper

    shared_memory_wrapper = SharedMemoryWrapper(shm_name, shape, dtype)
    # We could also rename method cleanup to __del__ and
    # then we would not need the next statement:
    atexit.register(shared_memory_wrapper.cleanup)


def worker_task(args):
    """Worker function that reconstructs M using shared memory"""
    M = shared_memory_wrapper.M

    index, integer_arg = args
    result = M[index, index] + M[integer_arg, integer_arg]
    print(result)
    return result


class ParallelProcessor:
    def __init__(self):
        pass

    def run_parallel(self, tasks, num_cores=None):
        """Run tasks in parallel using spawn and shared memory"""
        set_start_method('spawn', force=True)  # Ensure 'spawn' is used

        # Initialize the array in the main process
        start = now()
        n = 10  # Just for demo purposes so I don't blowout memory
        shape = (n, n)
        np.random.seed(7)
        arr = np.random.rand(n, n)
        dtype = arr.dtype
        nbytes = arr.nbytes

        if SAVE_MEMORY:
            # Save the numpy array to a temp file
            # (for demo purposes I will just use a local file)
            path = 'test.dat'
            arr.tofile(path)
            # Now delete the array to reclaim storage:
            del arr

        # Allocate and initialize shared memory
        shared_mem = shared_memory.SharedMemory(create=True, size=nbytes)
        M_local = np.ndarray(shape=shape, dtype=dtype, buffer=shared_mem.buf)

        if SAVE_MEMORY:
            offset = 0
            bufsize = 1024 * 1024  # However large you can afford to make it
            with open(path, 'rb') as f:
                while True:
                    buffer = f.read(bufsize)
                    if (lnth := len(buffer)) == 0:
                        break
                    shared_mem.buf[offset:offset+lnth] = buffer
                    offset += lnth
            os.unlink(path)  # we do not need this anymore
        else:
            M_local[:] = arr[:]
            del arr

        print(f"************** {now() - start} seconds to make M")

        # Prepare arguments for worker tasks
        task_args = [(idx, val) for idx, val in enumerate(tasks)]

        start = now()
        pool = Pool(num_cores, initializer=init_worker, initargs=(shared_mem.name, shape, dtype))
        results = pool.map(worker_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        # Wait for all pool processes to terminate and thus all shared memory handles to be closed:
        pool.close()
        pool.join()
        # Clean up shared memory:
        shared_mem.close()
        shared_mem.unlink()
        return results


if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)
Prints:
************** 0.011767300005885772 seconds to make M
Process 12416 accessed shared memory
0.8800473254783326
1.0340419151253402
0.7077040338725532
0.46051985845418963
Process 5632 accessed shared memory
0.966397880098885
0.9364941327430198
************** 0.3403474999940954 seconds to run all jobs
Process 12416 cleaned up shared memory
Process 5632 cleaned up shared memory
Notes
My original code, which used a class wrapper with a __del__ method that did the necessary cleanup when the pool shuts down, did not work because the implicit call to pool.terminate() that occurs when the with Pool(...) as pool: block exits prevents the normal shutdown of the pool processes. Had I instead used (as in the code posted above) explicit calls to pool.close() followed by pool.join(), everything would have worked as planned. With a multithreading pool you can get away with the with ... context manager because the pool's threads are not abruptly terminated when the block exits.
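To illustrate the difference, here is a minimal, standalone sketch (hypothetical function names, not part of the posted solution): with the context manager, __exit__ calls pool.terminate() and the workers are killed before their atexit handlers can run, whereas with close() followed by join() the workers exit normally and the handlers do run.

import atexit
import os
from multiprocessing import get_context

def init():
    # Registered inside each worker process.
    atexit.register(lambda: print("atexit ran in worker", os.getpid()))

def work(x):
    return x * x

if __name__ == "__main__":
    ctx = get_context("spawn")

    # Abrupt shutdown: leaving the block calls pool.terminate(),
    # so the workers' atexit handlers are typically skipped.
    with ctx.Pool(2, initializer=init) as pool:
        pool.map(work, range(4))

    # Graceful shutdown: close() + join() lets the workers exit
    # normally, so their atexit handlers run.
    pool = ctx.Pool(2, initializer=init)
    pool.map(work, range(4))
    pool.close()
    pool.join()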
When the pool is gracefully shut down with close and join, any specified pool initializer gets to run in every pool process. When pool.terminate() is called in the context-manager situation, the initializer might have been invoked only once, in the one pool process that managed to handle all the tasks on the queue before the other pool processes had finished starting and initializing. If you put a print statement in the pool initializer function, init_worker, and modify the code to use a with Pool() ... context manager, you will see the print statement executed far fewer than 6 times (usually just once); but then the cleanup function may not run at all as a result of that change! Since gracefully shutting down the pool results in the initializer being called 6 times even though perhaps only one process handled all the submitted tasks, we would have the situation where the numpy array is recreated from shared memory 6 times only to be immediately followed by the execution of the cleanup function. Although this is not too serious, I decided to implement class SharedMemoryWrapper so that the actual array is lazily recreated only when the M property is accessed, which as the code currently stands occurs only in the worker function worker_task. Alternatively, init_worker could be modified to create the global variable M once and for all for the worker function to use, as sketched below. If you are submitting enough tasks, and they are sufficiently long running that every pool process will handle at least one of them, you might as well do this.
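A sketch of that alternative, reusing the same initargs as the posted code (shm_name, shape, dtype); the worker then uses the global M directly:

import atexit

import numpy as np
from multiprocessing import shared_memory

def init_worker(shm_name, shape, dtype):
    """Attach to the shared memory once per worker and build M eagerly."""
    global M, _shm
    _shm = shared_memory.SharedMemory(name=shm_name)
    M = np.ndarray(shape, dtype=dtype, buffer=_shm.buf)
    # Keep _shm referenced so the mapping stays alive; close it when the
    # worker shuts down gracefully:
    atexit.register(_shm.close)

def worker_task(args):
    index, integer_arg = args
    return M[index, index] + M[integer_arg, integer_arg]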
Note that when you are using fork to create child processes to process global data defined in the main process, there can still be a memory cost. Although memory is inherited from the main process, copy-on-write semantics are used, so any segment of memory that is modified by the child process must first be copied. Even if your code is not explicitly modifying memory, there are reference counts that get updated, and thus some memory is bound to be copied.
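If the only concern is the upcoming change of default, note that fork can still be requested explicitly on Linux via a context object; a minimal sketch, independent of the code above:

from multiprocessing import get_context

def square(x):
    return x * x

if __name__ == "__main__":
    ctx = get_context("fork")   # keep fork even if the platform default changes
    with ctx.Pool(4) as pool:
        print(pool.map(square, range(8)))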
Comments
"… spawn because it's more consistent across platforms, not because it's better. You might be able to get comparable performance with shared memory, but I wouldn't bother with the trouble just because upstream is changing the default." – Charles Duffy, Feb 10 at 18:38
"M = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape, copy=False)?" – juanpa.arrivillaga, Feb 10 at 18:52
"import numpy; print(numpy.array([1,2,3,4]).reshape(2,2, copy=False)), e.g. from a terminal: python -c 'import numpy; print(numpy.array([1,2,3,4]).reshape(2,2, copy=False))'" – juanpa.arrivillaga, Feb 10 at 20:39
"… Maximum resident set size (kbytes): 6284592. Using the fork version it gives me Maximum resident set size (kbytes): 3160404. It looks like it is storing two versions of the array from that." – Simd, Feb 10 at 20:45