
Can `spawn` be made as memory efficient as `fork` with multiprocessing?


I am on Linux and have working multiprocessing code that uses fork. Here is a MWE version:

from multiprocessing import Pool
from time import perf_counter as now
import numpy as np


def make_func():
    n = 20000
    np.random.seed(6)
    start = now()
    M = np.random.rand(n, n)
    return lambda x, y: M[x, x] + M[y, y]


class ParallelProcessor:
    def __init__(self):
        pass
        
    def process_task(self, args):
        """Unpack arguments internally"""
        index, integer_arg = args
        print(f(index, integer_arg))

    def run_parallel(self, tasks, num_cores=None):
        """Simplified parallel execution without partial"""

        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        start = now()
        global f
        f = make_func()
        print(f"************** {now() - start} seconds to make f")
        start = now()
        with Pool(num_cores) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results


if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)

This relies on the fact that the global f is inherited by the workers via fork, and since the numpy array is never modified by them, its pages are not copied (copy-on-write).
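
As an illustrative aside (not part of the original post), one quick way to see that the workers inherit the parent's array rather than receiving a pickled copy is to compare data pointers; a minimal sketch, assuming the fork start method on Linux:

from multiprocessing import Pool
import numpy as np

M = np.random.rand(1000, 1000)

def data_ptr(_):
    # Address of M's buffer as seen inside the worker process.
    return M.__array_interface__['data'][0]

if __name__ == "__main__":
    parent_ptr = M.__array_interface__['data'][0]
    with Pool(2) as pool:
        child_ptrs = pool.map(data_ptr, range(2))
    # Under fork the addresses match: the mapping was inherited, not re-created.
    print(parent_ptr, child_ptrs)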

From Python 3.14 the default multiprocessing start method on Linux is changing away from fork, so in anticipation I made a version using spawn.

from multiprocessing import Pool, RawArray, set_start_method
from time import perf_counter as now
import numpy as np

def init_worker(shared_array_base, shape):
    """Initializer function to set up shared memory for each worker"""
    global M
    M = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape)

def worker_task(args):
    """Worker function that reconstructs f using shared memory"""
    index, integer_arg = args
    result = M[index, index] + M[integer_arg, integer_arg]
    print(result)
    return result

class ParallelProcessor:
    def __init__(self):
        pass

    def run_parallel(self, tasks, num_cores=None):
        """Run tasks in parallel using spawn and shared memory"""
        set_start_method('spawn', force=True)  # Ensure 'spawn' is used

        n = 20000
        shape = (n, n)

        # RawArray takes a ctypes typecode: 'd' means double precision (float64)
        shared_array_base = RawArray('d', n * n)
        M_local = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape)

        # Initialize the array in the main process
        np.random.seed(7)
        start = now()
        M_local[:] = np.random.rand(n, n)
        print(f"************** {now() - start} seconds to make M")

        # Prepare arguments for worker tasks
        task_args = [(idx, val) for idx, val in enumerate(tasks)]

        start = now()
        with Pool(num_cores, initializer=init_worker, initargs=(shared_array_base, shape)) as pool:
            results = pool.map(worker_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results

if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)

Unfortunately, not only is this slower, it also seems to use much more memory. Maybe it is making a copy of the numpy array?

Is there a way to make the spawn version as efficient as the fork version?

asked Feb 10 at 18:23 by Simd (edited Feb 10 at 18:28)
  • I would argue that the default is being changed to spawn because it's more consistent across platforms, not because it's better. You might be able to get comparable performance with shared memory, but I wouldn't bother with the trouble just because upstream is changing the default. – Charles Duffy Commented Feb 10 at 18:38
  • does anything change if you do M = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape, copy=False)? – juanpa.arrivillaga Commented Feb 10 at 18:52
  • @juanpa.arrivillaga 'copy' is an invalid keyword argument. Did you mean something different? – Simd Commented Feb 10 at 20:36
  • @Simd what version of numpy are you using? It should definitely be available. E.g. the following works: import numpy; print(numpy.array([1,2,3,4]).reshape(2,2, copy=False)), e.g. from a terminal, python -c 'import numpy; print(numpy.array([1,2,3,4]).reshape(2,2, copy=False))' – juanpa.arrivillaga Commented Feb 10 at 20:39
  • @juanpa.arrivillaga I upgraded to numpy 2 (I was on 1.26.4) and it is supported and the code runs. I am not sure it helps with the memory problem though. /usr/bin/time -v gives me Maximum resident set size (kbytes): 6284592. Using the fork version it gives me Maximum resident set size (kbytes): 3160404. From that, it looks like it is storing two versions of the array. – Simd Commented Feb 10 at 20:45
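
A related option (not spelled out in the comments above) is to keep requesting fork explicitly rather than relying on the platform default, so a future change of the default does not silently alter behaviour; a minimal sketch:

from multiprocessing import get_context

def square(x):
    return x * x

if __name__ == "__main__":
    ctx = get_context("fork")  # pin the start method for this pool only
    with ctx.Pool(4) as pool:
        print(pool.map(square, range(8)))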

1 Answer


Update 1

My original code relied on thread-local storage being cleaned up when the pool terminates. While that has been reliable for me with a multithreading pool, with a multiprocessing pool it is not. In this version we create a wrapper class, SharedMemoryWrapper, that lazily attaches to the sharable array when its M property is first accessed. The class has a cleanup method that closes the shared memory if a handle to it was created in this process, and we arrange for cleanup to be invoked when the pool process terminates.

Update 2

The Update 1 code still used twice as much memory as desirable, i.e. the memory taken up by the initial numpy array plus the shared-memory copy. I believe the problem can be solved by saving the original array to disk, allocating shared memory, and then filling the shared memory by reading the saved array back bufsize bytes at a time. In the updated code below, if SAVE_MEMORY is set to False we essentially get the original processing, which uses more memory but initializes shared memory more quickly.

from multiprocessing import Pool, set_start_method, shared_memory, current_process
import atexit
from time import perf_counter as now, sleep
import os
import numpy as np

SAVE_MEMORY = True

class SharedMemoryWrapper:
    def __init__(self, shm_name, shape, dtype):
        self._shm_name = shm_name
        self._shape = shape
        self._dtype = dtype
        self._arr = None
        self._shared_mem = None

    @property
    def M(self):
        # Create this lazily:
        if self._arr is None:
            self._shared_mem = shared_memory.SharedMemory(name=self._shm_name)
            self._arr = np.ndarray(self._shape, dtype=self._dtype, buffer=self._shared_mem.buf)
            print('Process', current_process().pid, 'accessed shared memory')

        return self._arr

    def cleanup(self):
        if self._shared_mem:
            self._shared_mem.close()
            print('Process', current_process().pid, 'cleaned up shared memory')


def init_worker(shm_name, shape, dtype):
    """Initializer function to set up shared memory for each worker"""
    global shared_memory_wrapper

    shared_memory_wrapper = SharedMemoryWrapper(shm_name, shape, dtype)
    # We could also rename method cleanup to __del__ and
    # then we would not need the next statement:
    atexit.register(shared_memory_wrapper.cleanup)

def worker_task(args):
    """Worker function that reconstructs M using shared memory"""
    M = shared_memory_wrapper.M

    index, integer_arg = args
    result = M[index, index] + M[integer_arg, integer_arg]
    print(result)
    return result

class ParallelProcessor:
    def __init__(self):
        pass

    def run_parallel(self, tasks, num_cores=None):
        """Run tasks in parallel using spawn and shared memory"""
        set_start_method('spawn', force=True)  # Ensure 'spawn' is used

        # Initialize the array in the main process
        start = now()
        n = 10 # Just for demo purposes so I don't blow out memory
        shape = (n, n)
        np.random.seed(7)
        arr = np.random.rand(n, n)
        dtype = arr.dtype
        nbytes = arr.nbytes

        if SAVE_MEMORY:
            # Save the numpy array to a temp file,
            # (for demo purposes I will just use a local file)
            path = 'test.dat'
            arr.tofile(path)
            # Now delete the array to reclaim storage:
            del arr

        # allocate and initialize shared memory
        shared_mem = shared_memory.SharedMemory(create=True, size=nbytes)
        M_local = np.ndarray(shape=shape, dtype=dtype, buffer=shared_mem.buf)

        if SAVE_MEMORY:
            offset = 0
            bufsize = 1024 * 1024  # However large you can afford to make it
            with open(path, 'rb') as f:
                while True:
                    buffer = f.read(bufsize)
                    if (lnth := len(buffer)) == 0:
                        break
                    shared_mem.buf[offset:offset+lnth] = buffer
                    offset += lnth
            os.unlink(path)  # we do not need this anymore
        else:
            M_local[:] = arr[:]
            del arr

        print(f"************** {now() - start} seconds to make M")

        # Prepare arguments for worker tasks
        task_args = [(idx, val) for idx, val in enumerate(tasks)]

        start = now()
        pool = Pool(num_cores, initializer=init_worker, initargs=(shared_mem.name, shape, dtype))
        results = pool.map(worker_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")

        # Wait for all pool processes to terminate and thus all shared memory handles to be closed:
        pool.close()
        pool.join()

        # Clean up shared memory:
        shared_mem.close()
        shared_mem.unlink()

        return results

if __name__ == "__main__":
    processor = ParallelProcessor()
    processor.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)

Prints:

************** 0.011767300005885772 seconds to make M
Process 12416 accessed shared memory
0.8800473254783326
1.0340419151253402
0.7077040338725532
0.46051985845418963
Process 5632 accessed shared memory
0.966397880098885
0.9364941327430198
************** 0.3403474999940954 seconds to run all jobs
Process 12416 cleaned up shared memory
Process 5632 cleaned up shared memory

Notes

My original code used a class wrapper with a __del__ method that did the necessary cleanup when the pool was shutting down. That did not work, because the implicit call to pool.terminate() that occurs when the with Pool(...) as pool: block exits prevents a normal shutdown of the pool processes. Had I instead used explicit calls to pool.close() followed by pool.join() (as in the code posted above), everything would have worked as planned. With a multithreading pool you can get away with the with ... context manager, because the pool's threads are not abruptly terminated when the block exits.
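
To make the difference concrete, here is a minimal sketch (assuming CPython's Pool, whose __exit__ calls terminate()) contrasting the two shutdown styles:

from multiprocessing import Pool

def work(x):
    return x * x

if __name__ == "__main__":
    # Context-manager form: __exit__ calls pool.terminate(), stopping workers
    # abruptly when the block exits; worker-side atexit handlers may not run.
    with Pool(2) as pool:
        print(pool.map(work, range(4)))

    # Graceful shutdown: close() means "no more tasks", join() waits for the
    # workers to exit normally, so (with spawn) their atexit handlers do run.
    pool = Pool(2)
    try:
        print(pool.map(work, range(4)))
    finally:
        pool.close()
        pool.join()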

When the pool is gracefully shut down with close and join, any specified pool initializer will have run in every pool process. When pool.terminate() is called in the context-manager situation, by contrast, the initializer might have been invoked only once, for the one pool process that managed to drain the task queue before the other pool processes finished starting up. If you put a print statement in the pool initializer, init_worker, and modify the code to use a with Pool(...) context manager, you will see it executed far fewer than 6 times (usually just once); but then the cleanup function may not run at all as a result of that change.

Since gracefully shutting down the pool results in the initializer being called 6 times even though perhaps only one process handled all of the submitted tasks, we would otherwise recreate the numpy array from shared memory 6 times only to immediately follow that with the cleanup function. Although this is not too serious, I decided to implement class SharedMemoryWrapper so that the array is lazily recreated only when the M property is accessed, which as the code currently stands happens only in the worker function worker_task. Alternatively, init_worker could be modified to create the global variable M once and for all for the worker function to use (a sketch of that alternative follows below). If you are submitting enough tasks, and they are sufficiently long running that every pool process will handle at least one task, you might as well do this.
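
A sketch of that eager alternative, reusing the shm_name, shape and dtype initargs from the code above:

import atexit
from multiprocessing import shared_memory
import numpy as np

def init_worker(shm_name, shape, dtype):
    """Eager alternative: attach to the shared memory once, when the worker starts."""
    global M, _shm
    _shm = shared_memory.SharedMemory(name=shm_name)
    M = np.ndarray(shape, dtype=dtype, buffer=_shm.buf)
    atexit.register(_shm.close)  # close this worker's handle at normal worker exit

def worker_task(args):
    index, integer_arg = args
    return M[index, index] + M[integer_arg, integer_arg]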

Note that when you are using fork to create child processes that work on global data defined in the main process, there can still be a memory cost. Although memory is inherited from the main process, copy-on-write semantics mean that any page of memory modified by a child process must first be copied. Even if your code never explicitly modifies that memory, reference counts on the inherited objects get updated, so some memory is bound to be copied.
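
One partial mitigation worth knowing about (not used in the code above) is gc.freeze(): it moves every object currently tracked by the cyclic garbage collector into a permanent generation, so the collector in the forked children does not scan, and therefore write to, those objects. It does not prevent reference-count writes, but it can reduce copy-on-write traffic; a minimal sketch:

import gc
from multiprocessing import get_context
import numpy as np

DATA = np.random.rand(1000, 1000)

def work(i):
    return DATA[i, i]  # read-only access to the inherited array

if __name__ == "__main__":
    gc.disable()  # per the gc docs: avoid a collection freeing pages right before fork
    gc.freeze()   # call just before creating the fork-based pool
    with get_context("fork").Pool(4) as pool:
        print(pool.map(work, range(4)))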
