I think I'm lacking an understanding of multithreading in Python, and the answers online are hurting my feeble brain.
I want to use multithreading to write JSON data to a list as it runs. I have the code below; some_function takes a dictionary, runs a (basic) web scraper, and returns a dictionary or nothing (if it errors).
from multiprocessing import Pool

def some_function(job_dict: dict) -> dict:
    # some function

class RunJobs:
    def __init__(self, jobs: list[dict], threads=1):
        self.jobs, self.threads, self.state = jobs, threads, []

    def _run_sequentially(self):
        for job in self.jobs:
            self.state.append(some_function(job))

    def _run_multithreaded(self):
        pool = Pool(self.threads)
        job_queue = []
        for job in self.jobs:
            job_queue.append(pool.apply_async(some_function, args=(job)))
        pool.close()
        pool.join()
        for job in job_queue:
            self.state.append(job.get())

    def run(self):
        try:
            if self.threads == 1:
                self._run_sequentially()
            else:
                self._run_multithreaded()
        except KeyInterrupt:
            print(self.state)
How do I make it so this state is updated on multiple threads as they run, or is that not possible?
3 Answers
As has been pointed out by Arthur Belanger, your code is using multiprocessing and not multithreading. If you want to be using multithreading, then you can do so with minimal changes. Instead of:
from multiprocessing import Pool
Do either:
from multiprocessing.dummy import Pool # multithreading
or:
from multiprocessing.pool import ThreadPool as Pool # multithreading
But you have other issues with your code. In your _run_multithreaded method you have:
job_queue.append(pool.apply_async(self._run_single, args=(job)))
But your class does not have a _run_single method. Looking at your _run_sequentially method, it would appear that you mean some_function instead of self._run_single. Also, the args argument to apply_async should be an iterable such as a tuple or list instance, and each element will be a positional argument to your worker function, some_function. But enclosing job in parentheses does not make it a tuple; it is just a parenthesized expression, which in this case is no different than had you just specified args=job (that is, (job) == job). What you need is:
job_queue.append(pool.apply_async(some_function, args=(job,)))
Note that (job,) is a tuple of one element, while (job) is not.
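For instance, a quick check in the REPL (my illustration, not part of the original answer) shows the difference:

job = {'x': 1}
print(type((job)))   # <class 'dict'>  -- just a parenthesized expression
print(type((job,)))  # <class 'tuple'> -- a one-element tuple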
You also have quite a few syntax errors. For example:
class RunJobs:
    self __init__(self, jobs: list[dict], threads=1):
That is not how you define a method. This should be:
class RunJobs:
    def __init__(self, jobs: list[dict], threads=1):
Putting it all together, your code should be:
from multiprocessing.dummy import Pool

def some_function(job_dict: dict) -> dict:
    # For demo purposes just return the input argument:
    return job_dict

class RunJobs:
    def __init__(self, jobs: list[dict], threads=1):
        self.jobs, self.threads, self.state = jobs, threads, []

    def _run_sequentially(self):
        for job in self.jobs:
            self.state.append(some_function(job))

    def _run_multithreaded(self):
        pool = Pool(self.threads)
        job_queue = []
        for job in self.jobs:
            job_queue.append(pool.apply_async(some_function, args=(job,)))
        pool.close()
        pool.join()
        for job in job_queue:
            self.state.append(job.get())

    def run(self):
        try:
            if self.threads == 1:
                self._run_sequentially()
            else:
                self._run_multithreaded()
        except KeyboardInterrupt:
            print(self.state)

if __name__ == '__main__':
    jobs = [
        {'x': 1},
        {'y': 2},
        {'z': 3}
    ]

    run_jobs = RunJobs(jobs, threads=3)
    run_jobs.run()
    print(run_jobs.state)
Prints:
[{'x': 1}, {'y': 2}, {'z': 3}]
This code should run correctly whether you use your original import statement (multiprocessing) or the above modified one (multithreading). Of course, depending on what some_function is actually doing, one will be more appropriate than the other.
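If you want self.state to be filled in as each job finishes rather than after pool.join(), apply_async also accepts a callback argument that is called with the result as soon as it is ready. A minimal sketch of an alternative _run_multithreaded (my addition, not from the original answer; note that jobs which raise are silently skipped here because no error_callback is given):

    def _run_multithreaded(self):
        pool = Pool(self.threads)
        for job in self.jobs:
            # The pool's result-handler thread calls self.state.append(result)
            # as soon as each job completes.
            pool.apply_async(some_function, args=(job,), callback=self.state.append)
        pool.close()
        pool.join()

With the thread-based Pool, calling list.append from the callback is safe because appending to a list is protected by the GIL in CPython.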
If you insist on sharing state, here is another approach: use a queue to share state (state in this case is a tuple of (order, input, output)).
Code:
# https://stackoverflow/q/79520670/459745
import random
import time
from concurrent.futures import ThreadPoolExecutor
from queue import PriorityQueue as Queue

from loguru import logger

def some_function(arg: dict) -> dict:
    logger.debug(f"Start job {arg=}")
    time.sleep(random.randint(1, 4))
    result = {**arg, "status": "done"}
    logger.debug(f"Finish job {result=}")
    return result

def store_result(order, arg, future, que):
    future.add_done_callback(lambda f: que.put((order, arg, f.result())))

def run_jobs(job, args, threads=None, result_queue: Queue = None) -> Queue:
    if result_queue is None:
        result_queue = Queue()
    with ThreadPoolExecutor(max_workers=threads) as executor:
        for order, arg in enumerate(args):
            future = executor.submit(job, arg)
            store_result(order, arg, future, result_queue)
    return result_queue

def main():
    logger.info("start")
    results = run_jobs(
        some_function,
        args=[{"job_id": i} for i in range(7)],
    )
    while not results.empty():
        order, arg, result = results.get()
        logger.info(f"[{order}] {arg} => {result}")
    logger.info("end")

if __name__ == "__main__":
    main()
Sample run
INFO | __main__:main:36 - start
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 0}
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 1}
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 2}
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 3}
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 4}
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 5}
DEBUG | __main__:some_function:11 - Start job arg={'job_id': 6}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 4, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 0, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 1, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 5, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 2, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 3, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job result={'job_id': 6, 'status': 'done'}
INFO | __main__:main:44 - [0] {'job_id': 0} => {'job_id': 0, 'status': 'done'}
INFO | __main__:main:44 - [1] {'job_id': 1} => {'job_id': 1, 'status': 'done'}
INFO | __main__:main:44 - [2] {'job_id': 2} => {'job_id': 2, 'status': 'done'}
INFO | __main__:main:44 - [3] {'job_id': 3} => {'job_id': 3, 'status': 'done'}
INFO | __main__:main:44 - [4] {'job_id': 4} => {'job_id': 4, 'status': 'done'}
INFO | __main__:main:44 - [5] {'job_id': 5} => {'job_id': 5, 'status': 'done'}
INFO | __main__:main:44 - [6] {'job_id': 6} => {'job_id': 6, 'status': 'done'}
INFO | __main__:main:46 - end
Notes
- This solution is more complex because of shared state
- Do not use a list for shared state: it is not thread safe
- I use PriorityQueue and order to ensure the order of the output
- If we do not care about the order, then use Queue. At which time, the order of output will be the order of execution completion (a small usage sketch follows).
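For example, if order does not matter, run_jobs() above can simply be handed a plain FIFO queue through its result_queue parameter (a usage sketch of my own, not part of the original answer):

import queue

# Results now come out in completion order rather than input order,
# because a FIFO queue does not sort on the order key.
results = run_jobs(
    some_function,
    args=[{"job_id": i} for i in range(7)],
    result_queue=queue.Queue(),
)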
A few observations before we start:
- There is no need to share the state list among the threads. I avoid sharing state unless I absolutely have to. This way, the code will be simpler.
- Instead of using multiprocessing or threading, I use concurrent.futures, which is simpler.
- I make no distinction between single-thread and multiple-thread runs: I will use concurrent.futures.ThreadPoolExecutor for both cases. The only difference is the number of threads (1 or multiple).
- I replace the class RunJobs with the run_jobs() function, which is much simpler.
Code:
import random
import time
from concurrent.futures import ThreadPoolExecutor

from loguru import logger

def some_function(job_in: dict) -> dict:
    logger.debug(f"Start job {job_in=}")
    # Simulate time it takes to finish job
    time.sleep(random.randint(1, 4))
    job_out = {**job_in, "status": "done"}
    logger.debug(f"Finish job {job_out=}")
    return job_out

def run_jobs(job, args, threads=None):
    with ThreadPoolExecutor(max_workers=threads) as executor:
        output = executor.map(job, args)
    return output

def main():
    logger.info("start")
    args_list = [{"job_id": i} for i in range(7)]
    for result in run_jobs(some_function, args_list, threads=4):
        logger.info(result)
    logger.info("end")

if __name__ == "__main__":
    main()
Sample run
INFO | __main__:main:26 - start
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 0}
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 1}
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 2}
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 3}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 0, 'status': 'done'}
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 4}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 2, 'status': 'done'}
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 5}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 1, 'status': 'done'}
DEBUG | __main__:some_function:10 - Start job job_in={'job_id': 6}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 5, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 3, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 6, 'status': 'done'}
DEBUG | __main__:some_function:15 - Finish job job_out={'job_id': 4, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 0, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 1, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 2, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 3, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 4, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 5, 'status': 'done'}
INFO | __main__:main:29 - {'job_id': 6, 'status': 'done'}
INFO | __main__:main:30 - end
Notes
- Function run_jobs() takes in:
  - job: the function which does the work
  - args: a sequence of arguments, one for each call
  - threads: the number of threads. If threads is 1, the code will run sequentially. If threads is None, the concurrent.futures library will figure out the best number for your system. Any other non-zero positive number will specify the maximum number of threads.
- run_jobs() is much simpler than the class RunJobs. It is more generic because it does not hard-code the worker function some_function.
- Due to the nature of the map() method, the output will be in the same order as the input (see the sketch after these notes for getting results in completion order instead).
- I dressed up function some_function to add random delays to simulate random finish times.
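If you do want each result as soon as its job finishes rather than in input order, concurrent.futures.as_completed can replace map(). A minimal sketch, my addition, reusing the same some_function:

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_jobs_unordered(job, args, threads=None):
    # Yield each result as soon as its job finishes
    # (completion order, not input order).
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(job, arg) for arg in args]
        for future in as_completed(futures):
            yield future.result()

for result in run_jobs_unordered(some_function, [{"job_id": i} for i in range(7)], threads=4):
    logger.info(result)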