Take this MWE:
from multiprocessing import Pool
from time import perf_counter as now
import numpy as np

def make_func():
    n = 20000
    np.random.seed(7)
    M = np.random.rand(n, n)
    return lambda x, y: M[x, x] + M[y, y]

class ParallelProcessor:
    def __init__(self):
        pass

    def process_task(self, args):
        """Unpack arguments internally"""
        index, integer_arg = args
        print(f(index, integer_arg))

    def run_parallel(self, tasks, num_cores=None):
        """Simplified parallel execution without partial"""
        num_cores = num_cores
        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        start = now()
        global f
        f = make_func()
        print(f"************** {now() - start} seconds to make f")
        start = now()
        with Pool(num_cores) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results

if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5], num_cores=2)
I have declared `f` to be global. I think that means that a copy of the large numpy array will be made in each worker.
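One way I can check what actually happens under the default `fork` start method on Linux is to write to an inherited array inside a worker. A minimal sketch (the names `M_probe` and `probe` are just for this check, not from the code above):

# Minimal sketch, assuming the default "fork" start method on Linux: the array
# is inherited by the workers rather than pickled, and writes in a worker are
# not visible in the parent because pages are only copied on write.
from multiprocessing import Pool
import numpy as np

M_probe = np.zeros(4)

def probe(i):
    M_probe[i] = 1.0            # write in the worker: triggers copy-on-write
    return M_probe.sum()        # each worker only sees its own writes

if __name__ == "__main__":
    with Pool(2) as pool:
        print(pool.map(probe, range(4)))   # sums reflect per-worker copies
    print(M_probe.sum())                   # still 0.0 in the parent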
Alternatively I could use initializer with:
from multiprocessing import Pool
from time import perf_counter as now
import time
import os
import numpy as np

def make_func():
    n = 20000
    np.random.seed(7)
    M = np.random.rand(n, n)
    return lambda x, y: M[x, x] + M[y, y]

def init_worker():
    global f
    f = make_func()

class ParallelProcessor:
    def __init__(self):
        pass

    def process_task(self, args):
        """Unpack arguments internally"""
        index, integer_arg = args
        print(f(index, integer_arg))

    def run_parallel(self, tasks, num_cores=None):
        """Parallel execution with proper initialization"""
        num_cores = num_cores or len(os.sched_getaffinity(0))
        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        start = now()
        with Pool(num_cores, initializer=init_worker) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results

if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5], num_cores=2)
I am told this is better style but I can't see what the advantage is. I am not sure why `f` has to be declared global in `init_worker`. In any case a copy of the large numpy array is still made in each worker. Overall it also seems to be slower.
I am using Linux.
Ideally I would like not to make a copy of the array at each worker. Is there a fast way to use shared memory to avoid that?
A worker initializer runs in the workers. If you need to run initialization in the workers, use a worker initializer.
For example, your first snippet performs initialization in the parent, and only the parent. If your workers are created with the `fork` start method, they will inherit a state where this initialization has happened. But if the workers are created with `spawn`, as happens on Windows, they will be fresh processes where this initialization has not happened: `f` will not exist in the workers.
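A minimal sketch of that failure mode, forcing `spawn` on Linux (the names here are illustrative, not taken from the question's code):

# Sketch: a global defined only under the __main__ guard in the parent never
# exists in spawned workers, so looking it up there fails with NameError.
import multiprocessing as mp

def use_global(_):
    return f(1, 2)                # NameError in a spawned worker

if __name__ == "__main__":
    mp.set_start_method("spawn")  # force spawn even on Linux
    f = lambda x, y: x + y        # defined in the parent only
    with mp.Pool(2) as pool:
        pool.map(use_global, range(2))   # raises NameError from the workers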
If you want to share memory in a portable way (e.g. on Windows or macOS), you should use the `multiprocessing.shared_memory` module. You can use it to create a shared memory block, which you can then use to back a numpy array.
NB: if your program is multi-threaded in the main/parent process, you MUST use the `spawn` start method (even on Linux). This is because `fork()` duplicates only the calling thread into the child process, so locks or other state held by the other threads can be left permanently broken in the child. Without `fork()`, you must rely on start methods that do not allow copy-on-write (CoW) memory sharing.
Example Usage of SharedMemory
import atexit
import functools
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory

import numpy as np

def init_child(name, shape, dtype):
    global ARR
    # initialise array from pre-existing shared memory buffer
    mem = SharedMemory(name)
    ARR = np.ndarray(shape, dtype=dtype, buffer=mem.buf)
    # the following will also keep the SharedMemory object alive until the end
    # of the child process
    atexit.register(functools.partial(cleanup_child, mem))

def cleanup_child(mem: SharedMemory):
    import os
    print(f'child: cleaning up (PID: {os.getpid()})')
    mem.close()  # let OS know this process is done with the memory
    print(f'child: done in PID {os.getpid()}')

def job(arg: tuple[int, int]) -> np.float64:
    """set value in ARR, and return original value"""
    i, value = arg
    original = ARR[i]
    ARR[i] = value
    return original

def main():
    # initialise shared buffer and numpy wrapper around it
    shape = (10,)
    dtype = np.float64
    # NB: OS may decide to allocate more memory than you requested. This is
    # usually because an OS only wants to care about allocating pages and not
    # individual bytes
    mem = SharedMemory(
        create=True, size=np.dtype(dtype).itemsize * np.prod(shape),
    )
    arr = np.ndarray(shape, dtype=dtype, buffer=mem.buf)

    print('parent: starting multiprocessing pool')
    # Use a context to force spawn over fork. This shows that data is only
    # shared via the SharedMemory object.
    ctx = mp.get_context('spawn')
    with ctx.Pool(
        initializer=init_child, initargs=(mem.name, shape, dtype),
    ) as pool:
        # set some data in the array
        # I would usually do this before creating the pool, but this way will
        # prove the child process can see changes to the data the parent makes
        arr[:] = np.arange(10, dtype=dtype)
        assert np.all(arr == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
        if False:
            # or use a numpy Generator to fill an array in-place
            np.random.default_rng().random(out=arr)
            # ie. as no copy is required, it is more efficient than:
            # arr[:] = np.random.random(shape)

        # retrieve initial values in ARR from children and set value to 10-index
        result = pool.map(job, [(i, 10 - i) for i in range(10)])

        # ensure we wait for child processes to close and run their cleanup
        pool.close()
        pool.join()
    print('parent: done with pool')

    # assert that memory changes by parent are visible in child processes
    assert np.all(result == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
    # assert that memory changes in child processes are visible in parent
    assert np.all(arr == [10, 9, 8, 7, 6, 5, 4, 3, 2, 1])

    # Let OS know parent process is done with the shared memory and release
    # it back to the OS
    mem.close()
    mem.unlink()
    print('parent: finished')

if __name__ == '__main__':
    main()
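Applied to your setup, here is a minimal sketch (untested at this size; it allocates roughly 3.2 GB of shared memory, which must fit in /dev/shm, and helper names such as `_mem` are illustrative) that backs `M` with a `SharedMemory` block so every worker maps the same pages instead of building its own matrix in the initializer:

import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np

N = 20000
DTYPE = np.float64

def init_worker(name, shape, dtype):
    # attach to the existing block: no matrix data is copied or pickled
    global f, _mem
    _mem = SharedMemory(name)   # keep a reference so the mapping stays alive
    M = np.ndarray(shape, dtype=dtype, buffer=_mem.buf)
    f = lambda x, y: M[x, x] + M[y, y]

def process_task(args):
    index, integer_arg = args
    return f(index, integer_arg)

if __name__ == "__main__":
    mem = SharedMemory(create=True, size=N * N * np.dtype(DTYPE).itemsize)
    M = np.ndarray((N, N), dtype=DTYPE, buffer=mem.buf)
    # fill the shared block once in the parent; this builds one temporary
    # array, which a Generator with out=M (as in the example above) avoids
    np.random.seed(7)
    M[:] = np.random.rand(N, N)

    ctx = mp.get_context("spawn")
    with ctx.Pool(2, initializer=init_worker,
                  initargs=(mem.name, (N, N), DTYPE)) as pool:
        print(pool.map(process_task, list(enumerate([1, 2, 3, 4, 5]))))

    mem.close()
    mem.unlink()

Each worker then reads `M` directly from the shared mapping; only the small (index, value) tuples and the scalar results are pickled.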
Comments from joanis:

"… `f` before creating the `Pool`, so each process will get a copy of the same data structure. But if you use `Pool(num_cores, initializer=init_worker)`, the documentation says that each worker will call the initializer, so I would expect each worker to get a different `f` with a different random `M`. Can you confirm if they're all the same or all different?"

"… `make_func()`, so it'll be the same every time, either way. Then I guess version 1 creates it once and copies it in each process, while version 2 creates it in each process without having to copy it."

"`f = None` before `def make_func()` will fix that."