I'm using Flask with BullMQ to handle background tasks for generating reports. While the job is successfully added to the queue and the worker appears to be initialized correctly ("Worker is ready and listening for jobs."
appears in the logs), the report generation task is never executed.
Full Code
from flask import Flask, request, jsonify
from bullmq import Queue, Worker
import asyncio
from Mytools import *
import signal
import sys
import logging
import threading
app = Flask(__name__)
queue = Queue("generate_reports_queue")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ========================== #
# Report Generation Function
# ========================== #
async def generate_report_task(survey_id):
try:
logger.info(f"Starting report generation for survey ID: {survey_id}")
reponses, questions = fetch_responses(survey_id)
data_df = responses_json_to_df(reponses, "definition")
survey_data = bdv_json_to_df(questions["definition"].iloc[0])
output_path = f"output_{survey_id}.docx"
decision_maker_with_report_streaming(
data=data_df,
df_bbl=survey_data,
template="Template_.docx",
bucket_name="reu-data",
key=output_path
)
logger.info(f"Report generated successfully for survey ID: {survey_id}")
return {"message": "Report generated successfully", "file_path": output_path}
except Exception as e:
logger.error(f"Error generating report for survey ID {survey_id}: {str(e)}")
return {"error": str(e)}
# ========================== #
# Flask API
# ========================== #
@app.route('/generate-report', methods=['POST'])
async def generate_report():
data = request.get_json()
survey_id = data.get("id")
if not survey_id:
return jsonify({"error": "ID missing"}), 400
job = await queue.add("generate_report", {"survey_id": survey_id})
logger.info(f"Added job to queue with ID: {job.id}")
return jsonify({"message": "Task in progress", "task_id": job.id})
# ========================== #
# BullMQ Worker
# ========================== #
async def process_job(job):
survey_id = job.data.get("survey_id")
logger.info(f"Processing job with survey ID: {survey_id}")
return await generate_report_task(survey_id)
async def main_worker():
try:
logger.info("Initializing worker...")
worker = Worker("generate_reports_queue", process_job)
logger.info("Worker is ready and listening for jobs.")
await asyncio.Future() # Keep the worker running
except Exception as e:
logger.error(f"Worker error: {e}", exc_info=True)
def run_worker():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main_worker())
# Graceful shutdown handling
def shutdown(signal, frame):
logger.info("Shutting down gracefully...")
sys.exit(0)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
if __name__ == "__main__":
worker_thread = threading.Thread(target=run_worker, daemon=True)
worker_thread.start()
app.run(debug=True, use_reloader=False)
Expected Behavior
- The
generate_report_task
function should be called, and the report should be generated after the worker picks up the job.
Observed Behavior
- The job is successfully added to the queue.
"Worker is ready and listening for jobs."
appears in the logs.- However, the
generate_report_task
function is never called.
Question
What could be preventing the worker from processing the job? Are there any known issues or common mistakes I should check for in my setup? Any guidance would be appreciated!
here's what I get when I test the API
* Serving Flask app 'flask_api_with_bull2'
INFO:__main__:Initializing worker...
* Debug mode: on
INFO:__main__:Worker is ready and listening for jobs.
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:__main__:Added job to queue with ID: 8
INFO:werkzeug:127.0.0.1 - - [10/Mar/2025 15:18:01] "POST /generate-report HTTP/1.1" 200 -