python - Subprocesses work perfectly, until there's more than 1 - Stack Overflow

I am working on a job submission script that handles parallelised model fitting (with scipy.optimize) on Torque and Slurm clusters, as well as on local hardware. The former two work fine, but the latter is giving me some issues.

My approach is as follows:

Given a model configuration and some data containing M response variables (RVs), I will have to fit M models.

  1. I split the data into n chunks, each chunk corresponding to an equal part of the M RVs. I save the data to disk.
  2. I create a .sh script that loads the data and fits a model on it.
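
The chunking step can be sketched as follows. This is a minimal illustration, not the asker's actual code: `split_into_chunks` and the `rv_*` labels are hypothetical names. It hands any remainder to the first chunks, so no response variable is dropped when M is not divisible by n.

```python
def split_into_chunks(items, n_chunks):
    """Split items into n_chunks contiguous parts whose sizes differ by at most one."""
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    base, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional item.
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


rv_names = [f"rv_{k}" for k in range(7)]  # hypothetical response-variable labels
chunks = split_into_chunks(rv_names, 3)
print([len(c) for c in chunks])  # → [3, 2, 2]
```

Plain integer division (7 // 3 == 2 per chunk) would cover only 6 of the 7 items here.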

The problem occurs only on the local machine and only when n > 1. When n == 1, this works perfectly. When n > 1, the processes slow down by several orders of magnitude and use all of my CPU. This makes me believe that they are contending for some shared resource, but I can't imagine what that might be: all the resources are saved separately on disk and, if I understand correctly, each Python process created by the script has its own GIL.

My approaches to solve the problem so far have been:

  • Running the callable with multiprocess instead of the bash script. This does not work either (because of unserializable objects, or jobs seemingly disappearing without any result). Besides, I want to stick to the job script approach because it makes it easier to re-run failed models and is more consistent with how the batch jobs are run.
  • Trying the forks of multiprocess that use dill or cloudpickle internally. This did not work.
  • Limiting the available cores per job with a custom preexec_fn. This also resulted in an error: 'fork' resource unavailable.

My experience with multiprocess is limited, and although I have visited and read many other questions I have yet to understand what is special about my case. If anyone here could provide any insight, it would be very helpful.

What I tried: Running a bash script with multiprocess that runs a python script that handles some files saved to disk.

What I expect: I expect the process to run normally, about as fast as the process runs without multiprocess.

What actually resulted: A slowdown of around 5 orders of magnitude.

To illustrate how big of a slowdown it gives:

  • if n==1, fitting 2 models takes less than a second
  • if n==2, fitting 2 models takes the whole night

EDIT:

This problem occurs on a MacBook with an M3 processor, and on a Lenovo ThinkPad with Ubuntu Linux. I tried running the exact same script on an HPC compute node (not submitting the models as separate jobs, just running local parallel jobs from within an interactive job), and there it completes without problems.

EDIT 2: A minimal working example.

import subprocess
import sys


def test_parallel_fitting(n_covariates, n_models, n_processes):
    # Launch n_processes identical but independent worker processes
    processes = []
    n_covariates = int(n_covariates)
    n_models = int(n_models)
    n_processes = int(n_processes)
    n_models_per_process = n_models // n_processes
    for i in range(n_processes):
        command = ["python", "-c", f"""
import numpy as np
import time
from scipy import optimize
# Simple model fitting that shouldn't interact with other processes
for f in range({n_models_per_process}):
    start_time = time.time()
    X = np.random.rand(1500, {n_covariates})
    y = np.random.rand(1500)
    def model(params, X, y):
        return np.sum((y - np.dot(X, params)) ** 2)
    result = optimize.minimize(model, x0=np.ones({n_covariates}), args=(X, y))
    print(f"Process {i}:",f," finished in ", time.time() - start_time, " seconds")
"""]
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        processes.append(p)
    
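    # Collect the output; communicate() also waits for the process to exit.
    # Calling wait() first can deadlock once a child fills its stdout pipe.
    for p in processes:
        output, error = p.communicate()
        print(output.decode())
        print(error.decode())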
    print("Parallel fitting completed")

if __name__ == "__main__":
    args = sys.argv[1:]
    test_parallel_fitting(*args)

Works flawlessly:

python script.py 6 6 3
python script.py 6 120 10
python script.py 7 120 1

Takes forever:

python script.py 7 6 3
python script.py 7 120 10
python script.py 7 120 2

So I think the number of covariates is the problem. Maybe when the matrix multiplications exceed a certain size, another process handles them, and the parallel workers are waiting for that process to become available?

How does this script run for others?

EDIT 3: Formatting and removing some superfluous text.

Asked Feb 7 at 11:37 by Augustijn de Boer; edited Feb 8 at 21:25.

Comments:
  • Why don't you use Python multiprocessing instead of subprocess? – Barmar Commented Feb 7 at 16:36
  • I have tried that, and I run into the same issue. I will give it another shot, but I am afraid that I will run into the same problem again. – Augustijn de Boer Commented Feb 7 at 17:51
  • Are you sure the CPU is the bottleneck? If it's disk access, using multiple processes won't help much. – Barmar Commented Feb 7 at 18:00
  • I don't think the CPU is the bottleneck, I think there is a shared resource that these processes are competing for, but I can't for the life of me figure out what it is. The job scripts, the data, and the executable are all written to disk separately for each job. The subprocess.Popen call should give each subpythonprocess that it creates its own GIL, so that can't be the problem either. However, htop shows full usage of my CPU when there are 2 jobs running, each fitting 1 model. FYI, these are not big models, but simple optimizations that usually finish in a matter of seconds. – Augustijn de Boer Commented Feb 7 at 18:47
  • It's hard to imagine what other shared resource there could be. – Barmar Commented Feb 7 at 18:48

1 Answer

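After some digging I found out that it was indeed the BLAS backend that was causing problems. By default a BLAS library typically starts a thread pool sized to the whole machine, so several workers running at once oversubscribe the cores. The amazing threadpoolctl package provides context managers that set limits on these backends, and a single line completely solved my problem.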

The working example, but now running as expected:

import subprocess
import sys
def test_parallel_fitting(n_covariates, n_models, n_processes):
    # Launch n_processes identical but independent worker processes
    processes = []
    n_covariates = int(n_covariates)
    n_models = int(n_models)
    n_processes = int(n_processes)
    n_models_per_process = n_models // n_processes
    for i in range(n_processes):
        command = ["python", "-c", f"""
import numpy as np
import time
from scipy import optimize
import multiprocessing
from threadpoolctl import threadpool_limits

n_cpu_cores = multiprocessing.cpu_count()
# Never request fewer than one BLAS thread per process
cores_per_process = max(1, n_cpu_cores // {n_processes})
with threadpool_limits(limits=cores_per_process, user_api='blas'):
    for f in range({n_models_per_process}):
        start_time = time.time()
        X = np.random.rand(1500, {n_covariates})
        y = np.random.rand(1500)
        def model(params, X, y):
            return np.sum((y - np.dot(X, params)) ** 2)
        result = optimize.minimize(model, x0=np.ones({n_covariates}), args=(X, y))
        print(f"Process {i}:",f," finished in ", time.time() - start_time, " seconds")
"""]
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        processes.append(p)
    
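    # Collect the output; communicate() also waits for the process to exit.
    # Calling wait() first can deadlock once a child fills its stdout pipe.
    for p in processes:
        output, error = p.communicate()
        print(output.decode())
        print(error.decode())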
    print("Parallel fitting completed")

if __name__ == "__main__":
    args = sys.argv[1:]
    test_parallel_fitting(*args)
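
An alternative that does not need threadpoolctl in the worker is to cap the thread pools from the parent through environment variables. The sketch below is a hedged illustration and not part of the original answer: `limited_env` and `launch_worker` are hypothetical helper names, and which variable actually takes effect depends on the BLAS library the NumPy build links against (OpenBLAS, MKL, Accelerate, ...). These variables are generally read when the library is loaded, which is why they are passed through `env=` at launch.

```python
import os
import subprocess
import sys

# Environment variables commonly read by BLAS/OpenMP runtimes.
# Assumption: at least one of them applies to the BLAS in use.
THREAD_ENV_VARS = (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "MKL_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
)


def limited_env(n_threads, base_env=None):
    """Return a copy of the environment with the thread caps set."""
    env = dict(os.environ if base_env is None else base_env)
    for name in THREAD_ENV_VARS:
        env[name] = str(n_threads)
    return env


def launch_worker(code, n_threads):
    """Start a Python worker whose BLAS thread pools are capped."""
    return subprocess.Popen(
        [sys.executable, "-c", code],  # same interpreter as the parent
        env=limited_env(n_threads),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )


if __name__ == "__main__":
    worker = launch_worker(
        "import os; print(os.environ['OMP_NUM_THREADS'])", n_threads=2
    )
    out, err = worker.communicate()
    print(out.strip())
```

This keeps the worker script unchanged, which may suit a job-script workflow, but unlike `threadpool_limits` it cannot be adjusted once the worker is running.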

Of course, setting n_processes > n_cpu_cores will still cause problems, but that's to be expected anyway, and I consider my problem solved.
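
To make that edge case concrete: when there are more processes than cores, the integer division n_cpu_cores // n_processes evaluates to 0, which is not a meaningful thread count. A minimal sketch of a guarded budget follows; `threads_per_process` is a hypothetical helper, not part of threadpoolctl.

```python
import os


def threads_per_process(n_processes, n_cores=None):
    """Divide the cores evenly among processes, with a floor of one thread."""
    if n_processes < 1:
        raise ValueError("n_processes must be at least 1")
    if n_cores is None:
        # os.cpu_count() may return None on some platforms
        n_cores = os.cpu_count() or 1
    return max(1, n_cores // n_processes)


print(threads_per_process(3, n_cores=8))   # → 2
print(threads_per_process(16, n_cores=8))  # → 1
```

With 16 single-threaded processes on 8 cores the machine is still oversubscribed, just by processes and not by BLAS threads.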

Thanks to Nick ODell for the pointer; I quickly zeroed in on the problem after that.
