
python - Parallel processing with arguments to modify - Stack Overflow


I need to parallelize a function that modifies the values of one of its arguments. For example, consider this simple code:

import numpy

def Test(i, a, length):
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

a = numpy.ndarray(shape=10, dtype=numpy.uint8)
Test(0, a, a.shape[0])
Test(1, a, a.shape[0])
print(a)

I tried joblib with Parallel(n_jobs=2)(delayed(Test)(i, a, 10) for i in range(2)) and multiprocessing.Pool with:

with multiprocessing.Pool(processes=2) as pool:
    data = [(0, a, a.shape[0]), (1, a, a.shape[0])]
    pool.starmap(Test, data)

Neither solution works: the worker processes receive a copy of the data (the arguments are pickled when sent to the workers), so the array I pass in is never modified in the parent process.

What are my options for this kind of parallelization?
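To make the failure concrete, here is a minimal self-contained sketch of the Pool attempt (a zero-initialized array is assumed instead of an uninitialized one, so the absence of change is visible):

import multiprocessing
import numpy

def Test(i, a, length):
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

if __name__ == '__main__':
    a = numpy.zeros(shape=10, dtype=numpy.uint8)
    with multiprocessing.Pool(processes=2) as pool:
        pool.starmap(Test, [(0, a, a.shape[0]), (1, a, a.shape[0])])
    print(a)  # prints [0 0 0 0 0 0 0 0 0 0]: each worker only changed its own copy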


Asked Apr 1 at 0:11 by FiReTiTi
  • 2 With multithreading data are shared, but with multiprocessing they are pickled (i.e. copied). If you want to mutate the input, then you need to use multithreading or to explicitly use shared memory with multiprocessing. See multiprocessing.shared_memory (a sketch follows these comments). – Jérôme Richard Commented Apr 1 at 0:31
  • why starmap? why not apply_async? – Suramuthu R Commented Apr 1 at 5:15
  • @SuramuthuR What would it change? – FiReTiTi Commented Apr 1 at 6:39
  • @FiReTiTi, No, I just want to know whether there is any specific reason for using starmap, as I am used to apply_async on most occasions. – Suramuthu R Commented Apr 1 at 6:41
  • @FiReTiTi, Sorry for the late reply. Check my answer. As I am used to apply_async and map, I had to look up starmap on the internet, which took some time since I got immersed in a few other things on the net. – Suramuthu R Commented Apr 1 at 9:43
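As the first comment above suggests, with multithreading the same code works unchanged because threads share the caller's memory (multiprocessing.pool.ThreadPool is a drop-in replacement for multiprocessing.Pool here), while with separate processes the sharing has to be made explicit. Below is a minimal sketch of the multiprocessing.shared_memory route from that comment; the worker function and the name/shape/dtype plumbing are illustrative additions, not code from the original post:

import numpy as np
from multiprocessing import Process, shared_memory

def worker(i, shm_name, shape, dtype, length):
    # Attach to the block created by the parent and view it as a numpy array.
    shm = shared_memory.SharedMemory(name=shm_name)
    a = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1
    shm.close()

if __name__ == '__main__':
    shape, dtype = (10,), np.uint8
    nbytes = int(np.dtype(dtype).itemsize * np.prod(shape))
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    a = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    a[:] = 0
    workers = [Process(target=worker, args=(i, shm.name, shape, dtype, shape[0]))
               for i in range(2)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(a)  # [2 1 2 1 2 1 2 1 2 1]
    shm.close()
    shm.unlink()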

1 Answer


We use two functions:

  1. np_array_to_shared_array: Takes a numpy array and initializes a multiprocessing.Array instance with the contents of the numpy array.
  2. shared_array_to_np_array: Takes a multiprocessing.Array instance created with np_array_to_shared_array and reconstructs a numpy array whose buffer is the sharable multiprocessing.Array instance.

We then create our multiprocessing pool with a pool initializer. The initializer is passed the multiprocessing.Array instance created with np_array_to_shared_array and uses shared_array_to_np_array to reconstruct the sharable numpy array in each pool process as the global variable 'a':

import numpy as np
import multiprocessing

def np_array_to_shared_array(np_array: np.ndarray,
                             lock: bool=True
                             ) -> multiprocessing.Array:
    """Initialize a multiprocessing.Array instance with the contents of a numpy array."""

    shared_array = multiprocessing.Array('B', np_array.nbytes, lock=lock)
    buffer = shared_array.get_obj() if lock else shared_array
    arr = np.frombuffer(buffer, np_array.dtype)
    arr[:] = np_array.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array: multiprocessing.Array,
                             shape: tuple[int],
                             dtype: object
                             ) -> np.ndarray:
    """Initialize a numpy array using a multiprocessing.Array instance as the buffer."""

    buffer = (
        shared_array.get_obj() if getattr(shared_array, 'get_obj', None)
        else shared_array
    )
    return np.ndarray(shape, dtype=dtype, buffer=buffer)

def init_pool(shared_array, shape, dtype):
    # Runs once in each worker process: rebuild the numpy view over the
    # shared buffer and expose it as the global 'a' used by test().
    global a

    a = shared_array_to_np_array(shared_array, shape, dtype)

def test(i):
    # 'a' is the shared numpy array installed by init_pool in this worker.
    length = a.shape[0]

    if i % 2 == 0:
        for x in range(0, length, 2):
            a[x] = 2
    else:
        for x in range(1, length, 2):
            a[x] = 1

def main():
    arr = np.zeros(shape=10, dtype=np.uint8)
    shape = arr.shape
    dtype = arr.dtype
    shared_array = np_array_to_shared_array(arr, lock=False)  # lock=False: the two tasks write disjoint indices
    arr = shared_array_to_np_array(shared_array, shape, dtype)

    with multiprocessing.Pool(processes=2,
                              initializer=init_pool,
                              initargs=(shared_array, shape, dtype)
                             ) as pool:
        pool.map(test, [0, 1])

    print(arr)


if __name__ == '__main__':
    main()

Prints:

[2 1 2 1 2 1 2 1 2 1]
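Two notes on the design: the pool initializer runs exactly once per worker process, so each worker rebuilds its numpy view over the shared buffer a single time and then reads and writes it through the global a. Passing lock=False is reasonable in this particular example because the two tasks write disjoint (even vs. odd) indices; if tasks could write the same elements, the default lock=True (which both helper functions already handle via get_obj()) or an explicit lock would be the safer choice.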