Analogous to an array, flatten([1, 2 [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6]
.
I would like to do this in rxjs observables:
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()
test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7
mergeAll doesn't work and throws an error.
Here is very dirty solution:
const inElegant$ = Rx.Observable.merge(
test$.filter(x => x instanceof Rx.Observable).mergeAll(),
test$.filter(x => !(x instanceof Rx.Observable))
)
inElegant$.subscribe(x => console.log(x));
Are there any better solutions to this?
Jsbin ,console
Analogous to an array, flatten([1, 2 [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6]
.
I would like to do this in rxjs observables:
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()
test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7
mergeAll doesn't work and throws an error.
Here is very dirty solution:
const inElegant$ = Rx.Observable.merge(
test$.filter(x => x instanceof Rx.Observable).mergeAll(),
test$.filter(x => !(x instanceof Rx.Observable))
)
inElegant$.subscribe(x => console.log(x));
Are there any better solutions to this?
Jsbin http://jsbin./vohizoqiza/1/edit?js,console
Share Improve this question edited May 29, 2016 at 7:13 ckwong asked May 29, 2016 at 6:55 ckwongckwong 1411 silver badge8 bronze badges3 Answers
Reset to default 4If we have a stream on the form
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])
there are a few ways to convert it to a stream of pure numbers (that does not include filtering as in your solution.)
These are three possible solutions:
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
This all looks good. There are a few things to consider however. If we alter the source stream to make it asynchronous:
const asyncStream = Rx.Observable.interval(1000)
.select((val, idx) => idx + 8).take(5);
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
asyncStream, 13, 14, 15])
We get the following results using the same solutions as earlier:
// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
So to sum up. using selectMany
or select
followed by mergeAll
solves the problem of producing a flattened list of the correct type, but the order is not maintained. These solutions will listen to all streams and produce a result when any of the streams produce a value.
The concatAll
solution behaves a little different. This solution will listen to each stream in order, switching to the next value/stream only when the last stream has pleted.
So these are some solutions, which one you want depends on your needs. All of them however gets rid of the need to filter the streams.
I personally use a toObservable
conversion function for those cases. That function leaves observable unchanged and wraps other types in an observable (with Rx.Observable.return
). So it is used like this :
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).map(toObservable).mergeAll()
That is close to what you are doing but I found it convenient to be packaged in a separate function that can be reused in other contexts.
As per documentation mergeAll
Merges an observable sequence of observable sequences into an observable sequence.
You have a mixed collection (numbers and observables) so doesn't work.
The only way to do that is how you are doing, handling both the types with different methods: though you should ask yourself how es that you have both Observable and primitive type in the same sequence.