最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - RxJs Dynamically add events from another EventEmitter - Stack Overflow

programmeradmin0浏览0评论

I have an Observable ing from an EventEmitter which is really just a http connection, streaming events.

Occasionally I have to disconnect from the underlying stream and reconnect. I am not sure how to handle this with rxjs.

I am not sure if i can plete a source and then dynamically add other "source" to the source, or if I have to do something like i have at the very bottom.

var Rx = require('rx'),
    EventEmitter = require('events').EventEmitter;

  var eventEmitter = new EventEmitter();
  var eventEmitter2 = new EventEmitter();

  var source = Rx.Observable.fromEvent(eventEmitter, 'data')

  var subscription = source.subscribe(function (data) {
    console.log('data: ' + data);
  });

  setInterval(function() {
    eventEmitter.emit('data', 'foo');
  }, 500);

  // eventEmitter stop emitting data, underlying connection closed
  // now attach seconds eventemitter (new connection)

  // something like this but obvouisly doesn't work
  source
    .fromEvent(eventEmitter2, 'data')

Puesdo code that is more of what i am doing, I am creating a second stream connection before I close the first, so i don't "lose" any data. Here i am not sure how to stop the Observable without "losing" records due to onNext not being called due to the buffer.

  var streams = [], notifiers = [];

  // create initial stream
  createNewStream();

  setInterval(function() {
    if (params of stream have changed) createNewStream();
  }, $1minutes / 3);

  function createNewStream() {
    var stream = new eventEmitterStream();

    stream.once('connected', function() {
      stopOthers();

      streams.push(stream);
      createSource(stream, 'name', 'id');
    });
  }

  function stopOthers() {
    while(streams.length > 0) {
      streams.pop().stop(); // stop the old stream
    }

    while(notifiers.length > 0) {
      // if i call this, the buffer may lose records, before onNext() called
      //notifiers.pop()(Rx.Notification.createOnCompleted());
    }
  }

  function createObserver(tag) {
    return Rx.Observer.create(
      function (x) {
        console.log('Next: ', tag, x.length, x[0], x[x.length-1]);
      },
      function (err) {
        console.log('Error: ', tag, err);
      },
      function () {
        console.log('Completed', tag);
      });
  }

  function createSource(stream, event, id) {
    var source = Rx.Observable
      .fromEvent(stream, event)
      .bufferWithTimeOrCount(time, max);

    var subscription = source.subscribe(createObserver(id));
    var notifier = subscription.toNotifier();
    notifiers.push(notifier);
  }

I have an Observable ing from an EventEmitter which is really just a http connection, streaming events.

Occasionally I have to disconnect from the underlying stream and reconnect. I am not sure how to handle this with rxjs.

I am not sure if i can plete a source and then dynamically add other "source" to the source, or if I have to do something like i have at the very bottom.

var Rx = require('rx'),
    EventEmitter = require('events').EventEmitter;

  var eventEmitter = new EventEmitter();
  var eventEmitter2 = new EventEmitter();

  var source = Rx.Observable.fromEvent(eventEmitter, 'data')

  var subscription = source.subscribe(function (data) {
    console.log('data: ' + data);
  });

  setInterval(function() {
    eventEmitter.emit('data', 'foo');
  }, 500);

  // eventEmitter stop emitting data, underlying connection closed
  // now attach seconds eventemitter (new connection)

  // something like this but obvouisly doesn't work
  source
    .fromEvent(eventEmitter2, 'data')

Puesdo code that is more of what i am doing, I am creating a second stream connection before I close the first, so i don't "lose" any data. Here i am not sure how to stop the Observable without "losing" records due to onNext not being called due to the buffer.

  var streams = [], notifiers = [];

  // create initial stream
  createNewStream();

  setInterval(function() {
    if (params of stream have changed) createNewStream();
  }, $1minutes / 3);

  function createNewStream() {
    var stream = new eventEmitterStream();

    stream.once('connected', function() {
      stopOthers();

      streams.push(stream);
      createSource(stream, 'name', 'id');
    });
  }

  function stopOthers() {
    while(streams.length > 0) {
      streams.pop().stop(); // stop the old stream
    }

    while(notifiers.length > 0) {
      // if i call this, the buffer may lose records, before onNext() called
      //notifiers.pop()(Rx.Notification.createOnCompleted());
    }
  }

  function createObserver(tag) {
    return Rx.Observer.create(
      function (x) {
        console.log('Next: ', tag, x.length, x[0], x[x.length-1]);
      },
      function (err) {
        console.log('Error: ', tag, err);
      },
      function () {
        console.log('Completed', tag);
      });
  }

  function createSource(stream, event, id) {
    var source = Rx.Observable
      .fromEvent(stream, event)
      .bufferWithTimeOrCount(time, max);

    var subscription = source.subscribe(createObserver(id));
    var notifier = subscription.toNotifier();
    notifiers.push(notifier);
  }
Share Improve this question asked Apr 24, 2015 at 22:37 dredre 1,4343 gold badges21 silver badges31 bronze badges 2
  • 1 Read the part about buffering and backpressure in the docs. – Benjamin Gruenbaum Commented Apr 25, 2015 at 14:28
  • @BenjaminGruenbaum I have read the docs. There are many examples on general pages and on method specific ones. This is where I got the buffer methods. Is there a specific page or portion you think would've most helpful. – dre Commented Apr 26, 2015 at 17:39
Add a ment  | 

1 Answer 1

Reset to default 9

First and formost, you need to make sure you can remove all listeners from your previously "dead" emitter. Otherwise you'll create a leaky application.

It seems like the only way you'll know that an EventEmitter has died is to watch frequency, unless you have an event that fires on error or pletion (for disconnections). The latter is much, much more preferrable.

Regardless, The secret sauce of Rx is making sure to wrap your data stream creation and teardown in your observable. If wrap the creation of the emitter in your observable, as well as a means to tear it down, you'll be able to use awesome things like the retry operator to recreate that observable.

So if you have no way of knowing if it died, and you want to reconnect it, you can use something like this:

// I'll presume you have some function to get an EventEmitter that 
// is already set up
function getEmitter() {
  var emitter = new EventEmitter();
  setInterval(function(){
    emitter.emit('data', 'foo');
  }, 500)
  return emitter;
}


var emitterObservable = Observable.create(function(observer) {
  // setup the data stream
  var emitter = getEmitter();
  var handler = function(d) {
    observer.onNext(d);
  };
  emitter.on('data', handler);

  return function() {
    // tear down the data stream in your disposal function
    emitter.removeListener('on', handler);
  };
});

// Now you can do Rx magic!
emitterObservable
  // if it doesn't emit in 700ms, throw a timeout error
  .timeout(700)
  // catch all* errors and retry
  // this means the emitter will be torn down and recreated 
  // if it times out!
  .retry()
  // do something with the values
  .subscribe(function(x) { console.log(x); });

* NOTE: retry catches all errors, so you may want to add a catch above it to handle non-timeout errors. Up to you.

发布评论

评论列表(0)

  1. 暂无评论