I want to execute 2 functions in Python that execute requests under the hood. I don't mind waiting until both have finished, but they should run in parallel.
I tried to use the concurrent.futures library, but that requires making the function async, and so on all the way up to the top-level function, which is synchronous; making everything async would need a huge refactoring.
I'm trying the approach below, but I'm not sure whether it actually parallelises everything correctly:
import queue
import threading

def worker(function, queue_prepared, *args):
    # Run the given function and hand its result back through the queue
    result = function(*args)
    queue_prepared.put(result)

def launch_threads(param1, param2):
    queue_first = queue.Queue()
    queue_second = queue.Queue()
    thread_first = threading.Thread(target=worker, args=(request1, queue_first, param1, param2))
    thread_second = threading.Thread(target=worker, args=(request2, queue_second, param1))
    thread_first.start()
    thread_second.start()
    # join() blocks until both threads are done, so both queues already
    # hold a result when this function returns
    thread_first.join()
    thread_second.join()
    return queue_first, queue_second

queue_first, queue_second = launch_threads(param1, param2)
queue_first_finished = False
queue_second_finished = False
while not queue_first_finished or not queue_second_finished:
    if not queue_first_finished:
        if not queue_first.empty():
            first = queue_first.get(timeout=1)
        else:
            queue_first_finished = True
    if not queue_second_finished:
        if not queue_second.empty():
            second = queue_second.get(timeout=1)
        else:
            queue_second_finished = True
asked Nov 20, 2024 at 12:04 by Carabes
1 Answer
You don't need to reinvent your own thread pool with queues. Instead, use concurrent.futures.ThreadPoolExecutor and concurrent.futures.as_completed to get task results as soon as they complete; you can also add a timeout if needed.
import concurrent.futures
import time

def task(x):
    time.sleep(x)
    return x

pool = concurrent.futures.ThreadPoolExecutor(2)
wait_times = [2, 1]
tasks = [pool.submit(task, i) for i in wait_times]
for future in concurrent.futures.as_completed(tasks):
    print(future.result())

This prints the results in completion order rather than submission order:

1
2
Note that the timeout part is tricky: it doesn't actually terminate the task, it only stops the loop, and destroying the thread pool joins its threads, which means you will still wait for the task to complete.
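As an illustration only (the 0.5-second timeout and the sleeping stand-in task are made up for this sketch), as_completed raises TimeoutError once the deadline passes, while the task itself keeps running on the pool:

import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(2)
# A stand-in for a slow request: it just sleeps for 2 seconds
slow_task = pool.submit(time.sleep, 2)

try:
    for future in concurrent.futures.as_completed([slow_task], timeout=0.5):
        print(future.result())
except concurrent.futures.TimeoutError:
    # The loop gave up, but the task is still running in the background
    print("timed out waiting for the task")

pool.shutdown()  # joins the threads, so this still waits for the sleep to end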
Also, you might want to store this thread pool globally and reuse it in many places, because creating threads is not cheap (typically tens to hundreds of microseconds), whereas submitting tasks is cheap (on the order of a few microseconds).
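For example, applied to the code in your question, a minimal sketch with a shared module-level pool could look like this (request1, request2, param1 and param2 are the names from your question, not a real API):

import concurrent.futures

# Created once at import time and reused by every call below
pool = concurrent.futures.ThreadPoolExecutor(2)

def do_work_in_parallel(param1, param2):
    # Both requests start immediately and run on separate pool threads
    future_first = pool.submit(request1, param1, param2)
    future_second = pool.submit(request2, param1)
    # result() blocks until the corresponding task has finished
    return future_first.result(), future_second.result()

first, second = do_work_in_parallel(param1, param2)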
"launch_threads" seems like a misleading name, because that function doesn't merely launch the threads, it also waits for them to finish. A better name might be something like "do_work_in_parallel". – Solomon Slow, Nov 20, 2024 at 13:12
PyObject has a lock that is locked before any action is done; even stack references have locks. This made Python 5-10% slower (see the PEP), and if you load any C extension the GIL gets turned back ON until the library marks itself as safe without the GIL (you can still force it OFF, but you might run into crashes). It is still very experimental at the moment. – Ahmed AEK, Nov 20, 2024 at 15:55