javascript - How to combine observables in a non-blocking way? - Stack Overflow

I have a collection of Observables that each retrieve a different data type. I'm chaining those Observables to get all the data I want for my page. All of that information is independent, so loading one piece should not block or interfere with loading the others, which is what I am not able to achieve.

Here is an example of what I've done so far:

  getAll(): Observable<any[]> {
    return this.getHome().pipe(
      tap((home: Home) => this.application = home),
      mergeMap((home: Home) =>
        combineLatest([
          this.getAssets(home.images).pipe(defaultIfEmpty([])),
          this.getStyling(home.styling).pipe(defaultIfEmpty({})),
          this.getCategories(home.categories).pipe(defaultIfEmpty([])),
        ])
      )
    );
  }

Here, the Observables are successfully chained, so I get all the data I want, but combineLatest() does the next request once the previous one is completed, and the parent emits the collected data only once everything is finished, which creates a delay. I've been trying to use merge() instead of combineLatest() so that each piece of data is emitted to the parent Observable as soon as it is received, but apparently I just can't do that.

So, how can I chain those Observables so that each emission in a child Observable is emitted directly to the parent Observable?

asked Jun 12, 2019 at 11:46 by Fabien Sacrepeigne
  • 1 "how can I manage to chain those Observables so that each submit in child Observable does emit directly to parent Observable": this is the problem, how do you know what type is being emitted? If you want to emit into a stream, you can wire up a subject. – Avin Kavish Commented Jun 12, 2019 at 12:03
  • @AvinKavish I need to look into Subjects, do you have any good reads on that? – Fabien Sacrepeigne Commented Jun 12, 2019 at 12:21
  • Yeah, give me a min, I also have a half-written answer with subjects. – Avin Kavish Commented Jun 12, 2019 at 12:22
  • Have a look, it's only a reference implementation. Subjects can be combined in lots of different ways. – Avin Kavish Commented Jun 12, 2019 at 13:09

3 Answers

You need forkJoin. It returns an array with the result of every Observable, in the same order as the arguments passed to forkJoin.

  getAll(): Observable<any[]> {
    return this.getHome().pipe(
      tap((home: Home) => this.application = home),
      mergeMap((home: Home) =>
        forkJoin(
          this.getAssets(home.images).pipe(defaultIfEmpty([])),
          this.getStyling(home.styling).pipe(defaultIfEmpty({})),
          this.getCategories(home.categories).pipe(defaultIfEmpty([]))
        )
      )
    );
  }

If they are independent, you can handle them independently and emit them on a subject.

  getAll(): Observable<any> {
    const subject = new Subject<any>();
    const handler = (res: any) => subject.next(res);

    this.getHome().pipe(
      tap((home: Home) => this.application = home),
      tap(handler),
      tap((home: Home) => {
        forkJoin(
          this.getAssets(home.images).pipe(defaultIfEmpty([]), tap(handler)),
          this.getStyling(home.styling).pipe(defaultIfEmpty({}), tap(handler)),
          this.getCategories(home.categories).pipe(defaultIfEmpty([]), tap(handler))
        ).subscribe(() => subject.complete())
      })
    ).subscribe();
    return subject.asObservable();
  }

With this method, type checking has to be done by the consumer.
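
One way to make that consumer-side check less error-prone is to tag each emission before pushing it onto the subject. The following is only a sketch in plain TypeScript: the `PageEvent` type, the `describeEvent` function and the shapes of `Home`, images, styling and categories are all assumptions for illustration, since the question does not show the real types.

```typescript
// Hypothetical shape; the thread does not show the real Home type.
interface Home { title: string }

// Each emission carries a `kind` tag so the consumer can narrow it safely.
type PageEvent =
  | { kind: 'home'; value: Home }
  | { kind: 'images'; value: string[] }
  | { kind: 'styling'; value: Record<string, string> }
  | { kind: 'categories'; value: string[] };

// Consumer-side handling: the switch narrows `event.value` in every branch.
function describeEvent(event: PageEvent): string {
  switch (event.kind) {
    case 'home':
      return `home loaded: ${event.value.title}`;
    case 'images':
      return `${event.value.length} image(s) loaded`;
    case 'styling':
      return `${Object.keys(event.value).length} style rule(s) loaded`;
    case 'categories':
      return `${event.value.length} categories loaded`;
  }
}
```

With something like this, each handler would wrap its result (for example `{ kind: 'images', value: res }`) before calling `subject.next()`, and the subject could be typed as `Subject<PageEvent>` instead of `Subject<any>`.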

Subjects are documented here.

I've been trying to use merge() instead of combineLatest() so that each data received emits it instantly to the parent Observable, but apparently I just can't do that

It sounds like you're looking for partial data as soon as it is ready. Both combineLatest() and forkJoin() only emit once they can populate an array whose length equals the number of observables. So you either wait for at least one value from each observable (combineLatest) or wait for them all to complete (forkJoin); either way they emit an array, and they need to know what to put in it.
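
To make the timing difference concrete, here is a toy model in plain TypeScript. It is not RxJS and does not import it: `TimelineEvent`, `Emissions` and `replay` are made-up names, and the model only covers the simple case where each source emits and then completes. It replays a timeline of events and records when merge(), combineLatest() and forkJoin() would emit.

```typescript
// Toy model, NOT RxJS: all names here are hypothetical.
type TimelineEvent =
  | { source: number; type: 'next'; value: string }
  | { source: number; type: 'complete' };

interface Emissions {
  merge: string[];           // one entry per value, as soon as it arrives
  combineLatest: string[][]; // one array per value, once every source has a value
  forkJoin: string[][];      // one array, once every source has completed
}

function replay(sourceCount: number, timeline: TimelineEvent[]): Emissions {
  const latest: (string | undefined)[] = [];
  const completed: boolean[] = [];
  for (let i = 0; i < sourceCount; i++) {
    latest.push(undefined);
    completed.push(false);
  }
  const out: Emissions = { merge: [], combineLatest: [], forkJoin: [] };

  for (const event of timeline) {
    if (event.type === 'next') {
      latest[event.source] = event.value;
      out.merge.push(event.value);
      // combineLatest needs at least one value from every source.
      if (latest.every(v => v !== undefined)) {
        out.combineLatest.push(latest.map(v => v as string));
      }
    } else {
      completed[event.source] = true;
      // forkJoin emits the last values once every source has completed.
      if (completed.every(done => done) && latest.every(v => v !== undefined)) {
        out.forkJoin.push(latest.map(v => v as string));
      }
    }
  }
  return out;
}

// Sources: 0 = images, 1 = styling, 2 = categories; styling happens to finish first.
const result = replay(3, [
  { source: 1, type: 'next', value: 'styling' },
  { source: 1, type: 'complete' },
  { source: 0, type: 'next', value: 'images' },
  { source: 0, type: 'complete' },
  { source: 2, type: 'next', value: 'categories' },
  { source: 2, type: 'complete' },
]);
```

In this timeline `result.merge` gets an entry for every value as it arrives, while `result.combineLatest` and `result.forkJoin` each get a single array, and only after the slowest source has delivered.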

I think you want to emit each piece of data as soon as it's ready.

Start by using merge() to emit values from all 3 observables, but for each observable use map() to create an object key/value pair for that given type (images, styling, categories).

Finally, use a scan() operator to aggregate the values into a single object that will emit each property as it is updated.

Consumers of the stream can check whether a property is null to see if it is ready. The stream will emit 3 values before it completes.

  getAll(): Observable<{home: Home, images: any[], styling: any, categories: any[]}> {
    return this.getHome().pipe(
      tap((home: Home) => this.application = home),
      switchMap((home: Home) =>
        // merge() takes the observables as separate arguments, not as an array
        merge(
          this.getAssets(home.images).pipe(
             defaultIfEmpty([]),
             map(images => ({images}))
          ),
          this.getStyling(home.styling).pipe(
             defaultIfEmpty({}),
             map(styling => ({styling}))
          ),
          this.getCategories(home.categories).pipe(
             defaultIfEmpty([]),
             map(categories => ({categories}))
          )
        ).pipe(
          // fold each partial result into one object; a property stays null until it is loaded
          scan((acc, next) => ({...acc, ...next}), {home, images: null, styling: null, categories: null})
        )
      )
    );
  }
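
The accumulator passed to scan() can be tried on its own, without RxJS. The sketch below is plain TypeScript under a few assumptions: `PageState`, `accumulate`, `isComplete` and the sample values are hypothetical, and the `home` property is left out to keep it short. It replays three partial results in one possible arrival order and keeps every intermediate state, which is what scan() would emit.

```typescript
// Hypothetical shape of the aggregated state; null means "not loaded yet".
interface PageState {
  images: string[] | null;
  styling: Record<string, string> | null;
  categories: string[] | null;
}

// Same idea as the scan() accumulator: later partials overwrite earlier keys.
const accumulate = (acc: PageState, next: Partial<PageState>): PageState =>
  ({ ...acc, ...next });

// Consumer-side check: everything is ready once no property is null.
const isComplete = (state: PageState): boolean =>
  state.images !== null && state.styling !== null && state.categories !== null;

const seed: PageState = { images: null, styling: null, categories: null };

// Partial results in one possible arrival order (styling first here).
const arrivals: Partial<PageState>[] = [
  { styling: { color: 'red' } },
  { images: ['a.png'] },
  { categories: [] },
];

// Replay the arrivals, keeping every intermediate state.
const states: PageState[] = [];
let current = seed;
for (const next of arrivals) {
  current = accumulate(current, next);
  states.push(current);
}
```

Each entry in `states` is a new object, so a consumer can render whatever is already non-null and treat `isComplete(state)` as the signal that the page is fully loaded.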

You could add the startWith({}) operator to the output of merge(), before scan(), if you wanted to emit a first value where images, styling and categories are all null. That gives consumers an early value so they know loading of the other values has started.
