
javascript - In Rxjs, how do I flatten or merge a stream containing both normal types and Observables? - Stack Overflow


Analogous to an array, flatten([1, 2, [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6].
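
For reference, the array case can be sketched in plain JavaScript. This is only an illustration of the analogy, assuming an environment that supports Array.prototype.flat (ES2019); the name flattened is made up for the example.

```javascript
// One level of nesting is removed; non-array items are kept as they are.
const flattened = [1, 2, [3, 4], [5, 6]].flat();

console.log(flattened); // [ 1, 2, 3, 4, 5, 6 ]
```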

I would like to do this in rxjs observables:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()

test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7

mergeAll doesn't work and throws an error.

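Here is a very dirty solution, applied to the source stream before mergeAll:

// test$ already has mergeAll applied, so filter the unflattened source instead
const source$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
)

const inElegant$ = Rx.Observable.merge(
  source$.filter(x => x instanceof Rx.Observable).mergeAll(),
  source$.filter(x => !(x instanceof Rx.Observable))
)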

inElegant$.subscribe(x => console.log(x));

Are there any better solutions to this?

JSBin: http://jsbin.com/vohizoqiza/1/edit?js,console

Asked May 29, 2016 at 6:55 by ckwong; edited May 29, 2016 at 7:13.

3 Answers


If we have a stream of the form

const stream = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])

there are a few ways to convert it to a stream of pure numbers without the filtering used in your solution.

These are three possible solutions:

// prints 1, 2, 3, 4, 5, 6, 7, 8
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .concatAll()
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 8
stream
    .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 8
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .mergeAll()
    .subscribe(x => console.log(x));

This all looks good. There are a few things to consider, however. Suppose we alter the source stream so that it includes an asynchronous stream:

const asyncStream = Rx.Observable.interval(1000)
    .select((val, idx) => idx + 8).take(5);

const stream = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
    asyncStream, 13, 14, 15])

We get the following results using the same solutions as earlier:

// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .concatAll()
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
    .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .mergeAll()
    .subscribe(x => console.log(x));

To sum up: using selectMany, or select followed by mergeAll, produces a flattened stream of the correct type, but the order is not maintained. These solutions subscribe to all the inner streams at once and emit a value whenever any of them produces one.

The concatAll solution behaves a little differently. It subscribes to each stream in order, moving on to the next value or stream only when the previous stream has completed.
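
The ordering difference can be reproduced with a toy model that does not use RxJS at all. This is only a sketch under stated assumptions: each inner stream is represented as a list of [emitTimeMs, value] pairs, and the helper names sync, mergeOrder and concatOrder are hypothetical, made up for this illustration.

```javascript
// Toy model, not RxJS: each inner source is a list of [emitTimeMs, value]
// pairs, with emit times measured from the moment the source is subscribed.
const sync = (...values) => values.map(v => [0, v]);

const sources = [
  sync(1), sync(2), sync(3, 4), sync(5), sync(6, 7),
  [[1000, 8], [2000, 9], [3000, 10], [4000, 11], [5000, 12]], // asyncStream
  sync(13), sync(14), sync(15),
];

// mergeAll / selectMany: every source is subscribed up front, so values
// arrive in emit-time order (ties keep source order).
function mergeOrder(sources) {
  return sources
    .flat()
    .map(([time, value], index) => ({ time, value, index }))
    .sort((a, b) => a.time - b.time || a.index - b.index)
    .map(event => event.value);
}

// concatAll: a source is subscribed only after the previous one completes,
// so values arrive in source order regardless of emit times.
function concatOrder(sources) {
  return sources.flat().map(([, value]) => value);
}

console.log(mergeOrder(sources).join(', '));
// 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
console.log(concatOrder(sources).join(', '));
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
```

The model ignores scheduling details of the real library; it only captures why the delayed values 8 to 12 fall behind 13, 14, 15 when merging but not when concatenating.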

So these are some solutions; which one you want depends on your needs. All of them, however, get rid of the need to filter the streams.

I personally use a toObservable conversion function for those cases. That function leaves observables unchanged and wraps other types in an observable (with Rx.Observable.return). It is used like this:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).map(toObservable).mergeAll()

That is close to what you are doing, but I find it convenient to package the conversion in a separate function that can be reused in other contexts.
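
The toObservable helper itself is not shown in this answer. A minimal sketch, assuming RxJS 4 is loaded as the global Rx (as in the other snippets here) and that an instanceof check is enough to recognize observables, might look like this:

```javascript
// Assumption: RxJS 4 is available as the global `Rx`.
// Observables pass through untouched; any other value is wrapped in a
// single-value observable via Rx.Observable.return.
function toObservable(x) {
  return x instanceof Rx.Observable ? x : Rx.Observable.return(x);
}
```

With this in place, .map(toObservable).mergeAll() sees only observables, and swapping mergeAll for concatAll would keep the source order.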

As per the documentation, mergeAll:

Merges an observable sequence of observable sequences into an observable sequence.

You have a mixed collection (numbers and observables), so it doesn't work.

The only way to do that is the way you are doing it, handling the two types with different methods. You should, though, ask yourself how it comes about that you have both Observables and primitive types in the same sequence.
