I'm facing this situation:
I have a FastAPI application running on, for example, 10.11.12.13:8000, and I "register" multiple "workers" with that address. Clients can send requests (each request is unique) to a worker through its registered address, but since I registered all the workers with the same address, all requests are sent to 10.11.12.13:8000.
But here's the part I'm trying to handle: a client can send the same unique request to multiple workers at the same time (through an asyncio.gather). How can my application handle the request only once, yet return the response to each caller as if every worker performed the task independently?
Example code:
app.py
from fastapi import FastAPI
import uvicorn

app = FastAPI()
work_count = 0

@app.get("/")
def handle_workload():
    global work_count
    work_count += 1
    return {"message": f"Hello world! {work_count}"}

def start_server():
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="debug",
    )

if __name__ == "__main__":
    start_server()
client.py
import asyncio
import aiohttp
import random

async def send_request(worker_url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(worker_url) as response:
            if response.status == 200:
                return await response.json()

# I registered all the workers with the same address:
worker_servers = [
    ("worker_1", "http://localhost:8000"),
    ("worker_2", "http://localhost:8000"),
    ("worker_3", "http://localhost:8000"),
    ("worker_4", "http://localhost:8000"),
]

async def main():
    worker_urls = [worker[1] for worker in random.sample(worker_servers, 3)]
    responses = await asyncio.gather(*[send_request(url) for url in worker_urls])
    print(responses)

if __name__ == "__main__":
    asyncio.run(main())
Can you give me some advice on how to implement this?
I thought of batching the requests that arrive within a short time window and using a cache, but each request is unique, so I don't think caching is a good fit.
1 Answer
You should implement this not on the FastAPI side, but on another layer: for example, a Redis-based cache system plus separate "main" workers that handle the jobs.
The concept can be something like this:
The client sends a request to FastAPI with unique params. FastAPI goes to Redis and checks for the key. If it is not there, FastAPI creates the task and waits in a loop (with some timeout).
Another worker should poll Redis for new tasks. When a new task is found, the worker sets the task's state to "in_work", completes the job, then sets the state to "completed". Your FastAPI worker then sees the state change and returns the result to the client.
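The worker side could look something like this sketch; do_work, the queue name, and the key layout are illustrative placeholders, not part of the question:

import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def do_work(task_id: str) -> dict:
    # Stand-in for the real, possibly expensive job.
    return {"message": f"Hello world! (task {task_id})"}

while True:
    _, task_id = r.blpop("task_queue")  # block until FastAPI pushes a task id
    r.set(f"task:{task_id}:state", "in_work")
    result = do_work(task_id)
    r.set(f"task:{task_id}:result", json.dumps(result))
    r.set(f"task:{task_id}:state", "completed")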
If another concurrent request arrives with the same unique params, the FastAPI worker checks Redis, sees the "in_work" state, and falls into the same wait-in-loop. When the main worker completes the task, both FastAPI workers return the result to their clients.
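A minimal sketch of the FastAPI side under the same assumptions (a local Redis via the redis-py asyncio client; the route, key names, and poll interval are illustrative). The atomic SET with nx=True is what guarantees only the first request creates the task; duplicates skip straight to the wait loop:

import asyncio
import json

import redis.asyncio as redis
from fastapi import FastAPI, HTTPException

app = FastAPI()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@app.get("/task/{task_id}")
async def handle_workload(task_id: str):
    # SET ... NX succeeds only for the first request with this id.
    if await r.set(f"task:{task_id}:state", "pending", nx=True):
        await r.rpush("task_queue", task_id)  # hand the task to the main worker
    # All requests (original and duplicates) wait for the shared result.
    for _ in range(100):  # roughly a 10-second timeout
        if await r.get(f"task:{task_id}:state") == "completed":
            return json.loads(await r.get(f"task:{task_id}:result"))
        await asyncio.sleep(0.1)
    raise HTTPException(status_code=504, detail="task timed out")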
WHY I SUGGEST A SETUP LIKE THIS: the FastAPI workers can then be separated into different processes, or different pods in Kubernetes. There is a way to store information about tasks in RAM in a global scope instead, but then you would have to keep all the workers in that single process.
By the way, there are plenty of libraries for handling tasks: Celery (I prefer not to use it), taskiq (a modern one), etc.
Comment from Chris: You could use async def endpoints and never await a coroutine (or use async for, etc.) inside them. Or, use def endpoints and keep the max number of threads in the threadpool to 1. These concepts are explained in detail in the linked answers above. If you need to keep concurrency, you may think of some other way, such as using the client's IP address and the request payload, query params, etc., in order to decide if it is a duplicate request by the same client.
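A rough sketch of that last idea, deduplicating in flight by a fingerprint of client IP plus query params. This only works within a single Uvicorn process, since the in-flight table lives in module-level state; the fingerprint recipe and the simulated work are illustrative:

import asyncio
import hashlib

from fastapi import FastAPI, Request

app = FastAPI()
inflight: dict[str, asyncio.Future] = {}  # single-process only

@app.get("/")
async def handle_workload(request: Request):
    # Fingerprint the request by client IP plus sorted query params.
    raw = f"{request.client.host}|{sorted(request.query_params.items())}"
    key = hashlib.sha256(raw.encode()).hexdigest()
    if key in inflight:
        # Duplicate in flight: wait for the first request's result.
        # shield() keeps one client's disconnect from cancelling the shared future.
        return await asyncio.shield(inflight[key])
    fut = asyncio.get_running_loop().create_future()
    inflight[key] = fut
    try:
        await asyncio.sleep(0.5)  # stands in for the real work
        result = {"message": "Hello world!"}
        fut.set_result(result)
        return result
    except Exception as exc:
        fut.set_exception(exc)
        raise
    finally:
        inflight.pop(key, None)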