
python - Multi-threaded requests with rate-limiting and exponential backoff - Stack Overflow


I need to make many calls to a web API that has very strict rate limiting. In order to get my results as quickly as possible, I want to use multiple client threads while limiting the overall call rate to the maximum rate advertised by the API. I can do that like this:

import functools
import threading
import time

def rate_limited(max_per_second):
    '''Decorator to limit the call rate of a function to `max_per_second` calls per second'''
    lock = threading.Lock()
    call_interval = 1.0 / float(max_per_second)
    last_call = [0.0]

    def decorate(func):

        @functools.wraps(func)
        def wrapper(*args,**kwargs):
            with lock:
                elapsed = time.time() - last_call[0]
                wait = call_interval - elapsed

                if 0 < wait:
                    time.sleep(wait)

                last_call[0] = time.time()

            ret = func(*args,**kwargs)
            return ret

        return wrapper

    return decorate

@rate_limited(2.0)
def web_api(...):
    ...

The problem is that even when I set my request rate significantly below the maximum advertised by the API, I still get intermittent throttling errors. So I need to implement exponential backoff as well. And I can't figure out how to make a thread-safe exponential back-off implementation with rate limiting that shares state between threads.

Without thread-safety, I could do something like this (untested):

def rate_limited_with_backoff(max_per_second, max_failures=5):
    '''Decorator to limit the call rate of a function to `max_per_second` calls per second with exponential backoff on TooManyRequestsException'''
    min_interval = 1.0 / float(max_per_second)
    call_interval = min_interval
    successive_failures = 0
    last_call = [0.0]

    def decorate(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal call_interval, successive_failures
            while successive_failures < max_failures:
                sleep_time = call_interval - (time.time() - last_call[0])
                if 0 < sleep_time:
                    time.sleep(sleep_time)
                last_call[0] = time.time()
                try:
                    ret = func(*args, **kwargs)
                    if successive_failures:
                        successive_failures -= 1
                    if call_interval > min_interval:
                        call_interval = max(min_interval, call_interval * 0.9)
                    return ret
                except TooManyRequestsException:
                    call_interval *= 2
                    successive_failures += 1
            raise SomeException
        return wrapper

    return decorate

@rate_limited_with_backoff(2.0)
def web_api(...):
    ...

The problem comes when I try to make it multi-threaded. Everything inside wrapper() needs to happen while holding a lock except the call to func right in the middle (since I need to make those calls in parallel). I end up with something like this (also untested):

def rate_limited_with_backoff_multithreaded(max_per_second, max_failures=5):
    '''Decorator to limit the call rate of a function to `max_per_second` calls per second with exponential backoff on TooManyRequestsException and thread safety'''
    min_interval = 1.0 / float(max_per_second)
    call_interval = min_interval
    successive_failures = 0
    last_call = [0.0]
    lock = threading.Lock()

    def decorate(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal call_interval, successive_failures
            lock.acquire()
            try:
                while successive_failures < max_failures:
                    sleep_time = call_interval - (time.time() - last_call[0])
                    if 0 < sleep_time:
                        time.sleep(sleep_time)
                    last_call[0] = time.time()
                    try:
                        lock.release()
                        ret = func(*args, **kwargs)
                        lock.acquire()
                        if successive_failures:
                            successive_failures -= 1
                        if call_interval > min_interval:
                            call_interval = max(min_interval, call_interval * 0.9)
                        return ret
                    except TooManyRequestsException:
                        lock.acquire()
                        call_interval *= 2
                        successive_failures += 1
            finally:
                lock.release()
            raise SomeException
        return wrapper

    return decorate

@rate_limited_with_backoff_multithreaded(2.0)
def web_api(...):
    ...

Obviously, I can't do anything about requests that are already in flight, but I want the very next request that hits the sleep logic to use an updated sleep interval. This means that if thread A gets TooManyRequestsException, it must re-acquire the lock and update call_interval before any other thread acquires the lock at the top of wrapper(). But I have no way to ensure that; an arbitrary number of other threads can acquire the lock first and make requests at the old request rate before thread A can update it, earning me a temporary ban from the API for overuse. How can I solve this problem?


asked Mar 18 at 16:57 by user3553031; edited Mar 18 at 22:53

1 Answer


My approach would be to create a separate class, RateLimiter, that holds the throttling logic. Then, instead of your decorator taking a max_per_second argument, you instantiate a RateLimiter with that argument and pass the instance to the decorator. This is the general idea (you might want to check the sleep-time calculations and adjust as necessary):

import time
import functools
from threading import Lock

class TooManyRequestsException(Exception):
    pass

class SomeException(Exception):
    pass

class RateLimiter:
    """A throttler with exponential backoff."""
    
    def __init__(self, max_per_second: int | float) -> None:
        self._min_interval = 1.0 / max_per_second
        self._call_interval = self._min_interval
        self._lock = Lock()
        self._last_call = 0.0
        self._successive_errors = 0

    def throttle(self):
        with self._lock:
            now = time.monotonic()
            time_left = self._call_interval - (now - self._last_call)
            if time_left > 0:
                time.sleep(time_left)
                now = time.monotonic()
            self._last_call = now
            
    def set_successive_errors(self, delta: int, max_errors: int = 3) -> None:
        """Add delta to self._successive_errors and set the next call interval."""
        
        with self._lock:
            if delta == -1:
                # Don't let this go below 0:
                self._successive_errors = max(0, self._successive_errors - 1)
                if self._call_interval > self._min_interval:
                    self._call_interval = max(self._min_interval, self._call_interval * 0.9)
            elif delta == 1:                
                self._successive_errors += 1
                if self._successive_errors == max_errors:
                    raise SomeException
                self._call_interval *= 2
            else:
                raise RuntimeError('Invalid delta argument')

def rate_limited_with_backoff_multithreaded(rate_limiter: RateLimiter):
    '''Decorator to limit the call rate of a function using a RateLimiter with exponential backoff.'''

    def decorate(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Keep retrying until the call succeeds, or until
            # set_successive_errors raises SomeException after
            # max_errors consecutive failures.
            while True:
                rate_limiter.throttle()
                try:
                    ret = func(*args, **kwargs)
                except TooManyRequestsException:
                    rate_limiter.set_successive_errors(1)
                else:
                    rate_limiter.set_successive_errors(-1)
                    return ret

        return wrapper

    return decorate

rate_limiter = RateLimiter(2.0)

@rate_limited_with_backoff_multithreaded(rate_limiter)
def web_api():
    ...

Notes

Naturally, you do not want to be holding a lock while the decorator calls the decorated function; otherwise, no actual multithreading would occur. But now if the function gets a TooManyRequestsException, it must first re-acquire the lock in RateLimiter.set_successive_errors before the call interval for the next request can be adjusted, and that "next request" may acquire the lock in RateLimiter.throttle first. Thus, the new request may not be using the longer call interval.
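One way to narrow that window is to wait on a threading.Condition instead of sleeping for a fixed duration, and to notify waiters whenever the interval grows. Each waiting thread then recomputes its deadline from the current interval before proceeding, so threads already queued at the throttle pick up the longer interval. This is a sketch of my own, not part of the answer above; the class and method names (RecheckingRateLimiter, backoff, relax) are hypothetical:

```python
import time
from threading import Condition

class RecheckingRateLimiter:
    """Rate limiter whose waiting threads re-read the call interval on
    every wakeup, instead of sleeping for a possibly stale duration."""

    def __init__(self, max_per_second):
        self._min_interval = 1.0 / max_per_second
        self._call_interval = self._min_interval
        self._last_call = 0.0
        self._cond = Condition()  # lock + notification in one object

    def throttle(self):
        with self._cond:
            # Recompute the deadline on every wakeup: if another thread
            # doubled _call_interval while we were waiting, we wait longer.
            while True:
                time_left = self._call_interval - (time.monotonic() - self._last_call)
                if time_left <= 0:
                    break
                self._cond.wait(timeout=time_left)
            self._last_call = time.monotonic()

    def backoff(self):
        with self._cond:
            self._call_interval *= 2
            # Wake waiting threads so they recompute with the new interval.
            self._cond.notify_all()

    def relax(self):
        with self._cond:
            self._call_interval = max(self._min_interval, self._call_interval * 0.9)
```

Condition.wait releases the lock while waiting, so a thread that hits TooManyRequestsException can call backoff() even while other threads are queued in throttle(); the notify_all forces those waiters to recompute against the doubled interval rather than proceeding at the old rate.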
