I have a function that returns promise.
I want to subscribe to that promise using rx.js in such way that calling of this function will push notifications to all subscribers.
Here is what I have:
var subject = new Rx.Subject();
var subscription = subject.subscribe(
function (x) { console.log('onNext: ' + x); },
function (e) { console.log('onError: ' + e.message); },
function () { console.log('onCompleted'); }
);
//here I want to push first notification
subject.fromPromise(functionThatReturnsPromise());
//some code
//here I want to push second notification
subject.fromPromise(functionThatReturnsPromise());
As a result subscriber receives only one notification.
How to solve that?
I have a function that returns promise.
I want to subscribe to that promise using rx.js in such way that calling of this function will push notifications to all subscribers.
Here is what I have:
var subject = new Rx.Subject();
var subscription = subject.subscribe(
function (x) { console.log('onNext: ' + x); },
function (e) { console.log('onError: ' + e.message); },
function () { console.log('onCompleted'); }
);
//here I want to push first notification
subject.fromPromise(functionThatReturnsPromise());
//some code
//here I want to push second notification
subject.fromPromise(functionThatReturnsPromise());
As a result subscriber receives only one notification.
How to solve that?
Share Improve this question edited May 22, 2019 at 7:38 Liam 29.8k28 gold badges139 silver badges203 bronze badges asked Jul 31, 2014 at 14:37 QvatraQvatra 3,8577 gold badges52 silver badges81 bronze badges1 Answer
Reset to default 7var subject = new Rx.Subject();
var subscription = subject.mergeAll().subscribe(
function (x) { console.log('onNext: ' + x); },
function (e) { console.log('onError: ' + e.message); },
function () { console.log('onCompleted'); }
);
subject.onNext(Rx.Observable.fromPromise(functionThatReturnsPromise()));
//some code
subject.onNext(Rx.Observable.fromPromise(functionThatReturnsPromise()));
The subject here is now a metastream (an observable of observables). And before subscribing to it, we "flatten" it by calling mergeAll()
. On the onNext()
we are feeding Observables to the subject, that's what makes it a metastream.