javascript - In RxJS can I set up a ReplaySubject (or similar) that replays all the most recent messages with a unique name?

I don't know if this is possible! I want some generic code that acts like a ReplaySubject but, on subscribing, emits the most recent message for each unique name. With the code below, the actual output is ...

I want this to be logged once ... received the latest bbb 5

And I understand why I am getting this output.

The desired output is (order not important) ...

I want this to be logged once ... received the latest aaa 3
I want this to be logged once ... received the latest bbb 5

But I don't know how to achieve this. I know that I could create a second ReplaySubject, so that one deals with aaa messages and the other with bbb messages. However, in my actual code I have 100+ message names and I don't want to create 100+ ReplaySubjects (one for each). Is there a more generic, scalable solution to this problem?

// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'

const replaySource = new rxjs.ReplaySubject(1)

replaySource.next({name: 'aaa', payload: 1})
replaySource.next({name: 'bbb', payload: 2})
replaySource.next({name: 'aaa', payload: 3})
replaySource.next({name: 'bbb', payload: 4})
replaySource.next({name: 'bbb', payload: 5})

replaySource.pipe(
  rxjs.filter(x => x.name === 'aaa')
).subscribe((x) => {
  console.log('I want this to be logged once ... received the latest aaa ' + x.payload)
})

replaySource.pipe(
  rxjs.filter(x => x.name === 'bbb')
).subscribe((x) => {
  console.log('I want this to be logged once ... received the latest bbb ' + x.payload)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>

Asked Mar 21, 2023 at 12:44 by danday74. Edited Mar 23, 2023 at 17:01 by halfer.
  • 1 rxjs.dev/api/index/function/distinct – possum Commented Mar 21, 2023 at 12:55
  • thanks for the input but not sure this will meet my needs, as my objects have a name and a payload and the differing payloads will prevent it from working (I didn't show the payload in my post to keep things simple) - thanks tho! – danday74 Commented Mar 21, 2023 at 13:46
  • I read your post the first time and I don't think you read that documentation. You can pass a function to determine the key that determines if it's been seen before or not. – possum Commented Mar 22, 2023 at 10:01
  • @possum thanks for your input much appreciated, TBH you are correct - however, I've looked at the docs now and it seems to have the same problem as seen in the Sleepwalker answer - i.e. Using distinct will emit the first values, not the most recent – danday74 Commented Mar 22, 2023 at 10:32
10 Answers

Leave the default buffer for your subject (infinite buffer), and pipe distinct (use the arg to specify that only the name property must be distinct).

// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'

// Note: distinct passes the first value seen for each name (aaa 1, bbb 2), not the latest.
const replaySource = new rxjs.ReplaySubject().pipe(rxjs.distinct(x => x.name))

replaySource.next({name: 'aaa', payload: 1});
replaySource.next({name: 'bbb', payload: 2});
replaySource.next({name: 'aaa', payload: 3});
replaySource.next({name: 'bbb', payload: 4});
replaySource.next({name: 'bbb', payload: 5});

replaySource.subscribe(x => {
  console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
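
As the comments under the question point out, a distinct-by-key filter keeps the first value seen for each name, whereas the question asks for the most recent one. The difference can be sketched in plain JavaScript without RxJS. The helper names `firstByName` and `latestByName` are hypothetical and exist only for this illustration; the messages are assumed to have the `{ name, payload }` shape used in the question.

```javascript
// Plain-JavaScript illustration (no RxJS): first value per name vs. latest value per name.
const messages = [
  { name: 'aaa', payload: 1 },
  { name: 'bbb', payload: 2 },
  { name: 'aaa', payload: 3 },
  { name: 'bbb', payload: 4 },
  { name: 'bbb', payload: 5 },
]

// Keeps the first message seen for each name (what a distinct-by-key filter lets through).
function firstByName(items) {
  const seen = new Map()
  for (const item of items) {
    if (!seen.has(item.name)) seen.set(item.name, item)
  }
  return [...seen.values()]
}

// Keeps the most recent message for each name (what the question asks for).
function latestByName(items) {
  const latest = new Map()
  for (const item of items) latest.set(item.name, item)
  return [...latest.values()]
}

console.log(firstByName(messages).map(m => `${m.name} ${m.payload}`))  // [ 'aaa 1', 'bbb 2' ]
console.log(latestByName(messages).map(m => `${m.name} ${m.payload}`)) // [ 'aaa 3', 'bbb 5' ]
```

Overwriting an entry in a Map keeps its original insertion position, so both helpers list the names in the order they first appeared.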

As far as I'm aware, what you are attempting to do isn't possible using a ReplaySubject. ReplaySubject doesn’t close when its next function is called, and the observable has no way of knowing which will be the final value until the observable is complete, so operators such as last, takeLast, etc., won’t work. This is because the stream doesn’t end, so the operators don’t know when to stop operating.

When you subscribe to a ReplaySubject, you get each value in sequence, not at once, so any operator you use can only operate on one value at a time. This is also true for callbacks passed to the subscribe function; you won’t be able to do it within the subscribe block either.

The only way to accomplish what you’re looking to do using a single observable is to create a stream of values in an observable that completes.

import { Observable } from 'rxjs';

const myObservable = new Observable((subscriber) => {
    subscriber.next({ name: 'aaa', payload: 1 });
    subscriber.next({ name: 'bbb', payload: 2 });
    subscriber.next({ name: 'aaa', payload: 3 });
    subscriber.next({ name: 'bbb', payload: 4 });
    subscriber.next({ name: 'bbb', payload: 5 });
    // Completing the stream lets operators such as reduce emit their result.
    subscriber.complete();
});

With an observable that completes, you can use the reduce operator to get what you’re looking for.

import { filter, reduce } from 'rxjs/operators';

// reduce emits only when the source completes, so it yields the last matching value.
myObservable.pipe(
    filter(x => x.name === 'aaa'),
    reduce((acc, val) => val)
).subscribe(x => {
    console.log('I want this to be logged once ... received the latest aaa ' + x.payload);
});

myObservable.pipe(
    filter(x => x.name === 'bbb'),
    reduce((acc, val) => val)
).subscribe(x => {
    console.log('I want this to be logged once ... received the latest bbb ' + x.payload);
});

Based on the answer given by the OP, I'd like to make the following suggestion, which I think takes better advantage of ReplaySubject's functionality.

const setReplay = new rxjs.Subject() // the setter (Subject takes no buffer-size argument)
const replayWatcher = new rxjs.ReplaySubject(1) // where the full replay state is stored

// merge each new value into the stored replay state
// to stop listening for new values, just unsubscribe replayListener
const replayListener = setReplay.pipe(
  rxjs.scan((acc, value, index) => {
    acc[value.name] = value.payload
    return acc
  }, {})
).subscribe(replayWatcher)

setReplay.next({name: 'aaa', payload: 1});
setReplay.next({name: 'bbb', payload: 2});
setReplay.next({name: 'aaa', payload: 3});
setReplay.next({name: 'bbb', payload: 4});
setReplay.next({name: 'bbb', payload: 5});

// You can create any kind of parsing method using the pipe method of Observable
function getAllReplayInArrayFormat(){
  return replayWatcher.pipe(
    rxjs.map(allReplay => Object.keys(allReplay).map(
      keys => `I want this to be logged once ... received the latest ${keys} ${allReplay[keys]}`
    )),
    rxjs.take(1),
  )
}
function getFilteredReplay(name){
  return replayWatcher.pipe(
    rxjs.map(allReplay => allReplay[name]),
    rxjs.map(filteredReplay => `I want this to be logged once ... received the latest ${name} ${filteredReplay}`),
    rxjs.take(1),
  )
}

replayWatcher.subscribe(console.log).unsubscribe();
getAllReplayInArrayFormat().subscribe(console.log);
getFilteredReplay("aaa").subscribe(console.log);
getFilteredReplay("bbb").subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>

You could make your own pipe operator with a BehaviorSubject.

You can also specify whether you want to listen once or keep listening.

const source = new rxjs.BehaviorSubject('');

function monitor(name, listenOnce = true) {
   const operators = [rxjs.filter(v => v.name === name)];
   if (listenOnce) operators.push(rxjs.first());
   return rxjs.pipe(...operators);
}

function log({ name, payload }) { console.log(`Name is ${name}, value is ${payload}`); }

const aaaSub = source.pipe(monitor('aaa')).subscribe(log);
const bbbSub = source.pipe(monitor('bbb', false)).subscribe(log);

source.next({name: 'aaa', payload: 1})
source.next({name: 'bbb', payload: 2})
source.next({name: 'aaa', payload: 3})
source.next({name: 'bbb', payload: 4})
source.next({name: 'bbb', payload: 5})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>

To emit the most recent message for each unique name, you can use a single ReplaySubject and filter the messages by name. I have kept the buffer size at 1. You can omit the buffer size, but the subject will then keep all emitted messages.

Please let me know if this helps you.

import { ReplaySubject, from } from 'rxjs';
import { groupBy, mergeMap, map } from 'rxjs/operators';

interface Message {
  name: string;
  payload: number;
}

const messages: Message[] = [
  { name: 'aaa', payload: 1 },
  { name: 'bbb', payload: 2 },
  { name: 'ccc', payload: 1 },
  { name: 'aaa', payload: 3 },
  { name: 'bbb', payload: 4 },
  { name: 'bbb', payload: 5 },
  // more messages
];

const replaySubject = new ReplaySubject<Message>(1);

from(messages)
  .pipe(
    groupBy((message) => message.name),
    mergeMap((group$) => group$.pipe(map((message) => ({ name: group$.key, payload: message.payload })))),
  )
  .subscribe((message) => {
    console.log(`I want this to be logged once ... received the latest ${message.name} with ${message.payload}`);
  });

const subject = new rxjs.Subject();
const replay = subject.asObservable().pipe(
  rxjs.scan((map, value) => map.set(value.name, value), new Map()),
  rxjs.shareReplay(1),
  rxjs.switchMap((map) => rxjs.from(map.values()))
);

replay.subscribe() // You need to call subscribe before any next() methods to activate the replay Observable or use
// connectable(replay).connect()

subject.next({name: 'aaa', payload: 1})
subject.next({name: 'bbb', payload: 2})
subject.next({name: 'aaa', payload: 3})
subject.next({name: 'bbb', payload: 4})
subject.next({name: 'bbb', payload: 5})

replay.subscribe((x) => {
  console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>

You can implement your own class that inherits from Subject. Here is example code in TypeScript (just remove the type annotations if you are using JavaScript).

import { PartialObserver, Subject, Subscription } from "rxjs";
import { filter } from "rxjs/operators";

class ReplayLatestPayloadSubject<T> extends Subject<T> {
  // Array of keys used to replay the events for a new subscriber in the order they arrived.
  // If order doesn't matter, replayOldMessages can iterate over the keys of _cached instead.
  private _keys: string[] = [];
  // A dictionary to map a key to the latest payload.
  private _cached: Record<string, T> = {};

  // Constructor taking a getKey function that returns the key for a value (in the example, the name property)
  constructor(private getKey: (value: T) => string) {
    super();
  }

  // Override the subscribe method (all overloads)
  public subscribe(observer?: PartialObserver<T>): Subscription;
  public subscribe(
    next?: (value: T) => void,
    error?: (error: any) => void,
    complete?: () => void
  ): Subscription;
  public subscribe(
    observerOrNext?: PartialObserver<T> | ((value: T) => void),
    error?: (error: any) => void,
    complete?: () => void
  ): Subscription {
    if (typeof observerOrNext === "function") {
      // The overload with a next method is used
      this.replayOldMessages(observerOrNext);
    } else if (observerOrNext?.next) {
      // the overload with an observer is used.
      this.replayOldMessages((v) => observerOrNext.next(v));
    }
    // Call base class
    return super.subscribe(observerOrNext as any, error, complete);
  }

  // Replay the cached messages to a new subscriber
  private replayOldMessages(next: (value: T) => void) {
    for (let key of this._keys) {
      next(this._cached[key]);
    }
  }

  // override the next function to store the values to be replayed later.
  public next(value?: T): void {
    if (value) {
      const key = this.getKey(value);
      if (!this._cached[key]) {
        this._keys.push(key);
      }
      this._cached[key] = value;
    }
    super.next(value);
  }
}

// Example

const replaySource = new ReplayLatestPayloadSubject<{
  name: string;
  payload: number;
}>((v) => v.name);

replaySource.next({ name: "aaa", payload: 1 });
replaySource.next({ name: "bbb", payload: 2 });
replaySource.next({ name: "aaa", payload: 3 });
replaySource.next({ name: "bbb", payload: 4 });
replaySource.next({ name: "bbb", payload: 5 });

replaySource.pipe(filter((x) => x.name === "aaa")).subscribe((x) => {
  console.log(
    "I want this to be logged once ... received the latest aaa " + x.payload
  );
});

replaySource.pipe(filter((x) => x.name === "bbb")).subscribe((x) => {
  console.log(
    "I want this to be logged once ... received the latest bbb " + x.payload
  );
});

Here is a solution that uses only one subject and pipes. Internally, scan keeps a map of object.name => object, so it can always update the corresponding name with the latest object it gets. Then it emits once for each value of the map.
The stream needs to be connected right away so it can start running its logic before the first subscription.

If you want the notifications for only a specific object, just pipe filter on replaySource before subscribing.

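const source = new rxjs.Subject()

// Fold every message into a name -> latest message map and cache the newest map.
const latestByName = rxjs.connectable(
  source.pipe(
    rxjs.scan((acc, item) => {
      acc[item.name] = item
      return acc
    }, {})
  ),
  { connector: () => new rxjs.ReplaySubject(1), resetOnDisconnect: false }
)
// Connect right away so the map is built before the first subscription.
latestByName.connect()

// Each subscriber receives the cached map flattened into one emission per name.
const replaySource = latestByName.pipe(
  rxjs.switchMap(obj => rxjs.from(Object.values(obj)))
)

source.next({ name: 'aaa', payload: 1 })
source.next({ name: 'bbb', payload: 2 })
source.next({ name: 'aaa', payload: 3 })
source.next({ name: 'bbb', payload: 4 })
source.next({ name: 'bbb', payload: 5 })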

replaySource.subscribe(x => {
  console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
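
The accumulator passed to scan in the answer above is an ordinary reducer, so its behaviour can be checked without RxJS. The following is a minimal sketch in plain JavaScript, assuming messages shaped like `{ name, payload }`; `keepLatest` and `sent` are hypothetical names introduced for this illustration. Unlike the answer's version, this reducer returns a new object instead of mutating the accumulator.

```javascript
// The reducer that scan would apply to each message, written as a standalone function.
// It returns a fresh object, so earlier snapshots are never mutated.
const keepLatest = (acc, item) => ({ ...acc, [item.name]: item })

const sent = [
  { name: 'aaa', payload: 1 },
  { name: 'bbb', payload: 2 },
  { name: 'aaa', payload: 3 },
  { name: 'bbb', payload: 4 },
  { name: 'bbb', payload: 5 },
]

// Array.prototype.reduce yields only the final snapshot;
// scan would additionally emit every intermediate snapshot.
const snapshot = sent.reduce(keepLatest, {})

for (const { name, payload } of Object.values(snapshot)) {
  console.log(`${name} ${payload}`)
}
// aaa 3
// bbb 5
```

Flattening the final snapshot with Object.values is the same step the answer performs with switchMap and from.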

Here's an answer showing how I have implemented this based on the answer by @KArslan (his answer now deleted by moderator) - not overly happy with the implementation though as it does not clean up a ReplaySubject when it is no longer in use and the solution does not feel like a good clean RxJS approach.

const messageReplaySources = {}

const messageReplay$ = name => {
  // create ReplaySubject (if it does not exist) on subscription
  createReplay(name)
  return messageReplaySources[name].asObservable()
}

const sendReplay = (name, payload) => {
  // create ReplaySubject (if it does not exist) when message is sent
  createReplay(name)
  const message = { name, payload }
  messageReplaySources[name].next(message)
}

const createReplay = name => {
  if (messageReplaySources[name] == null) messageReplaySources[name] = new rxjs.ReplaySubject(1)
}

sendReplay('aaa', 1)
sendReplay('bbb', 2)
sendReplay('aaa', 3)
sendReplay('bbb', 4)
sendReplay('bbb', 5)

messageReplay$('aaa').subscribe(message => {
  console.log(`I want this to be logged once ... received the latest ${message.name} ${message.payload}`)
})

messageReplay$('bbb').subscribe(message => {
  console.log(`I want this to be logged once ... received the latest ${message.name} ${message.payload}`)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>

This works for RxJS 6+ (I needed it working for RxJS 6). If using this in RxJS 7 you may want to tweak it to use connectable instead of publishReplay

It uses distinctUntilChanged to:

  • avoid duplicate processing
  • fix the bug spoken of by Ákos Vandra-Meyer wherein ... messages sent after subscribing result in the contents of the map being fully dumped after each incoming message

NB: lodash isEqual is used but this can easily be swapped out for a non-lodash equality check

const AAA = 'aaa'
const BBB = 'bbb'

const RS1 = new rxjs.Subject()

const RS2 = RS1.pipe(
  rxjs.scan((acc, item) => ({...acc, [item.name]: item}), {}),
  rxjs.distinctUntilChanged(_.isEqual),
  rxjs.publishReplay(1)
)
RS2.connect()

const replay$ = RS2.pipe(
  rxjs.switchMap(obj => rxjs.from(Object.values(obj)))
).asObservable()

console.log('before sub')

RS1.next({name: AAA, payload: 1})
RS1.next({name: BBB, payload: 2})
RS1.next({name: AAA, payload: 3})
RS1.next({name: BBB, payload: 4})
RS1.next({name: BBB, payload: 5})

replay$.pipe(
  rxjs.filter(x => x.name === AAA),
  rxjs.distinctUntilChanged(_.isEqual)
).subscribe(x => {
  console.log(`${x.name} ${x.payload}`)
})

replay$.pipe(
  rxjs.filter(x => x.name === BBB),
  rxjs.distinctUntilChanged(_.isEqual)
).subscribe(x => {
  console.log(`${x.name} ${x.payload}`)
})

console.log('after sub')

RS1.next({name: BBB, payload: 5}) // does nothing because of distinctUntilChanged
RS1.next({name: BBB, payload: 6})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/lodash.js/4.17.21/lodash.min.js"></script>
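
The note above says that lodash isEqual can be swapped out for a non-lodash equality check. One possible replacement is sketched below, assuming payloads are primitives (numbers or strings) as in the examples; nested payloads would need a deeper comparison. The names `sameMessage` and `sameSnapshot` are hypothetical and are not part of RxJS or lodash.

```javascript
// Compares two { name, payload } messages; assumes payload is a primitive value.
const sameMessage = (a, b) => a.name === b.name && a.payload === b.payload

// Compares two name -> message maps (plain objects), such as the ones built by the scan step.
const sameSnapshot = (a, b) => {
  const keysA = Object.keys(a)
  const keysB = Object.keys(b)
  return keysA.length === keysB.length &&
    keysA.every(key =>
      Object.prototype.hasOwnProperty.call(b, key) && sameMessage(a[key], b[key])
    )
}

console.log(sameMessage({ name: 'bbb', payload: 5 }, { name: 'bbb', payload: 5 })) // true
console.log(sameMessage({ name: 'bbb', payload: 5 }, { name: 'bbb', payload: 6 })) // false
```

Both functions have the (previous, current) => boolean shape that distinctUntilChanged accepts as a comparator, so `sameSnapshot` could stand in for `_.isEqual` after the scan step and `sameMessage` after the filter steps.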
