
python - Celery worker allocate specific number of concurrencies to queue - Stack Overflow


I have two Celery tasks that both run inference on the same PyTorch GPU model. Due to GPU memory constraints, I plan to use only one worker process so that the same model can be shared among the concurrent executions. I've learned that the --concurrency option can be used to specify the level of concurrency, and the -Q option to specify which queues the worker consumes from.

Is it possible to allocate a specific amount of concurrency to specific queues?

For example, say there are two queues, "A" and "B". If the worker is invoked as follows, the 4 threads created by Celery consume tasks from both queues at random, without priority:

celery -A celery_task worker --pool=threads --concurrency=4 -Q A,B,B,B

The tasks are defined in celery_task.py:

import time
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(queue="A")
def simulate_long_work():
    print("Doing long work")
    time.sleep(10)
    print("Done long work")
    return 'Work completed'

@app.task(queue="B")
def simulate_short_work():
    print("Doing short work")
    time.sleep(1)
    print("Done short work")
    return 'Work completed'

They are then enqueued by celery_caller.py:

import celery_task

celery_task.simulate_long_work.apply_async()
celery_task.simulate_long_work.apply_async()
celery_task.simulate_long_work.apply_async()
celery_task.simulate_long_work.apply_async()
celery_task.simulate_short_work.apply_async()
celery_task.simulate_short_work.apply_async()
celery_task.simulate_short_work.apply_async()
celery_task.simulate_short_work.apply_async()

One possible output; note that a "short work" task gets consumed before the 4th "long work" task, even though the "short work" tasks were all produced after the "long work" tasks:

[2025-03-19 01:39:15,703: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:15,705: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:15,706: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:15,708: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:16,709: WARNING/MainProcess] Done short work
[2025-03-19 01:39:16,718: WARNING/MainProcess] Doing long work
[2025-03-19 01:39:25,714: WARNING/MainProcess] Done long work
[2025-03-19 01:39:25,714: WARNING/MainProcess] Done long work
[2025-03-19 01:39:25,715: WARNING/MainProcess] Done long work
[2025-03-19 01:39:25,760: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:25,761: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:25,762: WARNING/MainProcess] Doing short work
[2025-03-19 01:39:26,728: WARNING/MainProcess] Done long work
[2025-03-19 01:39:26,763: WARNING/MainProcess] Done short work
[2025-03-19 01:39:26,764: WARNING/MainProcess] Done short work
[2025-03-19 01:39:26,764: WARNING/MainProcess] Done short work

Can I allocate 3 concurrency slots to "short work" and only 1 to "long work", so that "short work" tasks get consumed faster while still using only 1 worker?

asked Mar 19 at 1:51 by HKJeffer

1 Answer


If you want a guaranteed number of processes (the default) or threads (in your case, since you use the thread pool) to be available for tasks in a particular queue, you simply run more than one Celery worker. In this particular case, you would run one worker with -c 1 -Q A, and another with -c 3 -Q B.
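
For example, a minimal sketch of that setup (the node names worker_a and worker_b are illustrative additions, not part of the original answer; -n just gives each worker a distinct name so both can run on the same host):

celery -A celery_task worker --pool=threads -c 1 -Q A -n worker_a@%h
celery -A celery_task worker --pool=threads -c 3 -Q B -n worker_b@%h

With this split, the "A" worker can only ever run one task at a time, while the "B" worker runs up to three.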

Keep in mind that since you use threads, you are effectively sharing CPU time with tasks from the other queue, so important tasks could still be slowed down at any point.
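
If you want to confirm which queues each running worker actually consumes from, Celery's built-in inspect command can be used (assuming both workers above are running):

celery -A celery_task inspect active_queues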
