Multiprocessing with nested parallelism in Python - Stack Overflow


I've been having many issues trying to run some code with multiprocessing tools. I've found many examples of using different tools for the task, but most of them were too simplistic, so I am not sure whether I used them incorrectly or whether they were the wrong tools to use at all.

I have a large program that runs on several large datasets. To save time, I'd like to process the datasets in parallel. In general, my case is as follows:

def funA(var1, var2, var3):
   ...  # Some Code #

def funB(varB):
   # Some Code #
   
   funA(x1, x2, x3)
   funA(y1, y2, y3)

   # Some Code #

# Some Code #

funB(z1)
funB(z2)

# Some Code #

I'd like to run the instances of funB in parallel and, when the code reaches funA, I'd like its instances to run in parallel as well. So far I've tried several solutions.

Attempt 1:

from multiprocessing import Pool

def funA(var1, var2, var3):
   ...  # Some Code #

def funB(varB):
   # Some Code #
   
   pool1 = Pool()
   pool1.apply_async(funA, [x1, x2, x3])
   pool1.apply_async(funA, [y1, y2, y3])

   # Some Code #

# Some Code #

pool0 = Pool()
pool0.map(funB, [z1, z2])

# Some Code #

This gave me some Windows error.

Attempt 2:

from pathos.multiprocessing import ProcessingPool, ThreadingPool
amap = ProcessingPool().amap
tmap = ThreadingPool().map

def funA(var1, var2, var3):
   ...  # Some Code #

def funB(varB):
   # Some Code #
   
   tmap(funA, [x1, x2, x3])
   tmap(funA, [y1, y2, y3])

   # Some Code #

# Some Code #

amap(funB, [z1, z2])

# Some Code #

This also gave me some Windows error.

Attempt 3:

from concurrent.futures import ThreadPoolExecutor
Pool1 = ThreadPoolExecutor(max_workers=2)
Pool2 = ThreadPoolExecutor(max_workers=2)

def funA(var1, var2, var3):
   ...  # Some Code #

def funB(varB):
   # Some Code #
   
   Pool2.submit(funA, x1, x2, x3)
   Pool2.submit(funA, y1, y2, y3)
   Pool2.shutdown(wait=True)

   # Some Code #

# Some Code #

Pool1.submit(funB, [z1, z2])
Pool1.shutdown(wait=True)

# Some Code #

This does not crash, but it doesn't execute the functions either. It skips right away to the code that follows the parallel functions. I've read that using shutdown(wait=True) should prevent that, but even after adding it, the result is the same. It doesn't seem to execute the functions.

I would be very grateful if someone could explain to me how to do this properly. I am using a Windows 11 device and Anaconda with Python 3.11. Thank you all in advance.

Asked Mar 10 at 15:10 by DmitryP
  • Take a look at this question – steve-ed Commented Mar 10 at 15:19
  • What errors do you get? – Adon Bilivit Commented Mar 10 at 16:03
  • If your funA and funB are not CPU intensive, you might want to try asyncio. – Hai Vu Commented Mar 22 at 18:32

1 Answer

As I understand it, you have a function, funB, that you would like to run in parallel. Each invocation of funB calls funA multiple times with different arguments, and you would like those funA calls to run in parallel as well.

In a perfect world you would create a multiprocessing pool to run your funB tasks in parallel, and funB would create its own multiprocessing pool to run its funA tasks in parallel. The first problem with this is that multiprocessing pools create what are known as daemon processes, i.e. processes that are automatically terminated when their parent process terminates. Daemon processes are not allowed to create child processes, which is exactly what funB would need to do to create its own multiprocessing pool. There is some "clever" coding that can be done (search this site) so that the multiprocessing pool creates non-daemon processes instead.

But this raises another problem. Let's say you need to invoke funB M times and each invocation creates a multiprocessing pool of size N because it needs to invoke funA N times. That is a total of M * N processes being created to run all M * N funA invocations in parallel. First, process creation, especially on Windows (the platform you are using), is rather expensive. Moreover, M * N might be considerably larger than the number of CPUs you have. There is nothing to be gained by creating more processes than you have CPUs unless these processes do a fair amount of I/O or network processing and are therefore idle a large percentage of the time, waiting for I/O or network requests to complete.
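The daemon restriction described above can be observed directly. The following is a minimal sketch, not part of your code: the helper names report_daemon_flag and try_nested_pool are made up for illustration. Each pool worker reports its own daemon flag and then tries to create a pool of its own. On standard CPython I would expect the flags to be True and the nested attempt to fail with an AssertionError saying that daemonic processes are not allowed to have children.

```python
import multiprocessing as mp


def report_daemon_flag(_):
    # Runs inside a pool worker and returns that worker's daemon flag.
    return mp.current_process().daemon


def try_nested_pool(_):
    # Runs inside a pool worker and attempts to create a second pool there.
    try:
        with mp.Pool(2):
            return "nested pool created"
    except AssertionError as exc:
        # Raised by multiprocessing when a daemonic process starts a child.
        return f"AssertionError: {exc}"


def main():
    with mp.Pool(2) as outer:
        print("worker daemon flags:", outer.map(report_daemon_flag, range(2)))
        print("nested attempt:", outer.map(try_nested_pool, range(1)))


if __name__ == "__main__":
    # The guard is required on Windows, where workers are started by
    # re-importing this module rather than by forking.
    main()
```

Note the `if __name__ == "__main__":` guard. Without it, the module-level pool creation is re-executed in every child on Windows, which may well be what caused the errors in your first two attempts, although I cannot tell without seeing the actual error message.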

I suggest, therefore, that you create both a multithreading pool and a multiprocessing pool. On the assumption that most of your processing is CPU-intensive, the multiprocessing pool size should not exceed the number of CPUs you have. I would use the multithreading pool to run your funB invocations concurrently. Each invocation of funB is passed the single multiprocessing pool, which it uses to execute its funA tasks, and any other CPU-intensive processing it might have, in parallel. For example:

from multiprocessing.pool import Pool
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from functools import partial

import time

def fun_A(var1: int, var2: int, var3: int) -> int:
    # CPU-intensive code will be simulated
    print('fun_A called at time', time.time(), flush=True)
    time.sleep(3)
    return var1 * var2 * var3

def fun_B(pool: Pool,
          x1: int,
          x2: int,
          x3: int,
          y1: int,
          y2: int,
          y3: int
    ) -> int:
    result1 = pool.apply_async(fun_A, args=(x1, x2, x3))
    result2 = pool.apply_async(fun_A, args=(y1, y2, y3))

    return result1.get() + result2.get()

def main():
    args = [
        [1, 2, 3, 4, 5, 6],
        [10, 20, 30, 40, 50, 60],
        [100, 200, 300, 400, 500, 600],
    ]

    # First result should be (1 * 2 * 3) + (4 * 5 * 6) = 126
    # Subsequent results are 1000 times larger than the prior result

    thread_pool_size = len(args)
    # Each element of args is used to call fun_B, which
    # creates 2 tasks. But there is no point in creating
    # a pool size greater than the number of CPUs we have
    # if we assume that fun_A is very CPU-intensive with minimal
    # I/O or network activity:
    pool_size = min(cpu_count(), len(args) * 2)

    with ThreadPool(thread_pool_size) as thread_pool, \
    Pool(pool_size) as pool:
        t = time.monotonic()
        worker = partial(fun_B, pool)
        results = thread_pool.starmap(worker, args)
        elapsed_time = time.monotonic() - t

    print(results, 'elapsed time =', elapsed_time)

if __name__ == '__main__':
    main()

Prints:

fun_A called at time 1741726531.5217073
fun_A called at time 1741726531.5217073
fun_A called at time 1741726531.5217073
fun_A called at time 1741726531.5217073
fun_A called at time 1741726531.5217073
fun_A called at time 1741726531.5217073
[126, 126000, 126000000] elapsed time = 3.155999999959022
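
As for Attempt 3 in the question, one possible explanation for "it does not crash but does not seem to run" is that an exception raised inside a submitted function is stored on the returned Future and only surfaces when you call result() or exception() on it. Since the futures are discarded there, any failure would go unnoticed. Also note that Pool1.submit(funB, [z1, z2]) makes a single call funB([z1, z2]) rather than one call per dataset. The sketch below is only an illustration with stand-in functions (fun_a, fun_b and the sample tuples are my own, not your code). It reuses a shared executor after it has been shut down, as would happen if funB ran more than once:

```python
from concurrent.futures import ThreadPoolExecutor

inner_pool = ThreadPoolExecutor(max_workers=2)


def fun_a(x, y, z):
    return x * y * z


def fun_b(values):
    # Mirrors the question: submit to the shared executor, then shut it down.
    futures = [inner_pool.submit(fun_a, *values) for _ in range(2)]
    inner_pool.shutdown(wait=True)
    return sum(f.result() for f in futures)


outer_pool = ThreadPoolExecutor(max_workers=2)

first = outer_pool.submit(fun_b, (1, 2, 3))
first.result()  # wait, so the second call sees the already shut-down executor

second = outer_pool.submit(fun_b, (4, 5, 6))
outer_pool.shutdown(wait=True)  # returns normally; nothing is raised here

print("first result:", first.result())

# The failure of the second call only becomes visible when asked for.
try:
    second.result()
except RuntimeError as exc:
    print("second call failed:", exc)
```

Keeping the Future objects and calling result() on each of them is usually the quickest way to find out what actually went wrong inside the worker functions.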