javascript - How can I get the single latest value from an infinite RxJs stream that is not the initial value? - Stack Overflow

The concept

This is a mocked angular2 project.

When consuming the observable stream from the redux store, I first filter it and then try take/takeLast/last to get the latest value. After that I want to resolve the promise when the stream completes, but with the takeLast operator it never does.

So the question is: What operator setup can I use to get the latest value from the stream?

The setup

I simplified my Angular 2 setup to this gist of RxJs usage.

  • the source observable is managed by the redux library and never completes
  • the service provides some logic to retrieve the latest value from the stream
  • the component consumes the value promise style

Here is a working example: https://fiddle.jshell/markus_falk/an41z6g9/

The redux store mock:

var latestTime$ = new Rx.Subject();
setInterval(function(){
     latestTime$.onNext(Date.now()); 
}, 2000);

The service injectable mock:

var timeStore = null;
var getLatestTime = function() {

  return new Promise((resolve, reject) => {

     latestTime$

     /* 
        filter out 'null' for when the button is clicked
        before the store updates the first time
      */
     .filter(function(x) {
        console.log('filter: ', x);
        return typeof x === 'number';
     })

     // try to end the stream by taking the last from the stream ?!?!?!?
     .takeLast(1)

     // handle promise
     .subscribe(

       function (x) {
         console.log('Next: ' + x);
         // store latest stream value
         timeStore = x;
       },
       function (err) {
         console.log('Error: ' + err);
         reject(err);
       },
       function () {
         console.log('Completed');
         // pass on latest value of the endless stream when it completes
         resolve(timeStore);
       }

    );

  });

};

And a consuming mock component:

document.querySelector("#foo").addEventListener("click", function(event) {

  var time = getLatestTime();

  time.then((latestTime) => {
    console.log('latestTime: ', latestTime);
  });

  time.catch((err) => {
    console.log('oh oh: ', err);
  });

}, false);
Asked Sep 29, 2016 at 7:48 by Markus, edited Sep 29, 2016 at 11:04
  • I'm not sure I really get the gist of the question's code or the fiddle, but an answer to your question's title would be: latestTime$.skip(1).take(1).toPromise(). The takeLast operator won't emit anything until the underlying observable completes - that appears to be your problem - and you shouldn't need to create your own promise. – cartant Commented Sep 29, 2016 at 8:20
  • I think you got the gist. Problem with skip(1).take(1) is that it will take the second entry in the stream but I need the last one. – Markus Commented Sep 29, 2016 at 9:05
  • I think you can use just the last() operator or, if you want to skip the first value and take the last, use skip(1).last(). – martin Commented Sep 29, 2016 at 9:15
  • Just tried in the fiddle. skip(1).last() will not even call next? – Markus Commented Sep 29, 2016 at 9:23
  • No, because last() emits only the last value from the Observable after it completes. See reactivex.io/documentation/operators/last.html – martin Commented Sep 29, 2016 at 9:27

2 Answers

This should simulate your situation.

See live demo: https://jsfiddle/usualcarrot/zh07hfrc/1/

var subject = new Rx.Subject();

subject.skip(1).last().subscribe(function(val) {
  console.log('next:', val);
}, function(val) {
  console.log('error:', val);
}, function() {
  console.log('completed');
});

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.onCompleted();

This prints to console:

next: 5
completed

Instead of console.log('completed'); you'd call resolve(...). Maybe this is not even necessary and you can just return the Subject and subscribe to it directly (?), depending on your use case. In that case use asObservable() to hide the fact that you're using a Subject. See similar use-case with asObservable().
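Note that the store's stream in the question never completes, so a completion callback would never fire there. A minimal sketch of an alternative is to resolve on the next value that passes the filter and then unsubscribe, which is roughly what filter(...).take(1).toPromise() expresses in RxJS. To keep the example runnable without any library, it uses a hand-rolled stand-in: TinySubject and firstMatching are hypothetical names for illustration, not part of RxJS.

```javascript
// Hypothetical stand-in for an Rx Subject: no replay, never completes.
class TinySubject {
  constructor() {
    this.observers = new Set();
  }
  // Register a callback; returns a function that removes it again.
  subscribe(onNext) {
    this.observers.add(onNext);
    return () => this.observers.delete(onNext);
  }
  // Push a value to every current observer.
  next(value) {
    // Copy first so observers may unsubscribe while being notified.
    [...this.observers].forEach((observer) => observer(value));
  }
}

// Resolve with the first value that passes `predicate`, then unsubscribe.
// Assumption: this mirrors the idea of filter(predicate).take(1).toPromise().
function firstMatching(subject, predicate) {
  return new Promise((resolve) => {
    const unsubscribe = subject.subscribe((value) => {
      if (predicate(value)) {
        unsubscribe();
        resolve(value);
      }
    });
  });
}

const latestTime$ = new TinySubject();
const isNumber = (x) => typeof x === 'number';

const pending = firstMatching(latestTime$, isNumber);
latestTime$.next(null); // rejected by the predicate, promise stays pending
latestTime$.next(1000); // first number: resolves the promise
latestTime$.next(2000); // arrives after unsubscribing, so it is ignored

pending.then((value) => console.log('latestTime:', value));
```

With a subject that does not replay, the soonest value a late subscriber can see is the next emission, so "latest" here means "next value after the click".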

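I use combineLatest for such scenarios. Here is a simple POC:

import { ReplaySubject, combineLatest, of } from 'rxjs';

const source = new ReplaySubject();

source.next('1');
source.next('2');
source.next('3');

// the ReplaySubject replays its buffered values to the new subscriber,
// so by the time of(true) emits, the latest value from source is '3'
combineLatest([source, of(true)]).subscribe(([data]) => {
  console.log('data:', data); // data: 3
});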
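The POC above relies on the subject replaying values to a late subscriber. A rough, library-free sketch of that caching idea follows: a subject that remembers only its most recent value and hands it out as a promise. LatestValueSubject and latestValue are hypothetical names made up for this illustration and are not RxJS APIs.

```javascript
// Hypothetical replay-one subject: remembers only the most recent value.
class LatestValueSubject {
  constructor() {
    this.hasValue = false;
    this.latest = undefined;
    this.observers = new Set();
  }
  // Cache the value, then notify every current observer.
  next(value) {
    this.hasValue = true;
    this.latest = value;
    [...this.observers].forEach((observer) => observer(value));
  }
  // Late subscribers receive the cached value immediately.
  subscribe(onNext) {
    this.observers.add(onNext);
    if (this.hasValue) onNext(this.latest);
    return () => this.observers.delete(onNext);
  }
  // Promise for the cached value, or for the next emission if none exists yet.
  latestValue() {
    return new Promise((resolve) => {
      let done = false;
      let unsubscribe = () => {};
      unsubscribe = this.subscribe((value) => {
        done = true;
        unsubscribe();
        resolve(value);
      });
      // If the cached value was delivered synchronously, the real
      // unsubscribe function was not assigned yet, so clean up here.
      if (done) unsubscribe();
    });
  }
}

const source = new LatestValueSubject();
const early = source.latestValue(); // nothing cached yet: waits for the first emission

source.next('1');
source.next('2');
source.next('3');

const late = source.latestValue(); // resolves with the cached value

Promise.all([early, late]).then(([a, b]) => {
  console.log('early:', a, 'late:', b); // early: 1 late: 3
});
```

Because there is no initial value in the cache, a caller that asks before the first emission simply waits, which avoids the null check from the question.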
