最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - Flask + BullMQ Worker: Job Added to Queue but Not Processed - Stack Overflow

programmeradmin4浏览0评论

I'm using Flask with BullMQ to handle background tasks for generating reports. While the job is successfully added to the queue and the worker appears to be initialized correctly ("Worker is ready and listening for jobs." appears in the logs), the report generation task is never executed.

Full Code

from flask import Flask, request, jsonify
from bullmq import Queue, Worker
import asyncio
from Mytools import *
import signal
import sys
import logging
import threading

app = Flask(__name__)
queue = Queue("generate_reports_queue")

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ========================== #
#  Report Generation Function
# ========================== #
async def generate_report_task(survey_id):
    try:
        logger.info(f"Starting report generation for survey ID: {survey_id}")
        reponses, questions = fetch_responses(survey_id)
        data_df = responses_json_to_df(reponses, "definition")
        survey_data = bdv_json_to_df(questions["definition"].iloc[0])

        output_path = f"output_{survey_id}.docx"

        decision_maker_with_report_streaming(
            data=data_df,
            df_bbl=survey_data,
            template="Template_.docx",
            bucket_name="reu-data",
            key=output_path
        )

        logger.info(f"Report generated successfully for survey ID: {survey_id}")
        return {"message": "Report generated successfully", "file_path": output_path}

    except Exception as e:
        logger.error(f"Error generating report for survey ID {survey_id}: {str(e)}")
        return {"error": str(e)}

# ========================== #
#  Flask API
# ========================== #
@app.route('/generate-report', methods=['POST'])
async def generate_report():
    data = request.get_json()
    survey_id = data.get("id")

    if not survey_id:
        return jsonify({"error": "ID missing"}), 400

    job = await queue.add("generate_report", {"survey_id": survey_id})
    logger.info(f"Added job to queue with ID: {job.id}")

    return jsonify({"message": "Task in progress", "task_id": job.id})

# ========================== #
#  BullMQ Worker
# ========================== #
async def process_job(job):
    survey_id = job.data.get("survey_id")
    logger.info(f"Processing job with survey ID: {survey_id}")
    return await generate_report_task(survey_id)

async def main_worker():
    try:
        logger.info("Initializing worker...")
        worker = Worker("generate_reports_queue", process_job)
        logger.info("Worker is ready and listening for jobs.")
        await asyncio.Future()  # Keep the worker running
    except Exception as e:
        logger.error(f"Worker error: {e}", exc_info=True)

def run_worker():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main_worker())

# Graceful shutdown handling
def shutdown(signal, frame):
    logger.info("Shutting down gracefully...")
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

if __name__ == "__main__":
    worker_thread = threading.Thread(target=run_worker, daemon=True)
    worker_thread.start()
    app.run(debug=True, use_reloader=False)

Expected Behavior

  • The generate_report_task function should be called, and the report should be generated after the worker picks up the job.

Observed Behavior

  • The job is successfully added to the queue.
  • "Worker is ready and listening for jobs." appears in the logs.
  • However, the generate_report_task function is never called.

Question

What could be preventing the worker from processing the job? Are there any known issues or common mistakes I should check for in my setup? Any guidance would be appreciated!

here's what I get when I test the API

 * Serving Flask app 'flask_api_with_bull2'
INFO:__main__:Initializing worker...
 * Debug mode: on
INFO:__main__:Worker is ready and listening for jobs.
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:__main__:Added job to queue with ID: 8
INFO:werkzeug:127.0.0.1 - - [10/Mar/2025 15:18:01] "POST /generate-report HTTP/1.1" 200 -
发布评论

评论列表(0)

  1. 暂无评论