
python - Celery Signals issue (task_success & task_failure) - Stack Overflow


Celery signals such as task_success and task_failure don't work.

I have a project running in Docker with some asynchronous tasks. It runs redis:alpine, uvicorn, and celery services through docker-compose.

Redis runs with: docker run -p 6379:6379 redis:alpine (note that -p must come before the image name, otherwise it is passed as an argument to redis-server)

The Celery worker is started with: python -m celery -A app.infrastructure.celery_tasks worker -E

I configured the Celery instance as follows:

# app/infrastructure/celery_tasks/celery_config.py

from celery import Celery
from kombu import Exchange, Queue

app = Celery(
    "tasks",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/0",
)

app.conf.update(
    broker_connection_retry_on_startup=True,
    global_retry_backoff=3,
    # new-style lowercase names; mixing old CELERY_* keys with new-style
    # keys raises an error in Celery 5
    task_acks_late=True,
    task_publish_retry_policy={
        "max_retries": 3,
        "interval_start": 0,
        "interval_step": 2,
        "interval_max": 30,
    },
    use_tz=False,
    enable_utc=True,

    worker_heartbeat=3600,
    broker_transport_options={"visibility_timeout": 3600},
    task_serializer="json",
    accept_content=["json", "application/json"],
    result_serializer="json",
    worker_send_task_events=True,
    task_track_started=True,
    result_extended=True,
    task_send_sent_event=True,
    task_allow_error_cb_on_chord_header=True,
    task_acks_on_failure_or_timeout=True,
)

task_exchange = Exchange("tasks", type="direct")

app.conf.task_queues = (
    Queue("common", task_exchange, routing_key="tasksmon"),
    # ...,
)

app.conf.task_routes = {
    "common": {"queue": "common"},
    # ...,
}

app.autodiscover_tasks(["app.infrastructure.celery_tasks"])
# app/infrastructure/celery_tasks/__init__.py

from .celery_config import app as celery_app

__all__ = ("celery_app",)

I declared an example task and a task_success signal handler in tasks.py (separate from the Celery configuration file):

# app/infrastructure/celery_tasks/tasks.py

from celery import shared_task
from celery.signals import task_success

@shared_task(name="task_example")
def example(user):
    return {"file": "video.mp4", "user_id": user}

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    if result:
        print(f"Result: {result}")

I enqueue the task using:

# main.py

from app.infrastructure.celery_tasks.celery_config import app as celery_app

task = celery_app.send_task(
    "task_example",
    args=("foo"),
    queue="common",
    routing_key="tasksmon",
)

# returned from the enclosing request handler:
return {"task_id": task.id}
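Note that in `args`, `("foo")` without a trailing comma is just the string `"foo"` in parentheses, not a one-element tuple, so Celery would iterate it character by character as positional arguments. A quick check:

```python
# Parentheses alone do not make a tuple; the trailing comma does.
not_a_tuple = ("foo")
a_tuple = ("foo",)

print(type(not_a_tuple).__name__)  # str
print(type(a_tuple).__name__)      # tuple
print(list(not_a_tuple))           # iterated as individual characters
print(list(a_tuple))               # a single-element list
```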

The task_success signal handler is never executed.

I suppose the problem could be in:

  • Broker messaging (Redis).
  • Celery configuration.
  • Kombu.

Any ideas or suggestions? If you need more info, ask me for it.
