
javascript - Rate-limiting and count-limiting events in RxJS v5, but also allowing pass-through - Stack Overflow


I have a bunch of events to send up to a service. But the requests are rate limited and each request has a count limit:

  • 1 request per second: bufferTime(1000)
  • 100 event items per request: bufferCount(100)

The problem is, I am not sure how to combine them in a way that makes sense.

Allowing pass-through

Complicating this further, I need to make sure that events go through instantaneously if we don't hit either limit.

For example, I don't want it to actually wait for 100 event items before letting it go through if it's only one single event during a non-busy time.

Legacy API

I also found that there was a bufferWithTimeOrCount that existed in RxJS v4, although I am not sure how I'd use that even if I had it.

Test playground

Here is a JSBin I made for you to test your solution:

http://jsbin.com/fozexehiba/1/edit?js,console,output

Any help would be greatly appreciated.

asked Mar 30, 2017 at 23:29 by adrianmcli

3 Answers

The bufferTime() operator accepts three parameters, which lets it combine the functionality of bufferTime and bufferCount. See http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-bufferTime.

With .bufferTime(1000, null, 3), a buffer is emitted every 1000ms or as soon as it holds 3 items, whichever comes first. However, this does not guarantee a 1000ms delay between buffers.

So you could use something like this, which is fairly simple (it buffers at most 3 items for at most 1000ms):

click$
  .scan((a, b) => a + 1, 0)
  .bufferTime(1000, null, 3)
  .filter(buffer => buffer.length > 0)
  .concatMap(buffer => Rx.Observable.of(buffer).delay(1000))
  .timestamp()
  .subscribe(console.log);

See live demo: http://jsbin.com/libazer/7/edit?js,console,output

The only difference from what you probably wanted is that the first emission might be delayed by more than 1000ms. This is because both the bufferTime() and delay(1000) operators add a delay, which ensures that there is always a gap of at least 1000ms between emissions.

I hope this works for you.
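
To see where the timing in this answer comes from, here is a minimal plain-JavaScript sketch (no RxJS) that models the pipeline on a list of event timestamps. The helpers `bufferByTimeOrCount` and `emitTimes` are hypothetical names, not RxJS APIs. The model assumes that the time window restarts whenever a buffer closes and that the source never completes, so treat it as an approximation of the operators rather than a description of their internals.

```javascript
// Hypothetical model of .bufferTime(span, null, maxSize) followed by
// .filter(buffer => buffer.length > 0). `arrivals` holds ascending timestamps
// in ms, measured from subscription. Assumption: the window restarts whenever
// a buffer closes, and the source never completes.
function bufferByTimeOrCount(arrivals, span, maxSize) {
  const buffers = [];
  let openedAt = 0;
  let items = [];
  for (const t of arrivals) {
    // Close every time window that elapsed before this event arrived.
    while (t >= openedAt + span) {
      buffers.push({ closedAt: openedAt + span, items });
      openedAt += span;
      items = [];
    }
    items.push(t);
    // Close early once the buffer is full.
    if (items.length === maxSize) {
      buffers.push({ closedAt: t, items });
      openedAt = t;
      items = [];
    }
  }
  // The last partial buffer closes when its time window ends.
  if (items.length > 0) {
    buffers.push({ closedAt: openedAt + span, items });
  }
  return buffers.filter(b => b.items.length > 0);
}

// Hypothetical model of .concatMap(buffer => of(buffer).delay(delay)):
// a buffer starts its delay only after the previous buffer has been emitted.
function emitTimes(buffers, delay) {
  let previousEmit = -Infinity;
  return buffers.map(b => {
    const emittedAt = Math.max(b.closedAt, previousEmit) + delay;
    previousEmit = emittedAt;
    return emittedAt;
  });
}

const buffers = bufferByTimeOrCount([0, 1, 2, 3, 4, 5, 500], 1000, 3);
console.log(buffers.map(b => b.closedAt)); // [ 2, 5, 1005 ]
console.log(emitTimes(buffers, 1000));     // [ 1002, 2002, 3002 ]
```

In this model the first two buffers close only 3ms apart, which is the gap the concatMap/delay step is there to stretch, and a lone event at t=0 would not be emitted until t=2000.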

Operator

events$
  .windowCount(10)
  .mergeMap(m => m.bufferTime(100))
  .concatMap(val => Rx.Observable.of(val).delay(100))
  .filter(f => f.length > 0)

Doc

  • .windowCount(number) : [ Rx Doc ]
  • .bufferTime(number) : [ Rx Doc ]

Demo

// test case
const mock = [8, 0, 2, 3, 30, 5, 6, 2, 2, 0, 0, 0, 1]

const tInterval = 100
const tCount = 10

Rx.Observable.interval(tInterval)
  .take(mock.length)
  .mergeMap(mm => Rx.Observable.range(0, mock[mm]))
  
  // start
  .windowCount(tCount)
  .mergeMap(m => m.bufferTime(tInterval))
  .concatMap(val => Rx.Observable.of(val).delay(tInterval))
  .filter(f => f.length > 0)
  // end

  .subscribe({
    next: (n) => console.log('Next: ', n),
    error: (e) => console.log('Error: ', e),
    complete: () => console.log('Completed'),
  })
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>


Updated

After more testing, I found that the answer above has some problems under extreme conditions. I think they are caused by .window() and .concat(), and I then found this warning in the concatMap documentation:

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
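
The warning can be illustrated without RxJS. The sketch below is a plain-JavaScript model of a queue in which each value occupies the pipeline for a fixed time, roughly as with concatMap and a delayed inner observable. The function name `unfinishedAtLastArrival` and the helper `everyMs` are made up for this illustration. It counts how many values have not finished by the time the last one arrives.

```javascript
// Hypothetical model: values are handled one at a time, and each one occupies
// the pipeline for `serviceTime` ms. `arrivals` holds ascending timestamps in ms.
function unfinishedAtLastArrival(arrivals, serviceTime) {
  let freeAt = -Infinity; // when the previous value finishes
  const finishTimes = arrivals.map(t => {
    const startedAt = Math.max(t, freeAt);
    freeAt = startedAt + serviceTime;
    return freeAt;
  });
  const lastArrival = arrivals[arrivals.length - 1];
  return finishTimes.filter(done => done > lastArrival).length;
}

// n values arriving 1ms apart, each taking 10ms to handle.
const everyMs = n => Array.from({ length: n }, (_, i) => i);
console.log(unfinishedAtLastArrival(everyMs(100), 10));  // 91
console.log(unfinishedAtLastArrival(everyMs(1000), 10)); // 901
```

The backlog grows with the length of the input whenever values arrive faster than they are handled, which is the unbounded buffer the warning describes.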

However, I think a better way to limit the request rate may be to limit the cycle time of the requests. In your case, just allow only 1 request per 10 milliseconds. This is simpler and may be a more efficient way to control the requests.

Operator

const tInterval = 100
const tCount = 10
const tCircle = tInterval / tCount

const rxTimer = Rx.Observable.timer(tCircle).ignoreElements()

events$
  .concatMap(m => Rx.Observable.of(m).merge(rxTimer)) // more accurate than `.delay()`
  // .concatMap(m => Rx.Observable.of(m).delay(tCircle))

or

events$
  .zip(Rx.Observable.interval(tCircle), (x,y) => x)
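
The two variants above space values differently, which a small plain-JavaScript model makes visible. This is a sketch under assumptions: `concatMapSpacing` and `zipWithInterval` are hypothetical helper names, timestamps are in ms measured from subscription, and the model relies on zip holding on to interval ticks that have not been paired yet.

```javascript
// Hypothetical model of
//   .concatMap(m => of(m).merge(timer(cycle).ignoreElements()))
// A value is emitted as soon as its inner observable is subscribed, and the
// next inner observable is subscribed only `cycle` ms after that.
function concatMapSpacing(arrivals, cycle) {
  let freeAt = -Infinity; // when the previous inner observable completes
  return arrivals.map(t => {
    const emittedAt = Math.max(t, freeAt);
    freeAt = emittedAt + cycle;
    return emittedAt;
  });
}

// Hypothetical model of .zip(interval(cycle), (x, y) => x)
// The i-th value (0-based) is paired with the i-th tick, which fires at
// (i + 1) * cycle, so it is emitted once both are available.
function zipWithInterval(arrivals, cycle) {
  return arrivals.map((t, i) => Math.max(t, (i + 1) * cycle));
}

// A burst right after subscription, then a late value.
console.log(concatMapSpacing([0, 0, 0, 500], 10)); // [ 0, 10, 20, 500 ]
console.log(zipWithInterval([0, 0, 0, 500], 10));  // [ 10, 20, 30, 500 ]

// A burst after one idle second.
console.log(concatMapSpacing([1000, 1000, 1000], 10)); // [ 1000, 1010, 1020 ]
console.log(zipWithInterval([1000, 1000, 1000], 10));  // [ 1000, 1000, 1000 ]
```

In this model the concatMap variant passes an isolated value through immediately and still spaces a burst, while the zip variant can let a burst through unspaced after an idle stretch, because the ticks that fired in the meantime are still waiting to be paired.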

I've modified the answer I gave to this question to support your use case of adding a limited number of values (i.e. events) to pending requests.

The comments within should explain how it works.

Because you need to keep a record of the requests that have been made within the rate limit period, I don't believe that it's possible to use the bufferTime and bufferCount operators to do what you want - a scan is required so that you can maintain that state within the observable.

function rateLimit(source, period, valuesPerRequest, requestsPerPeriod = 1) {

  return source
    .scan((requests, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all requests made within the last period. If the
      // number of requests made is below the limit, the value can be
      // included in an immediate request. Otherwise, it will need to be
      // included in a delayed request.

      requests = requests.filter((request) => request.until > since);
      if (requests.length >= requestsPerPeriod) {

        const leastRecentRequest = requests[0];
        const mostRecentRequest = requests[requests.length - 1];

        // If there is a request that has not yet been made, append the
        // value to that request if the number of values in that request
        // is below the limit. Otherwise, another delayed request will be
        // required.

        if (
          (mostRecentRequest.until > now) &&
          (mostRecentRequest.values.length < valuesPerRequest)
        ) {

          mostRecentRequest.values.push(value);

        } else {

          // until is the time until which the value should be delayed.

          const until = leastRecentRequest.until + (
            period * Math.floor(requests.length / requestsPerPeriod)
          );

          // concatMap is used below to guarantee the values are emitted
          // in the same order in which they are received, so the delays
          // are cumulative. That means the actual delay is the difference
          // between the until times.

          requests.push({
            delay: (mostRecentRequest.until < now) ?
              (until - now) :
              (until - mostRecentRequest.until),
            until,
            values: [value]
          });
        }

      } else {

        requests.push({
          delay: 0,
          until: now,
          values: [value]
        });
      }
      return requests;

    }, [])

    // Emit only the most recent request.

    .map((requests) => requests[requests.length - 1])

    // If multiple values are added to the request, it will be emitted
    // multiple times. Use distinctUntilChanged so that concatMap receives
    // the request only once.

    .distinctUntilChanged()
    .concatMap((request) => {

      const observable = Rx.Observable.of(request.values);
      return request.delay ? observable.delay(request.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 250),
  1000,
  100,
  1
).subscribe((values) => console.log(
  `Request with ${values.length} value(s) at T+${Date.now() - start}`
));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
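
As a way to reason about the expected shape of the output without timers, the following plain-JavaScript sketch plans the requests for a list of arrival times in the one-request-per-period case. It is a simplified model written for this illustration, not a replacement for the rateLimit function above. The name `planRequests` and the `{ at, items }` shape are assumptions.

```javascript
// Hypothetical planner for one request per `period` ms with at most
// `maxItems` values per request. `arrivals` holds ascending timestamps in ms.
// Returns the planned requests as { at: send time, items: event indexes }.
function planRequests(arrivals, period, maxItems) {
  const requests = [];
  arrivals.forEach((t, index) => {
    const last = requests[requests.length - 1];
    if (last && last.at > t && last.items.length < maxItems) {
      // A request is still pending and has room: join it.
      last.items.push(index);
    } else if (!last || last.at + period <= t) {
      // Nothing was sent during the last period: pass through immediately.
      requests.push({ at: t, items: [index] });
    } else {
      // Busy: wait for the next free slot.
      requests.push({ at: last.at + period, items: [index] });
    }
  });
  return requests;
}

// 250 events arriving at once, 1 request per second, 100 events per request.
const burst = Array.from({ length: 250 }, () => 0);
console.log(planRequests(burst, 1000, 100).map(r => [r.at, r.items.length]));
// [ [ 0, 1 ], [ 1000, 100 ], [ 2000, 100 ], [ 3000, 49 ] ]
```

In this model the first event of a burst goes out alone and immediately, and the rest are grouped into one request per period, while an event that arrives after a quiet period is sent as soon as it arrives.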
