I need to make many calls to a web API that has very strict rate limiting. In order to get my results as quickly as possible, I want to use multiple client threads while limiting the overall call rate to the maximum rate advertised by the API. I can do that like this:
def rate_limited(max_per_second):
'''Decorator to limit the call rate of a function to `max_per_second` calls per second'''
lock = threading.Lock()
call_interval = 1.0 / float(max_per_second)
last_call = [0.0]
def decorate(func):
@functools.wraps(func)
def wrapper(*args,**kwargs):
with lock:
elapsed = time.time() - last_call[0]
wait = call_interval - elapsed
if 0 < wait:
time.sleep(wait)
last_call[0] = time.time()
ret = func(*args,**kwargs)
return ret
return wrapper
return decorate
@rate_limited(2.0)
def web_api(...)
...
The problem is that even when I set my request rate significantly below the maximum advertised by the API, I still get intermittent throttling errors. So I need to implement exponential backoff as well. And I can't figure out how to make a thread-safe exponential back-off implementation with rate limiting that shares state between threads.
Without thread-safety, I could do something like this (untested):
def rate_limited_with_backoff(max_per_second):
'''Decorator to limit the call rate of a function to `max_per_second` calls per second with exponential backoff on TooManyRequestsException'''
min_interval = 1.0 / float(max_per_second)
call_interval = min_interval
successive_failures = 0
last_call = [0.0]
def decorate(func):
@functools.wraps(func)
def wrapper(func, *args, **kwargs):
while successive_failures < max:
sleep_time = call_interval - (time.time() - last_call[0])
if 0 < sleep_time:
sleep(sleep_time)
last_call[0] = time.time()
try:
ret = func(*args, **kwargs)
if successive_failures:
successive_failures -= 1
if call_interval > min_interval:
call_interval = max(min_interval, call_interval * 0.9)
return ret
except TooManyRequestsException:
call_interval *= 2
successive_failures += 1
raise SomeException
return wrapper
return decorate
@rate_limited_with_backoff(2.0)
def web_api(...)
...
The problem comes when I try to make it multi-threaded. Everything inside wrapper()
needs to happen while holding a lock except the call to func
right in the middle (since I need to make those calls in parallel). I end up with something like this (also untested):
def rate_limited_with_backoff_multithreaded(max_per_second):
'''Decorator to limit the call rate of a function to `max_per_second` calls per second with exponential backoff on TooManyRequestsException and thread safety'''
min_interval = 1.0 / float(max_per_second)
call_interval = min_interval
successive_failures = 0
last_call = [0.0]
lock = threading.lock()
def decorate(func):
@functools.wraps(func)
def wrapper(func, *args, **kwargs):
lock.acquire()
try:
while successive_failures < max:
sleep_time = call_interval - (time.time() - last_call[0])
if 0 < sleep_time:
sleep(sleep_time)
last_call[0] = time.time()
try:
lock.release()
ret = func(*args, **kwargs)
lock.acquire()
if successive_failures:
successive_failures -= 1
if call_interval > min_interval:
call_interval = max(min_interval, call_interval * 0.9)
return ret
except TooManyRequestsException:
lock.acquire()
call_interval *= 2
successive_failures += 1
finally:
lock.release()
raise SomeException
return wrapper
return decorate
@rate_limited_with_backoff_multithreaded(2.0)
def web_api(...)
...
Obviously, I can't do anything about any other requests that are already in flight, but I want the very next request that makes hits the sleep
logic to use an updated sleep interval. This means that if thread A gets TooManyRequestsException
, then it must re-acquire the lock and update call_interval
before any other threads acquire the lock at the top of wrapper()
. But I have no way to ensure that; an arbitrary number of other threads can enter acquire the lock first and make requests using the old request rate before thread A can update it, earning me a temporary ban from the API for overuse. How can I solve this problem?
I need to make many calls to a web API that has very strict rate limiting. In order to get my results as quickly as possible, I want to use multiple client threads while limiting the overall call rate to the maximum rate advertised by the API. I can do that like this:
def rate_limited(max_per_second):
'''Decorator to limit the call rate of a function to `max_per_second` calls per second'''
lock = threading.Lock()
call_interval = 1.0 / float(max_per_second)
last_call = [0.0]
def decorate(func):
@functools.wraps(func)
def wrapper(*args,**kwargs):
with lock:
elapsed = time.time() - last_call[0]
wait = call_interval - elapsed
if 0 < wait:
time.sleep(wait)
last_call[0] = time.time()
ret = func(*args,**kwargs)
return ret
return wrapper
return decorate
@rate_limited(2.0)
def web_api(...)
...
The problem is that even when I set my request rate significantly below the maximum advertised by the API, I still get intermittent throttling errors. So I need to implement exponential backoff as well. And I can't figure out how to make a thread-safe exponential back-off implementation with rate limiting that shares state between threads.
Without thread-safety, I could do something like this (untested):
def rate_limited_with_backoff(max_per_second):
'''Decorator to limit the call rate of a function to `max_per_second` calls per second with exponential backoff on TooManyRequestsException'''
min_interval = 1.0 / float(max_per_second)
call_interval = min_interval
successive_failures = 0
last_call = [0.0]
def decorate(func):
@functools.wraps(func)
def wrapper(func, *args, **kwargs):
while successive_failures < max:
sleep_time = call_interval - (time.time() - last_call[0])
if 0 < sleep_time:
sleep(sleep_time)
last_call[0] = time.time()
try:
ret = func(*args, **kwargs)
if successive_failures:
successive_failures -= 1
if call_interval > min_interval:
call_interval = max(min_interval, call_interval * 0.9)
return ret
except TooManyRequestsException:
call_interval *= 2
successive_failures += 1
raise SomeException
return wrapper
return decorate
@rate_limited_with_backoff(2.0)
def web_api(...)
...
The problem comes when I try to make it multi-threaded. Everything inside wrapper()
needs to happen while holding a lock except the call to func
right in the middle (since I need to make those calls in parallel). I end up with something like this (also untested):
def rate_limited_with_backoff_multithreaded(max_per_second):
'''Decorator to limit the call rate of a function to `max_per_second` calls per second with exponential backoff on TooManyRequestsException and thread safety'''
min_interval = 1.0 / float(max_per_second)
call_interval = min_interval
successive_failures = 0
last_call = [0.0]
lock = threading.lock()
def decorate(func):
@functools.wraps(func)
def wrapper(func, *args, **kwargs):
lock.acquire()
try:
while successive_failures < max:
sleep_time = call_interval - (time.time() - last_call[0])
if 0 < sleep_time:
sleep(sleep_time)
last_call[0] = time.time()
try:
lock.release()
ret = func(*args, **kwargs)
lock.acquire()
if successive_failures:
successive_failures -= 1
if call_interval > min_interval:
call_interval = max(min_interval, call_interval * 0.9)
return ret
except TooManyRequestsException:
lock.acquire()
call_interval *= 2
successive_failures += 1
finally:
lock.release()
raise SomeException
return wrapper
return decorate
@rate_limited_with_backoff_multithreaded(2.0)
def web_api(...)
...
Obviously, I can't do anything about any other requests that are already in flight, but I want the very next request that makes hits the sleep
logic to use an updated sleep interval. This means that if thread A gets TooManyRequestsException
, then it must re-acquire the lock and update call_interval
before any other threads acquire the lock at the top of wrapper()
. But I have no way to ensure that; an arbitrary number of other threads can enter acquire the lock first and make requests using the old request rate before thread A can update it, earning me a temporary ban from the API for overuse. How can I solve this problem?
1 Answer
Reset to default 0My approach would be to create a separate class, RateLimiter
, that has the throttling logic. Then instead of your decorator taking a max_per_second argument, instead you instantiate a RateLimiter
instance with that argument and use that instance to initialize your decorator. This is the general idea (you might want to check the sleep time calculations and adjust as necessary):
import time
import functools
class TooManyRequestsException(Exception):
pass
class SommeException(Exception):
pass
class RateLimiter:
"""A throttler with exponential backoff."""
def __init__(self, max_per_second: int | float) -> None:
self._min_interval = 1.0 / max_per_second
self._call_interval = self._min_interval
self._lock = Lock()
self._last_call = 0.0
self._successive_errors = 0
def throttle(self):
with self._lock:
now = time.monotonic()
time_left = self._call_interval - (now - self._last_call)
if time_left > 0:
time.sleep(time_left)
now = time.monotonic()
self._last_call = now
def set_successive_errors(delta: int, max_errors: int=3) -> None:
"""Add delta to self._successive_errors and set the next call interval."""
with self._lock:
if delta == -1:
# Don't let this go below 0:
self._successive_errors = max(0, self._successive_errors - 1)
if self._call_interval > self._min_interval:
self._call_interval = max(self._min_interval, self._call_interval * 0.9)
elif delta == 1:
self._successive_errors += 1
if self._successive_errors == max_errors:
raise SomeException
self._call_interval *= 2
else:
raise RuntimeError('Invalid delta argument')
def rate_limited_with_backoff_multithreaded(rate_limiter: RateLimiter):
'''Decorator to limit the call rate of a function using a RateLimiter with exponential backoff.'''
self._rate_limiter = rate_limiter
def decorate(func):
@functools.wraps(func)
def wrapper(func, *args, **kwargs):
self._rate_limiter.throttle()
try:
ret = func(*args, **kwargs)
except TooManyRequestsException:
self._rate_limiter.set_successive_errors(1)
else:
self._rate_limiter.set_successive_errors(-1)
return ret
return wrapper
return decorate
rate_limiter = RateLimiter(2.0)
@rate_limited_with_backoff_multithreaded(rate_limiter)
def web_api():
...
Notes
Naturally, you do not want to be holding a lock when your decorator calls the decorated function. Otherwise, there will be no actual multithreading occurring. But now if the function experiences a TooManyRequests
exception, it has to first re-acquire the lock in method RateLimiter.set_successive_errors
before the call interval for the next request can be adjusted. But that "next request" can possibly acquire the lock in method RateLimiter.throttle
first. Thus, the new request may not be using the longer call interval.