
javascript - Exponential backoff implementation with rxjs - Stack Overflow


Angular 7 docs provide this example of practical usage of rxjs Observables in implementing an exponential backoff for an AJAX request:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

While I understand the concept of both Observables and backoff, I can’t quite figure out how exactly retryWhen calculates the time intervals for resubscribing to the source ajax.

Specifically, how do zip, map, and mergeMap work in this setup?

And what’s going to be contained in the attempts object when it’s emitted into retryWhen?

I went through their reference pages, but still can’t wrap my head around this.

Asked Oct 26, 2018 at 19:26 by CBlew; edited Nov 20, 2018 at 14:45 by Gonçalo Peres
  • Rx is just somewhat confusing to learn. retryWhen retries on a criterion - attempts is the error stream. zip grabs the next index from the range. map multiplies it by itself (that's the exponential part since x * x is just x ** 2), mergeMap waits for the timer to finish before the next attempt continues. Piping it to the ajax will tell it to retry when the ajax errors and then back off exponentially (when the error stream emits an error - the zip continues, the map runs and the mergeMap waits for the timer before continuing). – Benjamin Gruenbaum Commented Oct 26, 2018 at 19:33
  • And of course, you're welcome to just use fetch and implement it without Rx which would be ~5 LoC of regular JavaScript and a for loop with a try/catch :D – Benjamin Gruenbaum Commented Oct 26, 2018 at 19:34
  • @BenjaminGruenbaum one thing I don't get is how zip is working here ... why doesn't it create a new Observable from zip each time it resubscribes? – Explosion Pills Commented Oct 26, 2018 at 20:42
  • @ExplosionPills The retryWhen operator only subscribes once to the observable that's composed within it. It feeds any errors received from the source to the observable composed within it - the attempts parameter to the arrow function is an observable of errors. Essentially, the range observable 'seeds' the zip with the number of retries that should be attempted. – cartant Commented Oct 26, 2018 at 23:10
  • @BenjaminGruenbaum @cartant So, zip operator grabs the range observable (1, 2, 3) and merges it with the input attempts? But why do this? I removed zip from the pipe and the function seems to have run the same way. So what was the point of zipping? Can’t we just ignore attempts? – CBlew Commented Oct 27, 2018 at 10:44
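
For reference, the plain-JavaScript alternative mentioned in the comments (a loop with a try/catch, no Rx) could look roughly like the sketch below. The names retryWithBackoff, task, sleep and defaultSleep are made up for illustration and are not part of any library; maxTries is assumed to mean the number of retries after the initial attempt, and the delays follow the same squared pattern (1, 4, 9, ... times ms) as the Angular example.

```javascript
// Hypothetical helper, not part of RxJS or any other library.
// Default sleep: resolve after delayMs milliseconds.
const defaultSleep = (delayMs) =>
  new Promise((resolve) => setTimeout(resolve, delayMs));

// Runs task() and, on failure, retries up to maxTries times,
// waiting i * i * ms before the i-th retry (i = 1, 2, 3, ...).
// The sleep parameter is injectable so the waits can be faked in tests.
async function retryWithBackoff(task, maxTries, ms, sleep = defaultSleep) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await task(); // success: hand the result back
    } catch (err) {
      if (attempt >= maxTries) {
        throw err; // retries exhausted: surface the last error
      }
      const i = attempt + 1;
      await sleep(i * i * ms);
    }
  }
}
```

With a real request, task would be something like () => fetch('/api/endpoint') plus whatever check turns a bad response into a thrown error. Unlike the Rx version, the last error is rethrown once the retries run out.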

2 Answers


I have spent quite some time researching this (for learning purposes) and will try to explain the workings of this code as thoroughly as possible.

First, here’s the original code, annotated:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}
  1. Easy enough: we’re creating a custom backoff operator out of the retryWhen operator. We’ll be able to apply it later inside a pipe call.
  2. In this context, the standalone pipe function (imported from 'rxjs') returns a custom operator.
  3. Our custom operator is going to be a modified retryWhen operator. It takes a function argument. This function is called only once per subscription, the first time the source observable produces an error. retryWhen comes into play only when the source errors: it keeps the error from propagating further and can resubscribe to the source. If the source produces a non-error result (whether on the first subscription or on a retry), retryWhen simply passes it through.

    A few words on attempts. It’s an observable. It is not the source observable. It is created specifically for retryWhen. It has one use and one use only: whenever a subscription (or re-subscription) to the source observable results in an error, attempts fires a next with that error as its value. We are given attempts and are free to use it to react in some way to each failed subscription attempt to the source observable.

    So that’s what we are going to do.

    First we create range(1, maxTries), an observable that has an integer for every retry we are willing to perform. range is ready to fire all its numbers right then and there, but we have to hold its horses: we only need a new number when another retry happens. So, that’s why we...

  4. ... zip it with the attempts. Meaning, marry each emitted value of attempts with a single value of range.

    Remember, the function we’re currently in is called only once, and around that time attempts fires its first next, for the initial failed subscription. So, at this point, the zipped observable has produced just one value.

    Btw, what are the values of the two observables zipped into one? This function decides that: (i) => i. For clarity it can be written (itemFromRange, itemFromAttempts) => itemFromRange. The second argument is not used, so it’s dropped, and the first is renamed to i.

    What happens here is that we simply disregard the values fired by attempts; we are only interested in the fact that they are fired. And whenever that happens we pull the next value from the range observable...

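  5. ...and square it. This is the “exponential” part of the exponential backoff (strictly speaking, i * i grows quadratically; a truly exponential schedule would use something like 2 ** i).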

    So, now whenever (re-)subscription to source fails, we have an ever increasing integer on our hands (1, 4, 9, 16...). How do we transform that integer into a time delay until next re-subscription?

    Remember, the function we are currently inside of must return an observable, using attempts as input. This resulting observable is built only once. retryWhen then subscribes to it and: resubscribes to the source observable whenever the resulting observable fires next; completes or errors the output stream (the one our subscriber sees) whenever the resulting observable fires complete or error.

  6. Long story short, we need to make retryWhen wait a bit. The delay operator could maybe be used, but setting up the growth of the delay would likely be a pain. Instead, the mergeMap operator comes into play.

    mergeMap is a shortcut for two operators combined: map and mergeAll. map simply converts every increasing integer (1, 4, 9, 16...) into a timer observable which fires next after the given number of milliseconds (i * ms). mergeAll subscribes to that timer and forwards its next to retryWhen. If that last bit didn’t happen, our resulting observable would just fire next immediately with the timer observable instance as its value.

  7. At this point, we’ve built our custom observable which will be used by retryWhen to decide when exactly to attempt to re-subscribe to source observable.
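
To make steps 3 to 6 a bit more concrete, here is a rough plain-JavaScript analogy with no RxJS involved. It only mimics the idea that every failure pulls exactly one number from the range, which is then squared and turned into a delay. The names makeDelayPicker and onFailure, and the local range generator, are invented for this sketch and are not RxJS APIs.

```javascript
// Plain-JavaScript analogy (no RxJS). Every call to onFailure() stands
// for one value fired by attempts, and pulls one number from the range,
// much like zip pairs one range value with one error.
function* range(start, count) {
  for (let n = start; n < start + count; n++) {
    yield n;
  }
}

function makeDelayPicker(maxTries, ms) {
  const numbers = range(1, maxTries); // lazily holds 1..maxTries
  return function onFailure() {
    const { value, done } = numbers.next(); // step (4): take the next number
    if (done) {
      return null; // range exhausted: no retries left
    }
    const squared = value * value; // step (5): square it
    return squared * ms; // step (6): the delay a timer would wait
  };
}

const nextDelay = makeDelayPicker(3, 250);
const delays = [nextDelay(), nextDelay(), nextDelay(), nextDelay()];
console.log(delays); // [ 250, 1000, 2250, null ]
```

So backoff(3, 250) waits 250 ms, 1000 ms and 2250 ms before the first, second and third retry respectively.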

As it stands I see two problems with this implementation:

  • As soon as our resulting observable fires its last next (causing the last attempt to resubscribe), it also immediately fires complete. Unless the source observable returns a result very quickly (assuming that the very last retry is the one that succeeds), that result is going to be ignored.

    This is because as soon as retryWhen hears complete from our observable, it completes the output stream and unsubscribes from the source, which may still be in the middle of its AJAX request.

  • If all retries are unsuccessful, the output stream completes instead of raising the more logical error.

To solve both these issues, I think that our resulting observable should fire error at the very end, after giving the last retry some reasonable time to attempt to do its job.

Here’s my implementation of said fix, which also takes into account the deprecation of the zip operator in the latest rxjs v6:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

I tested this code and it seems to work properly in all cases. I can’t be bothered to explain it in detail right now; I doubt anyone will even read the wall of text above.

Anyway, big thanks to @BenjaminGruenbaum and @cartant for setting me on the right path to wrapping my head around all this.

Here is a different version that can be easily extended/modified:

import { Observable, pipe, throwError, timer } from 'rxjs';
import { mergeMap, retryWhen } from 'rxjs/operators';

export function backoff(maxRetries = 5): (_: Observable<any>) => Observable<any> {
  return pipe(
    retryWhen(errors => errors.pipe(
      mergeMap((error, i) => {
        const retryAttempt = i + 1;
        if (retryAttempt > maxRetries) {
          return throwError(error);
        } else {
          const waitms = retryAttempt * retryAttempt * 1000;
          console.log(`Attempt ${retryAttempt}: retrying in ${waitms}ms`);
          return timer(waitms);
        }
      }),
    ))
  );
}

Ref retryWhen
