
Multithreading Python jobs to shared state - Stack Overflow


I think I'm lacking an understanding of multithreading in Python, and the answers online are hurting my feeble brain.

I want to use multithreading to write JSON data to a list as the jobs run. I have the code below; some_function is a function that takes a dictionary, runs a (basic) web scraper, and returns a dictionary or nothing (if it errors).

from multiprocessing import Pool

def some_function(job_dict: dict) -> dict:
    # some function

class RunJobs:
    def __init__(self, jobs: list[dict], threads=1):
       self.jobs, self.threads, self.state = jobs, threads, []

    def _run_sequentially(self):
        for job in self.jobs:
            self.state.append(some_function(job))

    def _run_multithreaded(self):
        pool = Pool(self.threads)
        job_queue = []
        for job in self.jobs:
            job_queue.append(pool.apply_async(some_function, args=(job)))
        pool.close()
        pool.join()
        
        for job in job_queue:
            self.state.append(job.get())

    def run(self):
        try:
            if self.threads == 1:
                self._run_sequentially()
            else:
                self._run_multithreaded()
        except KeyInterrupt:
            print(self.state)

How do I make it so this state is updated on multiple threads as they run, or is that not possible?


asked Mar 19 at 15:31 by probablyjg, edited Mar 21 at 8:26

3 Answers


As has been pointed out by Arthur Belanger, your code is using multiprocessing, not multithreading. If you want to use multithreading, you can do so with minimal changes. Instead of:

from multiprocessing import Pool

Do either:

from multiprocessing.dummy import Pool  # multithreading

or:

from multiprocessing.pool import ThreadPool as Pool  # multithreading
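
The two spellings are equivalent: multiprocessing.dummy.Pool is a thin factory that returns a multiprocessing.pool.ThreadPool. A quick check confirms this (my illustration, not part of the original answer):

from multiprocessing.dummy import Pool
from multiprocessing.pool import ThreadPool

pool = Pool(4)
print(isinstance(pool, ThreadPool))  # True: dummy.Pool just builds a ThreadPool
pool.close()
pool.join()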

But you have other issues with your code. In your _run_multithreaded method (as originally posted) you had:

            job_queue.append(pool.apply_async(self._run_single, args=(job)))

But your class does not have a _run_single method. Looking at your _run_sequentially method, it would appear that you meant some_function instead of self._run_single. Also, the args argument to apply_async should be an iterable such as a tuple or list, and each element becomes a positional argument to your worker function. But enclosing job in parentheses does not make it a tuple; it is just a parenthesized expression, which in this case is no different from specifying args=job (that is, (job) == job). What you need is:

            job_queue.append(pool.apply_async(some_function, args=(job,)))
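
A quick check in the interpreter shows the difference (my illustration):

job = {'x': 1}
print((job) == job)    # True: the parentheses are just grouping
print((job,) == job)   # False: the trailing comma creates a one-element tuple
print(type((job,)))    # <class 'tuple'>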

Note that (job,) is a tuple of one element while (job) is not. You also had quite a few syntax errors in the earlier revision of your question. For example:

class RunJobs:
    self __init__(self, jobs: list[dict], threads=1):

That is not how you define a method. This should be:

class RunJobs:
    def __init__(self, jobs: list[dict], threads=1):

Putting it all together, your code should be as follows (note also that the exception class is spelled KeyboardInterrupt, not KeyInterrupt):

from multiprocessing.dummy import Pool

def some_function(job_dict: dict) -> dict:
    # For demo purposes just return the input argument:
    return job_dict

class RunJobs:
    def __init__(self, jobs: list[dict], threads=1):
       self.jobs, self.threads, self.state = jobs, threads, []

    def _run_sequentially(self):
        for job in self.jobs:
            self.state.append(some_function(job))

    def _run_multithreaded(self):
        pool = Pool(self.threads)
        job_queue = []
        for job in self.jobs:
            job_queue.append(pool.apply_async(some_function, args=(job,)))
        pool.close()
        pool.join()

        for job in job_queue:
            self.state.append(job.get())

    def run(self):
        try:
            if self.threads == 1:
                self._run_sequentially()
            else:
                self._run_multithreaded()
        except KeyboardInterrupt:
            print(self.state)

if __name__ == '__main__':
    jobs = [
        {'x': 1},
        {'y': 2},
        {'z': 3}
    ]

    run_jobs = RunJobs(jobs, threads=3)
    run_jobs.run()
    print(run_jobs.state)

Prints:

[{'x': 1}, {'y': 2}, {'z': 3}]

This code should run correctly whether you use your original import statement (multiprocessing) or the above modified one (multithreading). Of course, depending on what some_function is actually doing, one will be more appropriate than the other.
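
As a side note (my addition, not part of this answer): if you want state to be updated as each job finishes, rather than all at once after pool.join(), apply_async accepts a callback that fires when a result arrives. A minimal standalone sketch:

from multiprocessing.dummy import Pool

state = []

def some_function(job_dict: dict) -> dict:
    return job_dict  # stand-in worker for the demo

jobs = [{'x': 1}, {'y': 2}, {'z': 3}]

pool = Pool(3)
for job in jobs:
    # The callback runs as each result arrives, so state grows
    # while the other jobs are still running.
    pool.apply_async(some_function, args=(job,), callback=state.append)
pool.close()
pool.join()
print(state)

With callbacks the results land in completion order, which may differ from submission order.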

If you insist on sharing state, here is another approach: use a queue to share state (each entry in this case is a tuple of (order, input, output)).

Code:

# https://stackoverflow.com/q/79520670/459745
import random
import time
from concurrent.futures import ThreadPoolExecutor
from queue import PriorityQueue as Queue

from loguru import logger

def some_function(arg: dict) -> dict:
    logger.debug(f"Start job {arg=}")
    time.sleep(random.randint(1, 4))

    result = {**arg, "status": "done"}
    logger.debug(f"Finish job {result=}")
    return result

def store_result(order, arg, future, que):
    future.add_done_callback(lambda f: que.put((order, arg, f.result())))

def run_jobs(job, args, threads=None, result_queue: Queue = None) -> Queue:
    if result_queue is None:
        result_queue = Queue()

    with ThreadPoolExecutor(max_workers=threads) as executor:
        for order, arg in enumerate(args):
            future = executor.submit(job, arg)
            store_result(order, arg, future, result_queue)

    return result_queue

def main():
    logger.info("start")
    results = run_jobs(
        some_function,
        args=[{"job_id": i} for i in range(7)],
    )

    while not results.empty():
        order, arg, result = results.get()
        logger.info(f"[{order}] {arg} => {result}")

    logger.info("end")

if __name__ == "__main__":
    main()

Sample run

INFO     | __main__:main:36 - start
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 0}
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 1}
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 2}
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 3}
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 4}
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 5}
DEBUG    | __main__:some_function:11 - Start job arg={'job_id': 6}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 4, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 0, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 1, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 5, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 2, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 3, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job result={'job_id': 6, 'status': 'done'}
INFO     | __main__:main:44 - [0] {'job_id': 0} => {'job_id': 0, 'status': 'done'}
INFO     | __main__:main:44 - [1] {'job_id': 1} => {'job_id': 1, 'status': 'done'}
INFO     | __main__:main:44 - [2] {'job_id': 2} => {'job_id': 2, 'status': 'done'}
INFO     | __main__:main:44 - [3] {'job_id': 3} => {'job_id': 3, 'status': 'done'}
INFO     | __main__:main:44 - [4] {'job_id': 4} => {'job_id': 4, 'status': 'done'}
INFO     | __main__:main:44 - [5] {'job_id': 5} => {'job_id': 5, 'status': 'done'}
INFO     | __main__:main:44 - [6] {'job_id': 6} => {'job_id': 6, 'status': 'done'}
INFO     | __main__:main:46 - end

Notes

  • This solution is more complex because of the shared state.
  • Do not use a list for shared state: it is not thread-safe.
  • I use PriorityQueue and the order field to ensure the order of the output.
  • If you do not care about the order, use Queue instead; the output will then be in the order of completion (see the sketch below).
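
For example, a minimal variation (my sketch, reusing run_jobs() and some_function from the code above):

from queue import Queue

# Pass a plain FIFO queue instead of letting run_jobs() create a PriorityQueue:
results = run_jobs(
    some_function,
    args=[{"job_id": i} for i in range(7)],
    result_queue=Queue(),
)
while not results.empty():
    order, arg, result = results.get()
    print(order, result)  # tuples now come out in completion order, not submission order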

A few observations before we start:

  • There is no need to share the state list among the threads. I avoid sharing state unless I absolutely have to; that way, the code stays simpler.
  • Instead of using multiprocessing or threading, I use concurrent.futures, which is simpler.
  • I make no distinction between single-threaded and multi-threaded runs: I use concurrent.futures.ThreadPoolExecutor for both cases. The only difference is the number of threads (1 or more).
  • I replace the class RunJobs with the run_jobs() function, which is much simpler.

Code:

import random
import time
from concurrent.futures import ThreadPoolExecutor

from loguru import logger

def some_function(job_in: dict) -> dict:
    logger.debug(f"Start job {job_in=}")
    # Simulate time it takes to finish job
    time.sleep(random.randint(1, 4))

    job_out = {**job_in, "status": "done"}
    logger.debug(f"Finish job {job_out=}")
    return job_out

def run_jobs(job, args, threads=None):
    with ThreadPoolExecutor(max_workers=threads) as executor:
        output = executor.map(job, args)
    return output

def main():
    logger.info("start")
    args_list = [{"job_id": i} for i in range(7)]
    for result in run_jobs(some_function, args_list, threads=4):
        logger.info(result)
    logger.info("end")

if __name__ == "__main__":
    main()

Sample run

INFO     | __main__:main:26 - start
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 0}
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 1}
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 2}
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 3}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 0, 'status': 'done'}
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 4}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 2, 'status': 'done'}
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 5}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 1, 'status': 'done'}
DEBUG    | __main__:some_function:10 - Start job job_in={'job_id': 6}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 5, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 3, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 6, 'status': 'done'}
DEBUG    | __main__:some_function:15 - Finish job job_out={'job_id': 4, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 0, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 1, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 2, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 3, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 4, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 5, 'status': 'done'}
INFO     | __main__:main:29 - {'job_id': 6, 'status': 'done'}
INFO     | __main__:main:30 - end

Notes

  • Function run_jobs() takes in:
    • job: the function which does the work,
    • args: a sequence of arguments, one per call,
    • threads: the number of threads. If threads is 1, the code runs sequentially; if threads is None, the concurrent.futures library will pick a sensible default for your system; any other positive number sets the maximum number of threads.
  • run_jobs() is much simpler than the class RunJobs. It is also more generic because it does not hard-code the worker function some_function.
  • Due to the nature of the map() method, the output will be in the same order as the input (see the sketch below for a completion-order alternative).
  • I dressed up some_function with random delays to simulate random finish times.
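
If you instead want to handle results as they finish, concurrent.futures.as_completed is one option; a minimal sketch (my addition, not part of the original answer):

import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def some_function(job_in: dict) -> dict:
    time.sleep(random.randint(1, 4))  # simulate variable job duration
    return {**job_in, "status": "done"}

args_list = [{"job_id": i} for i in range(7)]
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(some_function, arg) for arg in args_list]
    for future in as_completed(futures):
        print(future.result())  # results appear in completion order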