I have two Celery tasks which both run inference on the same PyTorch GPU model. Due to GPU memory constraints, I plan to use only one worker process, so that the same model is shared by everything running inside it. I've learnt that the --concurrency option specifies the worker's level of concurrency, and the -Q option specifies which queues the worker consumes from.
Is it possible to allocate a specific amount of concurrency to specific queues?
For example, given two queues "A" and "B", if the worker is invoked as follows, the 4 threads created by Celery consume tasks from both queues with no priority between them:
celery -A celery_task worker --pool=threads --concurrency=4 -Q A,B,B,B
The tasks are defined in celery_task.py:
import time
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(queue="A")
def simulate_long_work():
    print("Doing long work")
    time.sleep(10)
    print("Done long work")
    return 'Work completed'

@app.task(queue="B")
def simulate_short_work():
    print("Doing short work")
    time.sleep(1)
    print("Done short work")
    return 'Work completed'
The tasks are then enqueued from celery_caller.py:
import celery_task
celery_task.simulate_long_work.apply_async()
celery_task.simulate_long_work.apply_async()
celery_task.simulate_long_work.apply_async()
celery_task.simulate_long_work.apply_async()
celery_task.simulate_short_work.apply_async()
celery_task.simulate_short_work.apply_async()
celery_task.simulate_short_work.apply_async()
celery_task.simulate_short_work.apply_async()
One possible output; note that a "short work" task gets consumed before the 4th "long work" task, even though all "short work" tasks were produced after the "long work" tasks:
[2025-03-19 01:39:15,703: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:15,705: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:15,706: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:15,708: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:16,709: WARNING/MainProcess] Done short work
[2025-03-19 01:39:16,718: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:25,714: WARNING/MainProcess] Done long work
[2025-03-19 01:39:25,714: WARNING/MainProcess] Done long work
[2025-03-19 01:39:25,715: WARNING/MainProcess] Done long work
[2025-03-19 01:39:25,760: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:25,761: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:25,762: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:26,728: WARNING/MainProcess] Done long work
[2025-03-19 01:39:26,763: WARNING/MainProcess] Done short work
[2025-03-19 01:39:26,764: WARNING/MainProcess] Done short work
[2025-03-19 01:39:26,764: WARNING/MainProcess] Done short work
Can I allocate a concurrency of 3 to the "short work" queue and only 1 to the "long work" queue, so that "short work" tasks get consumed faster while still using only 1 worker?
1 Answer
If you want a guaranteed number of processes (the default) or threads (in your case, since you use the thread pool) to be available for tasks in a particular queue, you simply run more than one Celery worker. In this particular case you would run one worker with -c 1 -Q A, and another with -c 3 -Q B.
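A minimal sketch of that setup, reusing the celery_task app from the question; the -n node names (where %h expands to the hostname) are my own addition so the two workers stay distinguishable when run on the same host:

celery -A celery_task worker --pool=threads -c 1 -Q A -n long_worker@%h
celery -A celery_task worker --pool=threads -c 3 -Q B -n short_worker@%h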
Keep in mind that since you use threads, you are effectively sharing the CPU among tasks from all queues, so important tasks could be slowed down at any point in time.
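One more caveat: each worker is a separate process, so with two workers each process loads its own copy of the model; only the threads within a single worker share one copy. Below is a minimal sketch of that per-process sharing, assuming inference-only use; load_model() is a hypothetical placeholder for your real PyTorch loading code, not something from the question:

import threading

import torch
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

_model = None
_model_lock = threading.Lock()

def load_model():
    # Hypothetical stand-in; replace with your real loading code,
    # e.g. torch.load('model.pt', map_location='cuda').eval()
    return torch.nn.Linear(4, 2).eval()

def get_model():
    # Load the model at most once per worker process; every thread in the
    # --pool=threads pool then reuses the same instance. The lock prevents
    # two threads from loading (and leaking GPU memory) concurrently.
    global _model
    with _model_lock:
        if _model is None:
            _model = load_model()
    return _model

@app.task(queue="B")
def infer(x):
    model = get_model()
    with torch.no_grad():  # inference only, no autograd bookkeeping
        return model(torch.tensor(x, dtype=torch.float32)).tolist()

Loading lazily inside the task, rather than at import time, keeps the model out of processes that merely import the module, such as celery_caller.py.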