I have this code -
event_queue = asyncio.Queue()

@router.get("/stream")
async def stream():
    async def event_generator():
        while True:
            try:
                event = await event_queue.get()
                yield f"data: {json.dumps(event)}\n\n"
            except asyncio.CancelledError:
                break
    return StreamingResponse(event_generator(), media_type="text/event-stream")
and in my Angular frontend, when the user generates an image, I'm subscribing to /stream. The way the image generation works: I'm waiting for a webhook with the image to arrive at my /webhook endpoint, and when it's there I want to update the user's images in the frontend (that's why I need SSE).
@router.post("/webhook")
async def generate_status(request: Request, db: db_dependency):
    # unrelated logic here
    await event_queue.put({"status": "uploaded"})
    return {"message": "Webhook processed and image uploaded"}
so as you can see here, I'm adding an event when it arrives. The problem is that it's unstable: sometimes it works, sometimes it doesn't, and I think that if multiple users share the same queue, it will break entirely.
What is a good way to implement SSE from FastAPI?
I'm trying to find out how to trigger events from a function in FastAPI - I didn't see anything online that works.
asked Feb 6 at 13:22 by רועי כחלון

1 Answer
Yes, as you're suspecting, the issue is that multiple users are using the same queue, so each message will only be delivered to one client. What you probably want is one queue per client, implemented similarly to how the FastAPI docs handle multiple WebSocket connections.
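To see why the shared queue misbehaves, here's a small self-contained demo (my sketch, not part of the answer's code): with two "clients" waiting on one `asyncio.Queue`, each event is handed to exactly one of them, so neither client sees the full stream.

```python
# Demonstration: a single shared asyncio.Queue hands each item to
# exactly one consumer. With two clients on one queue, every event
# reaches only one of them.
import asyncio

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    received: dict[str, list] = {"a": [], "b": []}

    async def client(name: str):
        while True:
            received[name].append(await queue.get())

    tasks = [asyncio.create_task(client(n)) for n in ("a", "b")]
    for i in range(4):
        await queue.put(i)
    await asyncio.sleep(0.1)  # give the consumers time to drain the queue
    for t in tasks:
        t.cancel()
    return received

received = asyncio.run(main())
# The four events are split between the two clients; none is duplicated.
print(sorted(received["a"] + received["b"]))  # -> [0, 1, 2, 3]
```

This is exactly what happens to two browsers subscribed to /stream: each webhook event wakes only one of the two pending `get()` calls, which is why the behavior looks random.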
As a quick example I implemented a connection manager class that creates a separate queue for each connection, and then sends every message to all connected clients. I also added a signal handler, so that all connections are terminated when the server restarts (you'll need to handle automatic reconnects in the frontend).
import asyncio
import json
import signal
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI
from fastapi.responses import StreamingResponse


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[asyncio.Queue] = []

    async def connect(self):
        queue = asyncio.Queue()
        self.active_connections.append(queue)
        while True:
            try:
                event = await queue.get()
                yield f"data: {json.dumps(event)}\n\n"
            except (asyncio.CancelledError, asyncio.QueueShutDown):
                print("Cancelled")
                self.active_connections.remove(queue)
                break

    async def broadcast(self, message: dict[str, Any]):
        await asyncio.gather(*(queue.put(message) for queue in self.active_connections))

    def close(self):
        # Queue.shutdown() and QueueShutDown require Python 3.13+.
        for queue in self.active_connections:
            queue.shutdown()


manager = ConnectionManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Register the handler once the event loop is running; calling
    # asyncio.get_running_loop() at import time would raise RuntimeError.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, manager.close)
    yield


app = FastAPI(lifespan=lifespan)


@app.get("/stream")
async def stream():
    return StreamingResponse(manager.connect(), media_type="text/event-stream")


@app.post("/webhook")
async def generate_status():
    await manager.broadcast({"status": "uploaded"})
    return {"message": "Webhook processed and image uploaded"}
Note that this will work well as long as you're only running one FastAPI process. If you want to use multiple workers, or deploy the application on several machines (using something like Kubernetes), you need to handle this queue differently. One way would be to use Redis pub/sub, something along the lines of this tutorial.
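For reference, a rough sketch of what the Redis variant might look like, assuming the redis-py package and its asyncio client (the channel name `image_events` is made up for this example):

```python
import json

CHANNEL = "image_events"  # arbitrary channel name for this sketch

def sse_format(event: dict) -> str:
    # SSE wire format: a "data:" line followed by a blank line.
    return f"data: {json.dumps(event)}\n\n"

async def event_generator():
    # Assumes redis-py's asyncio client; imported lazily so the helper
    # above stays usable without a Redis server running.
    import redis.asyncio as redis

    r = redis.Redis()
    pubsub = r.pubsub()
    await pubsub.subscribe(CHANNEL)
    try:
        # Unlike an in-process queue, every process subscribed to the
        # channel receives every published message.
        async for message in pubsub.listen():
            if message["type"] == "message":
                yield sse_format(json.loads(message["data"]))
    finally:
        await pubsub.unsubscribe(CHANNEL)
        await r.aclose()

async def publish(event: dict) -> None:
    import redis.asyncio as redis

    r = redis.Redis()
    await r.publish(CHANNEL, json.dumps(event))
    await r.aclose()
```

The webhook handler would then call `publish({"status": "uploaded"})` instead of touching a local queue, and /stream would return `StreamingResponse(event_generator(), media_type="text/event-stream")` as before, so the same code works across multiple workers or machines.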