I'm having a problem that I'm sure is entirely based on my (lack of) a plete understanding of Observables.
This is an Angular2 (v4.0.3) project using rx/js and Observables.
I have a state store that holds events in a state service:
// Observable string sources
private currentEventStore = new BehaviorSubject<string>("");
// Observable string streams
public currentEvent$ = this.currentEventStore.asObservable();
// Service message mands
setCurrentEvent(nextEvent: string) {
this.currentEventStore.next(nextEvent);
}
I subscribe to currentEvent$ in my ponents to listen for events, and then act on them.
this.stateSvc.currentEvent$
.subscribe(
currentEvent => {
this.currentEvent = currentEvent;
if (currentEvent != '') {
this.handleCurrentEvent(currentEvent);
}
});
The problem I'm having is that the store is accumulating all events and returning all of them each time, so that when I spawn a 'create' event, my subscribed function picks up the event and creates a record - perfect on the first pass, but if I spawn a second 'create' event, my subscribed function picks up 2 events, and creates 2 records, and on the next 'create' event it creates 3 records, and so on.
What I need to happen is for the event to be flushed out of the stream so that the currentEvent$ store only holds events that haven't been addressed.
Is there a way to flush the store? Or is there something I'm missing here in my expectations and implementation?
I'm having a problem that I'm sure is entirely based on my (lack of) a plete understanding of Observables.
This is an Angular2 (v4.0.3) project using rx/js and Observables.
I have a state store that holds events in a state service:
// Observable string sources
private currentEventStore = new BehaviorSubject<string>("");
// Observable string streams
public currentEvent$ = this.currentEventStore.asObservable();
// Service message mands
setCurrentEvent(nextEvent: string) {
this.currentEventStore.next(nextEvent);
}
I subscribe to currentEvent$ in my ponents to listen for events, and then act on them.
this.stateSvc.currentEvent$
.subscribe(
currentEvent => {
this.currentEvent = currentEvent;
if (currentEvent != '') {
this.handleCurrentEvent(currentEvent);
}
});
The problem I'm having is that the store is accumulating all events and returning all of them each time, so that when I spawn a 'create' event, my subscribed function picks up the event and creates a record - perfect on the first pass, but if I spawn a second 'create' event, my subscribed function picks up 2 events, and creates 2 records, and on the next 'create' event it creates 3 records, and so on.
What I need to happen is for the event to be flushed out of the stream so that the currentEvent$ store only holds events that haven't been addressed.
Is there a way to flush the store? Or is there something I'm missing here in my expectations and implementation?
Share Improve this question asked May 16, 2017 at 18:52 Stephen R. SmithStephen R. Smith 3,4101 gold badge28 silver badges46 bronze badges 1-
2
The only thing
BehaviorSubject
does is that it has a default value that is sent on subscription to each observer (that's whatReplaySubject
does.). It doesn't store values going through so the problem is somewhere else and not in usingBehaviorSubject
. – martin Commented May 16, 2017 at 19:22
1 Answer
Reset to default 4As is often the case, after days of banging my head against a problem, within hours of posting a question I work out the answer.
The issue here is that every time I revisit the ponent that registers the subscriber, it adds a new subscriber to the observable, so that on the second visit, there are two subscribers, and each fires, resulting in the event being handled twice. Come back a third time, and there are now three subscribers, so the event gets handled three times.
What I need to do is modify my subscriptions so that they are assigned to a variable, and then when destroying the ponent, I unsubscribe.
So the subscribe should look like this (note the addition of 'this.observeEvent =':
this.observeEvent = this.stateSvc.currentEvent$
.subscribe(
currentEvent => {
this.currentEvent = currentEvent;
if (currentEvent != '') {
this.handleCurrentEvent(currentEvent);
}
});
And I add this to the ponent:
ngOnDestroy() {
this.observeEvent.unsubscribe();
}
Which unregistered the subscriber so they don't accumulate each time this ponent loads.
I had tried to figure out how to acplish this earlier, but without assigning the Observable to an instance variable, I couldn't find any valid syntax to unsubscribe in the ngOnDestroy method.