I need to parallelize a function that modifies the values of one of its arguments. For example, take this simple code:
import numpy

def Test(i, a, length):
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

a = numpy.ndarray(shape=10, dtype=numpy.uint8)
Test(0, a, a.shape[0])
Test(1, a, a.shape[0])
print(a)
I tried joblib:
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(Test)(i, a, 10) for i in range(2))
and multiprocessing.Pool:
import multiprocessing
with multiprocessing.Pool(processes=2) as pool:
    data = [(0, a, a.shape[0]), (1, a, a.shape[0])]
    pool.starmap(Test, data)
Neither solution works: each worker process operates on its own copy of the data, so the array in the parent process is never modified.
What are my options for parallelizing this kind of function?
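To illustrate the copying described in the question, here is a minimal sketch (assuming NumPy and the standard multiprocessing module; run_with_pool is a hypothetical helper name). Pool.starmap pickles each argument tuple and sends it to a worker, so each worker mutates its own copy and the parent's array is left untouched.

```python
import multiprocessing

import numpy as np


def Test(i, a, length):
    # Same logic as the question: i even -> write 2 at even indices,
    # i odd -> write 1 at odd indices.
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1


def run_with_pool():
    a = np.zeros(10, dtype=np.uint8)
    data = [(0, a, a.shape[0]), (1, a, a.shape[0])]
    with multiprocessing.Pool(processes=2) as pool:
        # Each argument tuple is pickled and sent to a worker, so the
        # workers modify their own copies of `a`, never the parent's.
        pool.starmap(Test, data)
    return a


if __name__ == "__main__":
    print(run_with_pool())  # [0 0 0 0 0 0 0 0 0 0]
```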
Asked Apr 1 at 0:11 by FiReTiTi
1 Answer
We use two functions:
- np_array_to_shared_array: takes a numpy array and initializes a multiprocessing.Array instance with the contents of the numpy array.
- shared_array_to_np_array: takes a multiprocessing.Array instance created with np_array_to_shared_array and reconstructs a numpy array whose buffer is the sharable multiprocessing.Array instance.
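The key idea behind both helpers is that a numpy array can be a view on a shared ctypes buffer rather than owning its own memory. A minimal single-process sketch, assuming NumPy and using multiprocessing.RawArray directly (multiprocessing.Array with lock=False returns this same kind of object):

```python
import multiprocessing

import numpy as np

# 10 zero-initialized unsigned bytes ('B') in shared memory, no lock
# (the same kind of object multiprocessing.Array(..., lock=False) returns).
raw = multiprocessing.RawArray('B', 10)

# Wrap the shared buffer as a numpy array; no data is copied.
view = np.frombuffer(raw, dtype=np.uint8)
view[::2] = 2

# Writes through the numpy view land in the shared ctypes array.
print(list(raw))  # [2, 0, 2, 0, 2, 0, 2, 0, 2, 0]
```

Because child processes receive the shared buffer itself rather than a pickled copy, a view built on it in each worker writes to memory the parent also sees.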
We then create our multiprocessing pool with a pool initializer that is passed a multiprocessing.Array instance created with np_array_to_shared_array. Using shared_array_to_np_array, the initializer reconstructs the sharable numpy array in each pool process as the global variable a:
import numpy as np
import multiprocessing

def np_array_to_shared_array(np_array: np.ndarray,
                             lock: bool = True
                             ) -> multiprocessing.Array:
    """Initialize a multiprocessing.Array instance with the contents of a numpy array."""
    # Allocate a byte ('B') buffer large enough to hold the array's data.
    shared_array = multiprocessing.Array('B', np_array.nbytes, lock=lock)
    # A locked Array wraps the underlying ctypes array; get_obj() unwraps it.
    buffer = shared_array.get_obj() if lock else shared_array
    arr = np.frombuffer(buffer, np_array.dtype)
    # Copy the data in C (row-major) order.
    arr[:] = np_array.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array: multiprocessing.Array,
                             shape: tuple[int, ...],
                             dtype: object
                             ) -> np.ndarray:
    """Initialize a numpy array using a multiprocessing.Array instance as the buffer."""
    # Unwrap a locked Array; a lock-free (raw) array is used directly.
    buffer = (
        shared_array.get_obj() if getattr(shared_array, 'get_obj', None)
        else shared_array
    )
    return np.ndarray(shape, dtype=dtype, buffer=buffer)

def init_pool(shared_array, shape, dtype):
    # Runs once in each worker: rebuild the shared numpy array as the global 'a'.
    global a
    a = shared_array_to_np_array(shared_array, shape, dtype)

def test(i):
    length = a.shape[0]  # renamed from 'len' to avoid shadowing the built-in
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

def main():
    arr = np.zeros(shape=10, dtype=np.uint8)
    shape = arr.shape
    dtype = arr.dtype
    shared_array = np_array_to_shared_array(arr, lock=False)
    # Rebind arr to a view of the shared buffer so the parent sees the workers' writes.
    arr = shared_array_to_np_array(shared_array, shape, dtype)
    with multiprocessing.Pool(processes=2,
                              initializer=init_pool,
                              initargs=(shared_array, shape, dtype)
                              ) as pool:
        pool.map(test, [0, 1])
    print(arr)

if __name__ == '__main__':
    main()
Prints:
[2 1 2 1 2 1 2 1 2 1]
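The answer passes lock=False, which appears safe here because the two workers write disjoint indices. If workers instead performed read-modify-write updates on the same elements, the lock would matter. A minimal sketch, assuming a hypothetical increment workload and hypothetical names (counts, counts_lock, add_one, run_locked_increments), using multiprocessing.Array with its default lock and get_lock():

```python
import multiprocessing

import numpy as np


def init_pool(shared):
    # Runs once per worker: keep the lock and a numpy view of the
    # shared buffer in globals, as the answer does with `a`.
    global counts, counts_lock
    counts_lock = shared.get_lock()
    counts = np.frombuffer(shared.get_obj(), dtype=np.int64)


def add_one(_):
    # Read-modify-write on elements shared by all tasks: hold the lock so
    # concurrent updates from different workers cannot be lost.
    with counts_lock:
        counts[:] += 1


def run_locked_increments(n_tasks=100):
    # 'q' is a signed 64-bit C long long; lock=True is the default.
    shared = multiprocessing.Array('q', 4)
    with multiprocessing.Pool(processes=2, initializer=init_pool,
                              initargs=(shared,)) as pool:
        pool.map(add_one, range(n_tasks))
    return np.frombuffer(shared.get_obj(), dtype=np.int64).copy()


if __name__ == "__main__":
    print(run_locked_increments())  # [100 100 100 100]
```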
… multiprocessing.shared_memory. – Jérôme Richard, commented Apr 1 at 0:31
… starmap? Why not apply_async? – Suramuthu R, commented Apr 1 at 5:15
… starmap, as I got used to apply_async on most occasions. – Suramuthu R, commented Apr 1 at 6:41
… apply_async and map; I had to find out about starmap from the internet, which took some time since I got immersed in a few other things on the net. – Suramuthu R, commented Apr 1 at 9:43
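Jérôme Richard's comment mentions multiprocessing.shared_memory (Python 3.8+). As an alternative to the answer's multiprocessing.Array approach, here is a minimal sketch of the same even/odd example using SharedMemory, assuming workers attach to the block by name; fill, run_with_shared_memory, SIZE and DTYPE are hypothetical names.

```python
import multiprocessing
from multiprocessing import shared_memory

import numpy as np

SIZE = 10
DTYPE = np.uint8


def fill(name, i):
    # Attach to the existing block by name and view it as a numpy array.
    shm = shared_memory.SharedMemory(name=name)
    a = np.ndarray((SIZE,), dtype=DTYPE, buffer=shm.buf)
    if i % 2 == 0:
        a[0::2] = 2
    else:
        a[1::2] = 1
    del a        # drop the view so the mapping can be closed
    shm.close()  # detach only; the creating process unlinks


def run_with_shared_memory():
    shm = shared_memory.SharedMemory(create=True,
                                     size=SIZE * np.dtype(DTYPE).itemsize)
    a = np.ndarray((SIZE,), dtype=DTYPE, buffer=shm.buf)
    a[:] = 0
    with multiprocessing.Pool(processes=2) as pool:
        pool.starmap(fill, [(shm.name, 0), (shm.name, 1)])
    result = a.copy()  # copy out before the block is released
    del a
    shm.close()
    shm.unlink()  # the creating process frees the block
    return result


if __name__ == "__main__":
    print(run_with_shared_memory())  # [2 1 2 1 2 1 2 1 2 1]
```

Passing only the block's name (a short string) to each task avoids pickling the array itself; the trade-off is that each worker must know the shape and dtype and must close its mapping explicitly.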