javascript - ReactiveX: Group and Buffer only last item in each group - Stack Overflow

How can I group an Observable and, for each GroupedObservable, keep in memory only the last emitted item, so that each group behaves just like a BehaviorSubject?

Something like this:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

So in memory we'd only have the last item for each user:

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

And when a subscriber subscribes, these two items would be emitted right away (each in its own emission), as if we had a BehaviorSubject for each user.

onCompleted() is never expected to fire, as people may chat forever.

I don't know in advance what user values there can be.

asked Dec 8, 2015 at 0:21 by Dzmitry Lazerka; edited Nov 18, 2016 at 5:01 by user3743222
  • It's unclear from your description what format you want the results in. Do you want a single stream that emits an array of the last items? Or do you want separate Observables for each user? – paulpdaniels Commented Dec 8, 2015 at 2:00
  • @paulpdaniels Good question. The answer is, well, it doesn't really matter. My task needs a single stream, but if I have multiple Observables, then I'll just merge them into one. It's better to get each item in its own emission, although an array of items in one emission would work too. – Dzmitry Lazerka Commented Dec 8, 2015 at 3:30

2 Answers

I assume your chatlog observable is hot. The groupObservables emitted by #groupBy will consequently also be hot and won't keep anything in memory by themselves.

To get the behavior you want (discard everything but the last value from before subscription and continue from there) you could use a ReplaySubject(1).

Please correct me if I'm wrong

see jsbin

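var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        // One ReplaySubject(1) per user: it remembers only the last message.
        var subject = new Rx.ReplaySubject(1);
        // Forward the group's messages into the subject so the latest one is cached.
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });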
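
To make the intended behavior concrete, here is a minimal sketch in plain JavaScript, deliberately without Rx, of "keep only the last item per key, replay those items to each new subscriber one by one, then forward live items". The names createLatestByKey, next and subscribe are hypothetical and not part of any Rx API. The sketch only illustrates the semantics that one ReplaySubject(1) per group is meant to provide, and it is not a replacement for the Rx code above.

```javascript
// Hypothetical helper (not an Rx API): remembers only the most recent item
// per key and replays the remembered items to every new subscriber.
function createLatestByKey(keyOf) {
  const latest = new Map();      // key -> last item seen for that key
  const subscribers = new Set(); // callbacks currently subscribed

  return {
    // Store the item as the latest for its key, then forward it.
    next(item) {
      const key = keyOf(item);
      latest.delete(key);        // re-insert so Map order follows last update
      latest.set(key, item);
      subscribers.forEach(fn => fn(item));
    },
    // Replay one stored item per key, then register for live items.
    subscribe(fn) {
      latest.forEach(item => fn(item));
      subscribers.add(fn);
      return () => subscribers.delete(fn); // call this to unsubscribe
    }
  };
}

const chat = createLatestByKey(message => message.user);
[
  {user: 1, msg: "Anyone here?"},
  {user: 2, msg: "Hi"},
  {user: 2, msg: "How are you?"},
  {user: 1, msg: "Hello"},
  {user: 1, msg: "Good"}
].forEach(message => chat.next(message));

// A late subscriber first gets the last message of each user, then live ones.
const received = [];
const unsubscribe = chat.subscribe(message => received.push(message));
chat.next({user: 2, msg: "Bye"});
unsubscribe();
chat.next({user: 1, msg: "Not delivered"});

console.log(received.map(message => message.msg));
```

With the sample chat from the question, the late subscriber receives "How are you?" and "Good" immediately, each in its own call, followed by the live "Bye".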

You can write a reducing function that keeps the latest item emitted for each user, pass it to scan, and use shareReplay(1) to replay the latest value to new subscribers. It would be something like this:

var fn_scan = function ( aMessages, message ) {
  // aMessages is the latest array of messages, one per user.
  // Return a new array instead of mutating aMessages, so that arrays
  // already handed to subscribers are not changed afterwards.
  return aMessages
    // remove the previous message from that user...
    .filter(function ( x ) { return x.user !== message.user; })
    // ...and append the latest message
    .concat([message]);
};
var groupedLatestMessages$ = messages$
    .scan(fn_scan, [])
    .shareReplay(1);

So what you get anytime you subscribe is an array whose size at any moment will be the number of users who emitted messages, and whose content will be the messages emitted by the users ordered by time of emission.
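
As a quick check of that description, the sketch below runs a reducer of the same shape over the five sample messages from the question. It is plain JavaScript with no Rx dependency. Array.prototype.reduce stands in for scan here, so it only yields the final accumulated array rather than one array per message, and the names keepLatestPerUser and sampleMessages are made up for this illustration.

```javascript
// Reducer in the spirit of fn_scan: at most one message per user,
// ordered by the time of each user's latest message.
function keepLatestPerUser(messages, message) {
  return messages
    .filter(m => m.user !== message.user) // drop that user's previous message
    .concat([message]);                   // append the newest one
}

// Sample chat log taken from the question.
const sampleMessages = [
  {user: 1, msg: "Anyone here?"},
  {user: 2, msg: "Hi"},
  {user: 2, msg: "How are you?"},
  {user: 1, msg: "Hello"},
  {user: 1, msg: "Good"}
];

// reduce applies the reducer to every message and returns the last result.
const latestMessages = sampleMessages.reduce(keepLatestPerUser, []);
console.log(latestMessages);
```

The resulting array holds two entries, {user: 2, msg: "How are you?"} followed by {user: 1, msg: "Good"}, which matches the in-memory state the question asks for.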

Anytime there is a subscription, the latest array is immediately passed on to the subscriber. That's an array though; I can't think of a way to pass the values one by one while still meeting your specifications. I hope that is enough for your use case.

UPDATE : jsbin here http://jsfiddle/zs7ydw6b/2
