I have a strange use case where I need to keep track of all previously emitted events.
Thanks to the ReplaySubject, it works perfectly so far. For every new subscriber, this Subject re-emits every previous event.
Now, for a specific scenario, I need to be able to deliver only the latest published event (a bit like a BehaviorSubject), while keeping the same source of events.
Here is a snippet of what I'm trying to achieve (StackBlitz):
import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.mySubject = new ReplaySubject();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
  }
}

const myEventManager = new EventManager();
myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");

myEventManager.fullSubscribe(v => {
  console.log("SUB 1", v);
});

myEventManager.subscribe(v => {
  console.log("SUB 2", v);
});
Thank you
Asked Apr 30, 2021 at 14:36 by anousss; edited Apr 30, 2021 at 14:54 by Patrick Roberts.
Comments:
The RxJS last() operator does this. – Mrk Sef, Apr 30, 2021 at 15:40
@MrkSef not exactly. last() won't work the way they're requesting; it just outputs the last published event before the observable completes. OP is requesting that when the ReplaySubject is subscribed to, the "hot" subject immediately emits the last event that was published (like BehaviorSubject) rather than all the events that have ever been published to it, as it does by default. – Patrick Roberts, Apr 30, 2021 at 15:45
@PatrickRoberts Good point! Okay, use the RxJS debounceTime(0). I think 0 should work here as the replay happens synchronously. If not, he can up the debounce time a little bit. – Mrk Sef, Apr 30, 2021 at 17:27
I thought about debounceTime(0); the problem with that is that if subsequent emissions happen on the same tick, only the last one will be received. – BizzyBob, Apr 30, 2021 at 17:28
3 Answers
If you keep track of the number of events you've published, you could use skip:
subscribe(next, error?, complete?) {
  // Skip everything the ReplaySubject replays except the most recent event
  return this.mySubject.pipe(
    skip(this.publishCount - 1)
  ).subscribe(next, error, complete);
}
Here's a StackBlitz demo.
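The snippet above does not show where publishCount is maintained; presumably it is incremented inside publish(). The following is a library-free sketch of that idea, not the answer's actual code: MiniReplay is a hypothetical stand-in for a ReplaySubject, and its skipCount parameter plays the role of skip(n). It only illustrates why skipping publishCount - 1 replayed values leaves the latest one.

```javascript
// Hypothetical, RxJS-free stand-in for a ReplaySubject (illustration only).
class MiniReplay {
  constructor() {
    this.history = [];          // every value ever published
    this.listeners = new Set(); // currently active subscribers
  }

  next(value) {
    this.history.push(value);
    // Iterate over a copy so a listener may unsubscribe during delivery
    for (const listener of [...this.listeners]) listener(value);
  }

  // Replays the history starting at index skipCount, then delivers live values.
  subscribe(listener, skipCount = 0) {
    for (const value of this.history.slice(Math.max(skipCount, 0))) {
      listener(value);
    }
    this.listeners.add(listener);
    return () => this.listeners.delete(listener); // unsubscribe function
  }
}

class EventManager {
  constructor() {
    this.source = new MiniReplay();
    this.publishCount = 0; // assumption: the counter is kept here
  }

  publish(value) {
    this.publishCount += 1;
    this.source.next(value);
  }

  // Receives the whole history, then live values
  fullSubscribe(next) {
    return this.source.subscribe(next);
  }

  // Receives only the latest published value, then live values
  subscribe(next) {
    return this.source.subscribe(next, this.publishCount - 1);
  }
}

const manager = new EventManager();
manager.publish("Data 1");
manager.publish("Data 2");
manager.publish("Data 3");

const full = [];
const latest = [];
manager.fullSubscribe(v => full.push(v));
manager.subscribe(v => latest.push(v));
manager.publish("Data 4");

console.log(full);   // Data 1, Data 2, Data 3, Data 4
console.log(latest); // Data 3, Data 4
```

With RxJS itself, the same bookkeeping would amount to incrementing a counter in publish() next to this.mySubject.next(value), and reading it at subscribe time as the answer does.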
Instead of forcing a ReplaySubject to behave like a BehaviorSubject, you can arrive at ReplaySubject-like behavior by manipulating a BehaviorSubject.
import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay, take, concatMap, skip } from 'rxjs/operators';

class EventManager {
  constructor() {
    // Note: with no initial value, the subject starts at undefined, and that
    // undefined is also the first entry accumulated below.
    this.mySubject = new BehaviorSubject();
    this.allEmittedValues = this.mySubject.pipe(
      scan((xs, x) => [...xs, x], []),
      shareReplay(1)
    );
    // Necessary since we need to start accumulating allEmittedValues
    // immediately.
    this.allEmittedValues.subscribe();
  }

  dispose() {
    // ends all subscriptions
    this.mySubject.complete();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    // First, take the latest value of the accumulated array of emits and
    // unroll it into an observable
    const existingEmits$ = this.allEmittedValues.pipe(
      take(1),
      concatMap((emits) => from(emits))
    );
    // Then, subscribe to the main subject, skipping the replayed value since
    // we just got it at the tail end of existingEmits$
    const futureEmits$ = this.mySubject.pipe(skip(1));
    return concat(existingEmits$, futureEmits$).subscribe(
      next,
      error,
      complete
    );
  }

  subscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }
}
Why not just have an instance of ReplaySubject and BehaviorSubject on EventManager?
import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.replaySubject = new ReplaySubject();
    this.behaviorSubject = new BehaviorSubject();
  }

  publish(value) {
    // Feed both subjects so they stay in sync
    this.replaySubject.next(value);
    this.behaviorSubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.replaySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.behaviorSubject.subscribe(next, error, complete);
  }
}