I don't know if this is possible! I want some generic code. This should act similar to a ReplaySubject but, on subscribing, should emit each most recent message with a unique name. In the code below, the actual output is ...
I want this to be logged once ... received the latest bbb 5
And I understand why I am getting this output.
The desired output is (order not important) ...
I want this to be logged once ... received the latest aaa 3
I want this to be logged once ... received the latest bbb 5
But I don't know how to achieve this. I know that I could create a second ReplaySubject, one to deal with aaa
messages and one to deal with bbb
messages. However in my actual code, I have 100+ message names and I don't want to create 100+ ReplaySubject's (one for each). Is there a more generic/scalable solution to this problem?
// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'
replaySource = new rxjs.ReplaySubject(1)
replaySource.next({name: 'aaa', payload: 1})
replaySource.next({name: 'bbb', payload: 2})
replaySource.next({name: 'aaa', payload: 3})
replaySource.next({name: 'bbb', payload: 4})
replaySource.next({name: 'bbb', payload: 5})
replaySource.pipe(
rxjs.filter(x => x.name === 'aaa')
).subscribe((x) => {
console.log('I want this to be logged once ... received the latest aaa ' + x.payload)
})
replaySource.pipe(
rxjs.filter(x => x.name === 'bbb')
).subscribe((x) => {
console.log('I want this to be logged once ... received the latest bbb ' + x.payload)
})
<script src=".8.0/rxjs.umd.min.js"></script>
I don't know if this is possible! I want some generic code. This should act similar to a ReplaySubject but, on subscribing, should emit each most recent message with a unique name. In the code below, the actual output is ...
I want this to be logged once ... received the latest bbb 5
And I understand why I am getting this output.
The desired output is (order not important) ...
I want this to be logged once ... received the latest aaa 3
I want this to be logged once ... received the latest bbb 5
But I don't know how to achieve this. I know that I could create a second ReplaySubject, one to deal with aaa
messages and one to deal with bbb
messages. However in my actual code, I have 100+ message names and I don't want to create 100+ ReplaySubject's (one for each). Is there a more generic/scalable solution to this problem?
// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'
replaySource = new rxjs.ReplaySubject(1)
replaySource.next({name: 'aaa', payload: 1})
replaySource.next({name: 'bbb', payload: 2})
replaySource.next({name: 'aaa', payload: 3})
replaySource.next({name: 'bbb', payload: 4})
replaySource.next({name: 'bbb', payload: 5})
replaySource.pipe(
rxjs.filter(x => x.name === 'aaa')
).subscribe((x) => {
console.log('I want this to be logged once ... received the latest aaa ' + x.payload)
})
replaySource.pipe(
rxjs.filter(x => x.name === 'bbb')
).subscribe((x) => {
console.log('I want this to be logged once ... received the latest bbb ' + x.payload)
})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
Share
Improve this question
edited Mar 23, 2023 at 17:01
halfer
20.3k19 gold badges109 silver badges202 bronze badges
asked Mar 21, 2023 at 12:44
danday74danday74
57.3k55 gold badges269 silver badges333 bronze badges
4
- 1 rxjs.dev/api/index/function/distinct – possum Commented Mar 21, 2023 at 12:55
- thanks for the input but not sure this will meet my needs as my objects have a name and a payload and the differing payloads will prevent it from working ((I didn't show the payload in my post to keep things simple) - thanks tho! – danday74 Commented Mar 21, 2023 at 13:46
- I read your post the first time and I don't think you read that documentation. You can pass a function to determine the key that determines if it's been seen before or not. – possum Commented Mar 22, 2023 at 10:01
- @possum thanks for your input much appreciated, TBH you are correct - however, I've looked at the docs now and it seems to have the same problem as seen in the Sleepwalker answer - i.e. Using distinct will emit the first values, not the most recent – danday74 Commented Mar 22, 2023 at 10:32
10 Answers
Reset to default 2Leave the default buffer for your subject (infinite buffer), and pipe distinct (use the arg to specify that only the name property must be distinct).
// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'
replaySource = new rxjs.ReplaySubject().pipe(rxjs.distinct(x => x.name))
replaySource.next({name: 'aaa', payload: 1});
replaySource.next({name: 'bbb', payload: 2});
replaySource.next({name: 'aaa', payload: 3});
replaySource.next({name: 'bbb', payload: 4});
replaySource.next({name: 'bbb', payload: 5});
replaySource.subscribe(x => {
console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
As far as I'm aware, what you are attempting to do isn't possible using a ReplaySubject
. ReplaySubject
doesn’t close when its next
function is called, and the observable has no way of knowing which will be the final value until the observable is plete, so operators such as last
, takeLast
, etc., won’t work. This is because the stream doesn’t end, so the operators don’t know when to stop operating.
When you subscribe to a ReplaySubject
, you get each value in sequence, not at once, so any operator you use can only operate on one value at a time. This is also true for callbacks passed to the subscribe function; you won’t be able to do it within the subscribe block either.
The only way to acplish what you’re looking to do using a single observable is to create a stream of values in an observable that pletes.
const myObservable = new Observable((subscriber) => {
subscriber.next({ name: 'aaa', payload: 1 });
subscriber.next({ name: 'bbb', payload: 2 });
subscriber.next({ name: 'aaa', payload: 3 });
subscriber.next({ name: 'bbb', payload: 4 });
subscriber.next({ name: 'bbb', payload: 5 });
subscriber.plete();
});
With an observable that pletes, you can use the reduce
operator to get what you’re looking for.
myObservable
.filter(x => x.name === 'aaa')
.reduce((acc, val) => val)
.subscribe(x => {
console.log('I want this to be logged once … received the latest aaa ' + x.payload);
});
myObservable
.filter(x => x.name === 'bbb')
.reduce((acc, val) => val)
.subscribe(x => {
console.log('I want this to be logged once … received the latest bbb ' + x.payload);
});
Based on the answer given by OP, i'd like to make following suggestion which i think can leverage the advantage of replaySubject's functionality better
const setReplay = new rxjs.Subject(1) // the setter
const replayWatcher = new rxjs.ReplaySubject(1) // where the full Replay is stored
// append the stored Replay with new value
// to stop listening for new value, just unsubscribe replayListner
const replayListner = setReplay.pipe(
rxjs.scan((acc, value, index) => {
acc[value.name] = value.payload
return acc
}, {})
).subscribe(replayWatcher)
setReplay.next({name: 'aaa', payload: 1});
setReplay.next({name: 'bbb', payload: 2});
setReplay.next({name: 'aaa', payload: 3});
setReplay.next({name: 'bbb', payload: 4});
setReplay.next({name: 'bbb', payload: 5});
// u can create any kind of parsing method using pipe method of Observable
function getAllReplayInArrayFormat(){
return replayWatcher.pipe(
rxjs.map(allReplay => Object.keys(allReplay).map(
keys => `I want this to be logged once ... received the latest ${keys} ${allReplay[keys]}`
)),
rxjs.take(1),
)
}
function getFilteredReplay(name){
return replayWatcher.pipe(
rxjs.map(allReplay => allReplay[name]),
rxjs.map(filteredReplay => `I want this to be logged once ... received the latest ${name} ${filteredReplay}`),
rxjs.take(1),
)
}
replayWatcher.subscribe(console.log).unsubscribe();
getAllReplayInArrayFormat().subscribe(console.log);
getFilteredReplay("aaa").subscribe(console.log);
getFilteredReplay("bbb").subscribe(console.log);
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
You could make your own pipe operator with a BehaviorSubject ?
You can also specify if you want to listen once, or keep listening
const source = new rxjs.BehaviorSubject('');
function monitor(name, listenOnce = true) {
const operators = [rxjs.filter(v => v.name === name)];
if (listenOnce) operators.push(rxjs.first());
return rxjs.pipe(...operators);
}
function log({ name, payload }) { console.log(`Name is ${name}, value is ${payload}`); }
const aaaSub = source.pipe(monitor('aaa')).subscribe(log);
const bbbSub = source.pipe(monitor('bbb', false)).subscribe(log);
source.next({name: 'aaa', payload: 1})
source.next({name: 'bbb', payload: 2})
source.next({name: 'aaa', payload: 3})
source.next({name: 'bbb', payload: 4})
source.next({name: 'bbb', payload: 5})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
To emit each most recent message with a unique name, you can use a single ReplaySubject
and filter the messages based on their name. I have kept buffer size to 1. If you want then you can use without any buffer size but in that case it will keep all emitted messages.
Please let me know if this helps you.
import { ReplaySubject, from } from 'rxjs';
import { groupBy, mergeMap, map } from 'rxjs/operators';
interface Message {
name: string;
payload: number;
}
const messages: Message[] = [
{ name: 'aaa', payload: 1 },
{ name: 'bbb', payload: 2 },
{ name: 'ccc', payload: 1 },
{ name: 'aaa', payload: 3 },
{ name: 'bbb', payload: 4 },
{ name: 'bbb', payload: 5 },
// more messages
];
const replaySubject = new ReplaySubject<Message>(1);
from(messages)
.pipe(
groupBy((message) => message.name),
mergeMap((group$) => group$.pipe(map((message) => ({ name: group$.key, payload: message.payload })))),
)
.subscribe((message) => {
console.log(`I want this to be logged once ... received the latest ${message.name} with ${message.payload}`);
});
const subject = new rxjs.Subject();
const replay = subject.asObservable().pipe(
rxjs.scan((map, value) => map.set(value.name, value), new Map()),
rxjs.shareReplay(1),
rxjs.switchMap((map) => rxjs.from(map.values()))
);
replay.subscribe() // You need to call subscribe before any next() methods to activate the replay Observable or use
// connectable(replay).connect()
subject.next({name: 'aaa', payload: 1})
subject.next({name: 'bbb', payload: 2})
subject.next({name: 'aaa', payload: 3})
subject.next({name: 'bbb', payload: 4})
subject.next({name: 'bbb', payload: 5})
replay.subscribe((x) => {
console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
You can implement your class inherited from Subject. Here is an example code in Typescript (just remove type annotations if you are using JavaScript)
import { PartialObserver, Subject, Subscription } from "rxjs";
import { filter } from "rxjs/operators";
class ReplayLatestPayloadSubject<T> extends Subject<T> {
// Array of keys used to replay the events for a new subscriber in the order they arrived.
// If order doesn't matter look on the keys in _cached dictionary instead in replayOldMessages
private _keys: string[] = [];
// A dictionary to map a key to the latest payload.
private _cached: Record<string, T> = {};
// construtor having a getKey message to get the key for the object (in the example the name property)
constructor(private getKey: (value: T) => string) {
super();
}
// Override the subscribe method (all overloads)
public subscribe(observer?: PartialObserver<T>): Subscription;
public subscribe(
next?: (value: T) => void,
error?: (error: any) => void,
plete?: () => void
): Subscription;
public subscribe(
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
plete?: () => void
): Subscription {
if (typeof observerOrNext === "function") {
// The overload with a next method is used
this.replayOldMessages(observerOrNext);
} else if (observerOrNext?.next) {
// the overload with an observer is used.
this.replayOldMessages((v) => observerOrNext.next(v));
}
// Call base class
return super.subscribe(observerOrNext as any, error, plete);
}
// replay messages when a new subscriber is called
private replayOldMessages(next: (value: T) => void) {
for (let key of this._keys) {
next(this._cached[key]);
}
}
// override the next function to store the values to be replayed later.
public next(value?: T): void {
if (value) {
const key = this.getKey(value);
if (!this._cached[key]) {
this._keys.push(key);
}
this._cached[key] = value;
}
super.next(value);
}
}
// Example
const replaySource = new ReplayLatestPayloadSubject<{
name: string;
payload: number;
}>((v) => v.name);
replaySource.next({ name: "aaa", payload: 1 });
replaySource.next({ name: "bbb", payload: 2 });
replaySource.next({ name: "aaa", payload: 3 });
replaySource.next({ name: "bbb", payload: 4 });
replaySource.next({ name: "bbb", payload: 5 });
replaySource.pipe(filter((x) => x.name === "aaa")).subscribe((x) => {
console.log(
"I want this to be logged once ... received the latest aaa " + x.payload
);
});
replaySource.pipe(filter((x) => x.name === "bbb")).subscribe((x) => {
console.log(
"I want this to be logged once ... received the latest bbb " + x.payload
);
});
Here is a solution that only use one subject and pipes. Internally the subject keeps a map of object.name => object
with scan, so it can always update the corresponding name with the latest object it gets. Then, it emit for each value of the map.
The subject needs to be connected right away so it can start running its logic before the first subscription.
If you want the notification from only a specific object, just pipe filter
on replaySource
before subscribing
replaySource = new rxjs.ReplaySubject(1).pipe(
rxjs.scan((acc, item) => {
acc[item.name] = item
return acc
}, {}),
rxjs.switchMap(obj => rxjs.from(Object.values(obj)))
)
rxjs.connectable(replaySource).connect()
replaySource.next({ name: 'aaa', payload: 1 })
replaySource.next({ name: 'bbb', payload: 2 })
replaySource.next({ name: 'aaa', payload: 3 })
replaySource.next({ name: 'bbb', payload: 4 })
replaySource.next({ name: 'bbb', payload: 5 })
replaySource.subscribe(x => {
console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
Here's an answer showing how I have implemented this based on the answer by @KArslan (his answer now deleted by moderator) - not overly happy with the implementation though as it does not clean up a ReplaySubject
when it is no longer in use and the solution does not feel like a good clean RxJS approach.
const messageReplaySources = {}
const messageReplay$ = name => {
// create ReplaySubject (if it does not exist) on subscription
createReplay(name)
return messageReplaySources[name].asObservable()
}
const sendReplay = (name, payload) => {
// create ReplaySubject (if it does not exist) when message is sent
createReplay(name)
const message = { name, payload }
messageReplaySources[name].next(message)
}
const createReplay = name => {
if (messageReplaySources[name] == null) messageReplaySources[name] = new rxjs.ReplaySubject(1)
}
sendReplay('aaa', 1)
sendReplay('bbb', 2)
sendReplay('aaa', 3)
sendReplay('bbb', 4)
sendReplay('bbb', 5)
messageReplay$('aaa').subscribe(message => {
console.log(`I want this to be logged once ... received the latest ${message.name} ${message.payload}`)
})
messageReplay$('bbb').subscribe(message => {
console.log(`I want this to be logged once ... received the latest ${message.name} ${message.payload}`)
})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
This works for RxJS 6+ (I needed it working for RxJS 6). If using this in RxJS 7 you may want to tweak it to use connectable
instead of publishReplay
It uses distinctUntilChanged
to:
- avoid duplicate processing
- fix the bug spoken of by Ákos Vandra-Meyer wherein ... messages sent after subscribing result in
the contents of the map being fully dumped after each ining message
NB: lodash isEqual
is used but this can easily be swapped out for a non-lodash equality check
const AAA = 'aaa'
const BBB = 'bbb'
const RS1 = new rxjs.Subject()
const RS2 = RS1.pipe(
rxjs.scan((acc, item) => ({...acc, [item.name]: item}), {}),
rxjs.distinctUntilChanged(_.isEqual),
rxjs.publishReplay(1)
)
RS2.connect()
const replay$ = RS2.pipe(
rxjs.switchMap(obj => rxjs.from(Object.values(obj)))
).asObservable()
console.log('before sub')
RS1.next({name: AAA, payload: 1})
RS1.next({name: BBB, payload: 2})
RS1.next({name: AAA, payload: 3})
RS1.next({name: BBB, payload: 4})
RS1.next({name: BBB, payload: 5})
replay$.pipe(
rxjs.filter(x => x.name === AAA),
rxjs.distinctUntilChanged(_.isEqual)
).subscribe(x => {
console.log(`${x.name} ${x.payload}`)
})
replay$.pipe(
rxjs.filter(x => x.name === BBB),
rxjs.distinctUntilChanged(_.isEqual)
).subscribe(x => {
console.log(`${x.name} ${x.payload}`)
})
console.log('after sub')
RS1.next({name: BBB, payload: 5}) // does nothing coz of distinctUntilChanged
RS1.next({name: BBB, payload: 6})
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
<script src="https://cdnjs.cloudflare./ajax/libs/lodash.js/4.17.21/lodash.min.js"></script>