In RxJS, when you want to run HTTP requests in sequence, you chain them. But I'm not clear on how to run requests in parallel. I saw in the examples on http://reactive-extensions.github.io/learnrx/ that they use Observable.zip() to run 2 requests in parallel. But how would you run 5 requests in parallel? More specifically, how can I set things up so that my function is called:
- when all 5 complete?
- when the first one completes?
4 Answers
Use combineLatest or forkJoin!
// Assume you have an array of urls
const urls = [
  "twitter.com/puppies.json",
  "google.com/puppies.json",
  "facebook.com/puppies.json"
];
// Let's map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url));
// Now combine the result from each request into an observable
// Here's combineLatest:
const allThePuppies$ = Rx.Observable.combineLatest(...requests);
// Alternatively, here's forkJoin (use one or the other, not both):
// const allThePuppies$ = Rx.Observable.forkJoin(requests);
// When you subscribe to `allThePuppies$`, you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
  const [twitterPuppies, googlePuppies, facebookPuppies] = results;
  // Do what you must with the respective responses
  // (Presumably in this example you'd show your users some adorable pics of puppies)
});
combineLatest takes in an arbitrary number of observables. Once each of them has emitted at least one value, it emits an array of the latest value from each observable whenever any of those observables fires.
That's terribly abstract, though. For our purposes, we know that a handful of ajax requests will realistically only emit once. So, if we use combineLatest for a handful of ajax observables, we end up with an observable that emits an array of results from each of the ajax requests.
forkJoin is similar to combineLatest, but it only emits its array of responses once each of its constituent observables has completed.
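To make the difference concrete, here is a toy model in plain JavaScript. It is not RxJS and does not use its API: the functions `combineLatestTrace` and `forkJoinTrace` and the sample data are made up for illustration, and each "observable" is reduced to a time-ordered list of emissions tagged with its source index.

```javascript
// Toy model, not RxJS itself: `events` is a time-ordered list of emissions,
// each tagged with the index of the source that produced it.
function combineLatestTrace(sourceCount, events) {
  const latest = new Array(sourceCount);
  const seen = new Set();
  const emitted = [];
  for (const { source, value } of events) {
    latest[source] = value;
    seen.add(source);
    // Emit only once every source has produced at least one value.
    if (seen.size === sourceCount) emitted.push(latest.slice());
  }
  return emitted;
}

function forkJoinTrace(sourceCount, events) {
  const last = new Array(sourceCount);
  const seen = new Set();
  for (const { source, value } of events) {
    last[source] = value;
    seen.add(source);
  }
  // A single emission holding each source's final value, once all are done.
  // In this model, a source that never emitted means nothing is emitted.
  return seen.size === sourceCount ? [last] : [];
}

// Three one-shot "requests" that answer in the order 1, 0, 2 (hypothetical data).
const oneShot = [
  { source: 1, value: "google" },
  { source: 0, value: "twitter" },
  { source: 2, value: "facebook" },
];
console.log(combineLatestTrace(3, oneShot)); // [["twitter","google","facebook"]]
console.log(forkJoinTrace(3, oneShot));      // [["twitter","google","facebook"]]

// A source that emits a second time makes the two differ.
const chatty = [...oneShot, { source: 0, value: "twitter-v2" }];
console.log(combineLatestTrace(3, chatty).length); // 2
console.log(forkJoinTrace(3, chatty));             // [["twitter-v2","google","facebook"]]
```

For one-shot ajax requests both give the same single array, in request order rather than arrival order; they only diverge when a source emits more than once.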
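This is quite an old question, but it has no accepted answer. The answer you are looking for might be surprisingly simple: concatMap.
A promise starts executing as soon as it is created, so promises created together run in parallel; values emitted from a single observable, on the other hand, arrive serially.
Combining the two: in the following snippet, the observables created from promises execute in parallel, and their results are emitted serially, because concatMap puts them into one stream in the order they were created. This relies on concatMap calling the mapping function for every URL up front (as RxJS 4 does, as far as I know). In a version where concatMap calls it only after the previous inner observable has completed, the requests would run one at a time; in that case, create the promises first and concatenate them afterwards.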
Rx.Observable.from(urls_array)
  .concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
  .subscribe(
    function(jsonObj) {
      // the first URL's result arrives first
    },
    function(err) { },
    function() {
      // all completed
    }
  )
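The timing behaviour described here can be sketched with a small model in plain JavaScript. It is not RxJS: the function names and the millisecond figures are hypothetical, and it assumes every request really does start at time 0.

```javascript
// Toy timing model, not RxJS: every request starts at t = 0 and request i
// finishes at finishTimes[i] (milliseconds, hypothetical numbers).
function inOrderDeliveryTimes(finishTimes) {
  const delivered = [];
  let ready = 0;
  for (const t of finishTimes) {
    // Result i can be passed on only when it and all earlier results are in.
    ready = Math.max(ready, t);
    delivered.push(ready);
  }
  return delivered;
}

// For comparison: request i starts only after request i - 1 has finished.
function sequentialDeliveryTimes(durations) {
  let clock = 0;
  return durations.map(d => (clock += d));
}

console.log(inOrderDeliveryTimes([300, 100, 200]));    // [300, 300, 300]
console.log(sequentialDeliveryTimes([300, 100, 200])); // [300, 400, 600]
```

With parallel starts, the whole batch is done when the slowest request is done; the ordering guarantee only delays results that finished early.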
.zip() can help you with that!
const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
c$.zip(b$, a$)
  .subscribe(v => console.log(v))
// marble
-0-1-2-3-4-5| (a$)
--0--1--2--3--4| (b$)
---0---1---2| (c$)
c$.zip(b$, a$)
---[0,0,0]---[1,1,1]---[2,2,2]|
// console.log
[0,0,0]
pause(400ms)
[1,1,1]
pause(400ms)
[2,2,2]
You can also pass a selector function as the last argument: .zip(arg1, arg2, (itself, arg1, arg2) => doSomething())
const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
c$.zip(b$, a$, (c, b, a) => a + b + c)
  .subscribe(v => console.log(v))
// console.log()
0
pause(400ms)
3 = (1+1+1)
pause(400ms)
6 = (2+2+2)
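The pairing rule behind zip can be checked without timers. The sketch below is a toy model in plain JavaScript, not RxJS: `zipTrace` and `range` are hypothetical helpers, and each observable is reduced to the array of values it would emit.

```javascript
// Toy model, not RxJS: each array lists the values one observable emits, in order.
function zipTrace(selector, ...sources) {
  // zip pairs values by position and stops with the shortest source.
  const length = Math.min(...sources.map(s => s.length));
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push(selector(...sources.map(s => s[i])));
  }
  return out;
}

// Stand-in for interval(...).take(n): the values 0 .. n - 1.
const range = n => Array.from({ length: n }, (_, i) => i);
const aValues = range(6);
const bValues = range(10);
const cValues = range(3);

console.log(zipTrace((c, b, a) => [c, b, a], cValues, bValues, aValues));
// [[0,0,0],[1,1,1],[2,2,2]]
console.log(zipTrace((c, b, a) => a + b + c, cValues, bValues, aValues));
// [0, 3, 6]
```

Because the shortest source has three values, only three tuples come out, however many values the other sources would still produce.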
Or use merge() + flatMap():
import Rx, { Observable } from 'rxjs'
import axios from 'axios'
const promiseA = axios.get('https://jsonplaceholder.typicode.com/users/1')
, promiseB = axios.get('https://jsonplaceholder.typicode.com/users/2')
, promiseC = axios.get('https://jsonplaceholder.typicode.com/users/3')
Observable.interval(0).take(1)
.flatMap(()=>Observable.merge(promiseA, promiseB, promiseC))
// flatMap will resolve the promise for you!
.map(res=>res.data.username)
.reduce((arr,item)=>arr.concat(item),[])
.subscribe(v=>console.log(v)) // e.g. [ 'Samantha', 'Antonette', 'Bret' ] (order follows response arrival, not request order)
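One thing to keep in mind with merge is that results come out in the order the responses arrive. A minimal sketch of that ordering, as a toy model in plain JavaScript rather than RxJS; `mergeOrder`, `requestOrder`, the labels, and the finish times are all hypothetical:

```javascript
// Toy model, not RxJS: each request has a label and a finish time in ms.
function mergeOrder(requests) {
  // merge passes values on as they arrive, so sort by finish time
  // (ties keep their original request order).
  return requests
    .map((r, index) => ({ ...r, index }))
    .sort((x, y) => x.finishAt - y.finishAt || x.index - y.index)
    .map(r => r.value);
}

function requestOrder(requests) {
  // What an order-preserving combinator such as forkJoin would report.
  return requests.map(r => r.value);
}

const responses = [
  { value: "user1", finishAt: 120 },
  { value: "user2", finishAt: 90 },
  { value: "user3", finishAt: 60 },
];
console.log(mergeOrder(responses));   // ["user3", "user2", "user1"]
console.log(requestOrder(responses)); // ["user1", "user2", "user3"]
```

If the position of each result matters, an order-preserving approach is the safer choice; merge fits when you only need every result as soon as it is available.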
You can have a look at https://www.npmjs.com/package/async
It's a Node module that can be used in the browser too.