
Why should I pass a function using initializer and can I use shared memory instead?


Take this MWE:

from multiprocessing import Pool
from time import perf_counter as now
import numpy as np


def make_func():
    n = 20000
    np.random.seed(7)
    M = np.random.rand(n, n)
    return lambda x, y: M[x, x] + M[y, y]


class ParallelProcessor:
    def __init__(self):
        pass
        
    def process_task(self, args):
        """Unpack arguments internally"""
        index, integer_arg = args
        print(f(index, integer_arg))

    def run_parallel(self, tasks, num_cores=None):
        """Simplified parallel execution without partial"""
        num_cores = num_cores

        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        start = now()
        global f
        f = make_func()
        print(f"************** {now() - start} seconds to make f")
        start = now()
        with Pool(num_cores) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results


if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5], num_cores=2)

I have declared f to be global. I think that means that a copy of the large numpy array will be made in each worker.
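For context, a throwaway probe like the following (not part of the MWE above) relies on Linux's default fork start method and shows that the workers simply inherit the parent's module globals rather than receiving f through pickling:

from multiprocessing import Pool
import os


def probe(_):
    # Runs in a worker: report its PID and whether the inherited global exists.
    return os.getpid(), "f" in globals()


if __name__ == "__main__":
    f = "stand-in for the expensive closure over M"
    with Pool(2) as pool:
        print(pool.map(probe, range(4)))  # with fork, every worker reports True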

Alternatively I could use initializer with:

from multiprocessing import Pool
from time import perf_counter as now
import time
import os
import numpy as np

def make_func():
    n = 20000
    np.random.seed(7)
    M = np.random.rand(n, n)
    return lambda x, y: M[x, x] + M[y, y]

def init_worker():
    global f
    f = make_func()

class ParallelProcessor:
    def __init__(self):
        pass
        
    def process_task(self, args):
        """Unpack arguments internally"""
        index, integer_arg = args
        print(f(index, integer_arg))

    def run_parallel(self, tasks, num_cores=None):
        """Parallel execution with proper initialization"""
        num_cores = num_cores or len(os.sched_getaffinity(0))
        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        
        start = now()
        with Pool(num_cores, initializer=init_worker) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results

if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5], num_cores=2)

I am told this is better style, but I can't see what the advantage is. I am not sure why f has to be declared global in `init_worker`. In any case, a copy of the large numpy array still ends up in each worker. Overall it also seems to be slower.
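As far as I can tell, dropping the global statement would make the assignment in `init_worker` bind only a local name that is discarded when the initializer returns, so the workers would fail with a NameError. A toy sketch (not my real code):

from multiprocessing import Pool


def make_func():
    return lambda x, y: x + y  # stand-in for the closure over the big array


def init_worker():
    f = make_func()  # local only: discarded when the initializer returns


def process_task(args):
    return f(*args)  # NameError: name 'f' is not defined


if __name__ == "__main__":
    with Pool(2, initializer=init_worker) as pool:
        pool.map(process_task, [(1, 2), (3, 4)])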

I am using Linux.


Ideally I would like not to make a copy of the array at each worker. Is there a fast way to use shared memory to avoid that?

  • There's something I'm not sure about: in the first version, you create f before creating the Pool, so each process will get a copy of the same data structure. But if you use Pool(num_cores, initializer=init_worker), the documentation says that each worker will call the initializer, so I would expect each worker to get a different f with a different random M. Can you confirm if they're all the same or all different? – joanis Commented yesterday
  • Your first code block cannot run, "name 'f' is not defined" – Alfred Luu Commented yesterday
  • Oh, wait, never mind, you seed the random number generator in make_func(), so it'll be the same every time, either way. Then I guess version 1 creates it once and copies it in each process, while version 2 creates it in each process without having to copy it. – joanis Commented yesterday
  • @AlfredLuu I assume adding f = None before def make_func() will fix that. – joanis Commented yesterday
  • @joanis That makes sense. But version 2 is slower for me. Do you see the same thing? – Simd Commented yesterday

2 Answers


A worker initializer runs in the workers. If you need to run initialization in the workers, use a worker initializer.
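As a minimal illustration (a toy example of mine, not taken from the question): the initializer runs once in every worker process, which the printed PIDs make visible.

import multiprocessing as mp
import os


def init_worker():
    # Runs once in each worker process, never in the parent.
    print(f"initializer running in worker PID {os.getpid()}")


def work(x):
    return x * x


if __name__ == "__main__":
    with mp.Pool(2, initializer=init_worker) as pool:
        print(pool.map(work, range(4)))  # the initializer line appears twice, once per worker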

For example, your first snippet performs the initialization in the parent, and only in the parent. If your workers are created with the fork start method, they inherit the state in which that initialization has already happened.

But if the workers are created with spawn, as happens on Windows, they will be fresh processes where this initialization has not happened. f will not exist in the workers.
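A quick way to see this (a toy sketch, not the question's code) is to force spawn; the first approach then fails because f only ever existed in the parent:

import multiprocessing as mp


def make_f():
    return lambda x, y: x + y  # stand-in for the expensive closure over the big array


def task(args):
    return f(*args)  # looks up the module-level name "f" inside the worker


if __name__ == "__main__":
    f = make_f()                          # created in the parent only
    ctx = mp.get_context("spawn")         # fresh workers: nothing is inherited
    with ctx.Pool(2) as pool:
        pool.map(task, [(1, 2), (3, 4)])  # raises NameError: name 'f' is not defined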

If you want to share memory in a portable way (e.g. on Windows or macOS), you should use the multiprocessing.shared_memory module. It lets you create a shared memory block, which you can then use to back a numpy array.

NB: if your program is multi-threaded in the main/parent process, then you MUST use the spawn start method (even on Linux). fork() only carries the calling thread into the child; any locks held by other threads at fork time stay locked there forever, which can leave the child in a permanently broken state. Without fork(), you must rely on start methods that do not give you CoW memory sharing.
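For reference, a minimal way to force the start method process-wide (an illustrative snippet of mine, separate from the example below):

import multiprocessing as mp

if __name__ == "__main__":
    # May be called at most once, before any pools or processes are created.
    mp.set_start_method("spawn")
    with mp.Pool(2) as pool:
        print(pool.map(abs, [-3, -2, -1]))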

Example Usage of SharedMemory

import atexit
import functools
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def init_child(name, shape, dtype):
    global ARR
    # initialise array from pre-existing shared memory buffer
    mem = SharedMemory(name)
    ARR = np.ndarray(shape, dtype=dtype, buffer=mem.buf)
    # the following will also keep the SharedMemory object alive until the end 
    # of the child process
    atexit.register(functools.partial(cleanup_child, mem))

def cleanup_child(mem: SharedMemory):
    import os
    print(f'child: cleaning up (PID: {os.getpid()})')
    mem.close()  # let OS know this process is done with the memory
    print(f'child: done in PID {os.getpid()}')

def job(arg: tuple[int, int]) -> np.float64:
    """set value in ARR, and return original value"""
    i, value = arg
    original = ARR[i]
    ARR[i] = value
    return original

def main():
    # initialise shared buffer and numpy wrapper around it
    shape = (10,)
    dtype = np.float64
    # NB: OS may decide to allocate more memory than you requested. This is
    # usually because an OS only wants to care about allocating pages and not
    # individual bytes 
    mem = SharedMemory(
        create=True, size=np.dtype(dtype).itemsize * np.prod(shape),
    )
    arr = np.ndarray(shape, dtype=dtype, buffer=mem.buf)

    print('parent: starting multiprocessing pool')
    # Use a context to force spawn over fork. This shows that data is only 
    # shared via the SharedMemory object.
    ctx = mp.get_context('spawn')
    with ctx.Pool(
        initializer=init_child, initargs=(mem.name, shape, dtype),
    ) as pool:
        # set some data in the array
        # I would usually do this before creating the pool, but this way will
        # prove the child process can see changes to the data the parent makes
        arr[:] = np.arange(10, dtype=dtype)
        assert np.all(arr == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
        if False:
            # or use a numpy Generator to fill an array in-place
            np.random.default_rng().random(out=arr)
            # ie. as no copy is required, it is more efficient than:
            # arr[:] = np.random.random(shape)

        # retrieve initial values in ARR from children and set value to 10-index
        result = pool.map(job, [(i, 10-i) for i in range(10)])

        # ensure we wait for child processes to close and run their cleanup
        pool.close()
        pool.join()
    print('parent: done with pool')

    # assert that memory changes by parent are visible in child processes
    assert np.all(result == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
    # assert that memory changes in child processes are visible in parent
    assert np.all(arr == [10, 9, 8, 7, 6, 5, 4, 3, 2, 1])

    # Let OS know parent process is done with the shared memory and release
    # it back to the OS
    mem.close()
    mem.unlink()

    print('parent: finished')

if __name__ == '__main__':
    main()
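To tie this back to the question's f: below is a hedged sketch (the name init_worker_shared is mine, and I fill the shared block in place with a numpy Generator, so the values differ from np.random.seed(7)) in which M lives in one shared block while each worker builds its own cheap f around it:

import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np


def init_worker_shared(name, shape, dtype):
    """Attach to the parent's shared block and build f over it; no copy of M is made."""
    global f, _mem
    _mem = SharedMemory(name)  # keep a reference so the buffer stays valid in this worker
    M = np.ndarray(shape, dtype=dtype, buffer=_mem.buf)
    f = lambda x, y: M[x, x] + M[y, y]


def process_task(args):
    index, integer_arg = args
    return f(index, integer_arg)


if __name__ == "__main__":
    n = 20000
    dtype = np.float64
    mem = SharedMemory(create=True, size=n * n * np.dtype(dtype).itemsize)
    M = np.ndarray((n, n), dtype=dtype, buffer=mem.buf)
    np.random.default_rng(7).random(out=M)  # fill the shared block in place

    with mp.Pool(2, initializer=init_worker_shared,
                 initargs=(mem.name, (n, n), dtype)) as pool:
        print(pool.map(process_task, list(enumerate([1, 2, 3, 4, 5]))))

    mem.close()
    mem.unlink()

Only mem.name, the shape and the dtype are pickled to each worker; the roughly 3.2 GB of data stays in a single OS-level block.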