最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - Running async tasks in parallel - Stack Overflow

programmeradmin0浏览0评论

In RxJS, when you want to run http requests in sequence- you chain them. But I'm not clear on how can I run requests in parallel? I saw in the examples on / that they use Observable.zip() to run 2 requests in parallel. But how would you run 5 requests in parallel? More specifically, how can I setup so that my function is called:

  • when all 5 plete?
  • when first plete?

In RxJS, when you want to run http requests in sequence- you chain them. But I'm not clear on how can I run requests in parallel? I saw in the examples on http://reactive-extensions.github.io/learnrx/ that they use Observable.zip() to run 2 requests in parallel. But how would you run 5 requests in parallel? More specifically, how can I setup so that my function is called:

  • when all 5 plete?
  • when first plete?
Share Improve this question asked Oct 19, 2014 at 8:25 Pavle LekicPavle Lekic 1,1221 gold badge15 silver badges28 bronze badges
Add a ment  | 

4 Answers 4

Reset to default 8

Use bineLatest or forkJoin!

// Assume you have an array of urls
const urls = [
  "twitter./puppies.json", 
  "google./puppies.json", 
  "facebook./puppies.json"
];

// Let's map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url))

// Now bine the result from each request into an observable
// Here's bineLatest:
const allThePuppies$ = Rx.Observable.bineLatest(...urls)
// Alternatively, here's forkJoin:
const allThePuppies$ = Rx.Observable.forkJoin(urls)


// When you subscribe to `allThePuppies$`, you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
  const twitterPuppies, googlePuppies, facebookPuppies = results;
  // Do what you must with the respective responses
  // (Presumably in this example you'd show your users some adorable pics of puppies)
})

bineLatest takes in an arbitrary number of observables, and once each of them have emitted at least one value, it will emit an array of the latest value from each observable when any of those observables fires.

That's terribly abstract, though. For our purposes, we know that a handful of ajax requests will realistically only emit once. So, if we use bineLatest for a handful of ajax observables, we'd end up with an observable that emits an array of results from each of the ajax requests.

forkJoin is similar to bineLatest, but it only emits its array of responses once each of its constituent observables has pleted.

This is a quite old question but without an accepted answer. The answer you are looking for might be surprisingly simple: concatMap.

When a promise is created, it starts execution immediately, so they are executing in parallel; while when values are emitted from one observable, they are in serial.

So bine these two, for the following code snippet, observables from promise are executed in parallel, and the result of them are emitted in serial because concatMap puts them into one stream in the order they are created.

Rx.Observable.from(urls_array)
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
.subscribe(
  function(jsonObj) {
    // first result will arrive first
  },
  function(err) { },
  function() {
    // all pleted
  }
)

.zip() can help you with that!

const a$ = Observable.interval(200).take(6) 
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
  .zip(b$,a$)
  .subscribe(v=>console.log(v))


// marble 
-0-1-2-3-4-5|    (a$)
--0--1--2--3--4| (b$)
---0---1---2|    (c$)
  zip(a$, b$)
---[0,0,0]---[1,1,1]---[2,2,2]|

// console.log
[0,0,0]
pause(400ms)
[1,1,1]
pause(400ms)
[2,2,3]

.zip(arg1, arg2, (itself, arg1, arg2)=> doSomething() )

const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
  .zip(b$,a$, (c,b,a)=>a+b+c)
  .subscribe(v=>console.log(v))

// console.log()
0
pause(400ms)
3 = (1+1+1)
pause(400ms)
9 = (3+3+3)

Or

merge() + flatMap()

import Rx, { Observable } from 'rxjs'
import axios from 'axios'

const promiseA = axios.get('https://jsonplaceholder.typicode./users/1')
    , promiseB = axios.get('https://jsonplaceholder.typicode./users/2')
    , promiseC = axios.get('https://jsonplaceholder.typicode./users/3')

Observable.interval(0).take(1)
  .flatMap(()=>Observable.merge(promiseA, promiseB, promiseC)) 
   // flatMap will resolve the promise for you!
  .map(res=>res.data.username)
  .reduce((arr,item)=>arr.concat(item),[])
  .subscribe(v=>console.log(v)) // [ 'Samantha', 'Antonette', 'Bret' ]

You can have a look at https://www.npmjs/package/async

It's a node module which can be used in browser too.

发布评论

评论列表(0)

  1. 暂无评论