javascript - RxJS: How to do some clean-up before the next valid value is passed? - Stack Overflow

I have two streams which I merge to get a URL to load an image: one stream for drop events and one for a file input change. On every new path I load the image and draw it to a canvas. This canvas is passed into another stream. It looks like this:

// prevent browsers default behavior for dropTargetElement
[ 'drop', 'dragover' ].forEach(function(eventName) {
  Rx.Observable.fromEvent(dropTargetElement, eventName).subscribe(function(event) {
    event.preventDefault();
  });
});

// file path stream merged from openFileInputs change and openFileDropTargets drop
Rx.Observable.merge(Rx.Observable.fromEvent(inputElement, 'change').map(function(event) {
  return event.target.value;
}), Rx.Observable.fromEvent(dropTargetElement, 'drop').map(function(event) {
  return event.dataTransfer.files[0].path;
})).map(function(path) {
  var image = new Image();
  image.src = path;

  // note: I return an Observable in this map function
  // is this even good practice? if yes, is mergeAll the best
  // way to get the "load" event?
  return Rx.Observable.fromEvent(image, 'load');
}).mergeAll().map(function(event) {
  return event.path[0];
}).subscribe(function(image) {
  var canvas = document.createElement('canvas');
  var context = canvas.getContext('2d');

  canvas.width = image.width;
  canvas.height = image.height;
  context.drawImage(image, 0, 0);

  canvasState.onNext(canvas);
});

(Side question: Is it "allowed" to return Observables in map?)

My canvasState looks like this:

var canvasState = new Rx.BehaviorSubject(undefined);

// draw image
canvasState.filter(function(canvas) {
  return !!canvas;
}).subscribe(function drawImage(canvas) {
  document.body.appendChild(canvas);
});

As you can see, I append the canvas to the body if the canvas value is truthy. But every time a new canvas comes in, I would like to remove the old one. What is the best way to achieve that? Is something like this possible?

// draw image
canvasState.filter(function(canvas) {
  return !!canvas;
}).beforeNext(function(oldCanvas) {
  // remove old one
}).subscribe(function drawImage(canvas) {
  document.body.appendChild(canvas);
});

asked Feb 1, 2015 at 14:24 by Pipo
  • Note that you probably need to listen to the image's error event as well so you can take action if an image fails to load... – Brandon Commented Feb 3, 2015 at 4:35
  • You're right! Thanks for the tip. – Pipo Commented Feb 3, 2015 at 6:16
  • Interesting question with a good answer. Note that there are other resources you create and don't clean up: the event listeners themselves. I just wrote about this in github.com/Reactive-Extensions/RxJS/issues/1016. Found this question while searching how to clean up these subscriptions... :-) – PhiLho Commented Nov 4, 2015 at 15:49

1 Answer

Nested Observables

Yes, it is normal to have a map operation return an Observable. This gives you an observable stream of observable streams. Rx has 3 operations that flatten that nested observable by one level:

  • mergeAll - subscribe to all of the inner streams as they arrive. You usually end up concurrently subscribed to multiple streams and usually your final stream will have results from the different inner streams intermingled. You can supply a maxConcurrent parameter to limit the number of concurrent inner subscriptions. When the limit is hit, new inner observables get queued and do not get subscribed until a previous inner observable completes.
  • concatAll - subscribe to each inner observable stream one at a time, in order. Your resulting stream will produce items in a predictable order. Concat is just Merge with maxConcurrent set to 1.
  • switch - subscribe to the first inner stream. Then, when a new stream arrives, "switch" to it. Basically unsubscribe from the previous inner stream and subscribe to the latest inner stream. Use switch when you only want to listen to the most recent inner stream.
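
To make the difference in output order concrete, here is a rough toy model in plain JavaScript. It is not RxJS: the inner streams are hypothetical plain objects, `mergeAllModel`, `concatAllModel` and `switchModel` are made-up names, and the model assumes cold inner streams that start emitting only once subscribed.

```javascript
// Toy model (not RxJS): each inner stream arrives at `start` and, once
// subscribed, emits each value after the given offset. Inner streams are
// assumed to be cold and are listed in arrival order.
const inners = [
  { start: 0, events: [{ offset: 1, value: 'a1' }, { offset: 4, value: 'a2' }] },
  { start: 2, events: [{ offset: 1, value: 'b1' }, { offset: 3, value: 'b2' }] },
];

// mergeAll: subscribe on arrival, so values interleave by emission time.
function mergeAllModel(streams) {
  return streams
    .flatMap(s => s.events.map(e => ({ time: s.start + e.offset, value: e.value })))
    .sort((x, y) => x.time - y.time)
    .map(e => e.value);
}

// concatAll: subscribe to the next inner stream only after the previous one
// has finished, so values come out stream by stream.
function concatAllModel(streams) {
  return streams.flatMap(s => s.events.map(e => e.value));
}

// switch: drop the current inner stream as soon as the next one arrives.
function switchModel(streams) {
  return streams.flatMap((s, i) => {
    const cutoff = i + 1 < streams.length ? streams[i + 1].start : Infinity;
    return s.events
      .filter(e => s.start + e.offset < cutoff)
      .map(e => e.value);
  });
}

const merged = mergeAllModel(inners);        // ['a1', 'b1', 'a2', 'b2']
const concatenated = concatAllModel(inners); // ['a1', 'a2', 'b1', 'b2']
const switched = switchModel(inners);        // ['a1', 'b1', 'b2']
```

In this model, switch loses a2 because the second inner stream arrives before a2 would have been emitted.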

(In my code, I've also implemented a fourth flattening operator: concatLatest, which is like concat, but when an inner stream completes, instead of jumping to the next stream in the queue, it jumps straight to the most recent sequence in the queue, throwing away any inner streams skipped. It is sort of in between concat and switch in behavior and I find it very useful for handling backpressure while still producing results.)

So, it is very common to have .map(...).mergeAll() or .map(...).concatAll() or .map(...).switch(). It is so common that there are Rx methods for these 3 cases:

  • source.flatMap(selector) is equivalent to source.map(selector).mergeAll()
  • source.concatMap(selector) is equivalent to source.map(selector).concatAll()
  • source.flatMapLatest(selector) is equivalent to source.map(selector).switch()
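
The same "map, then flatten one level" shape exists on plain JavaScript arrays, which may help as an analogy. This is only an analogy: arrays are synchronous, so the timing differences between merge, concat and switch do not appear here, and `selector` is a made-up example function.

```javascript
// Array analogy only: this shows the shape of the equivalence,
// not the timing behaviour of observables.
const selector = x => [x, x * 10];

// Two steps: map to nested arrays, then flatten one level.
const twoSteps = [1, 2, 3].map(selector).flat();

// One step: flatMap does both at once.
const oneStep = [1, 2, 3].flatMap(selector);

// Both produce [1, 10, 2, 20, 3, 30]
```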

Your Code

With the above information, I'd suggest you use switch instead of mergeAll. As your code stands now, if someone changes the value quickly, you could have 2 image requests out simultaneously, and if they complete in the wrong order you will end up with the wrong final canvas. switch will cancel the stale image request and switch over to the new one.

Note that if the user keeps changing the value faster than the image requests can complete, then they could never see any images, since you'll keep switching to the new one. This is where I usually use my concatLatest instead, because it will complete some intermediate requests now and then while it is attempting to keep up. YMMV.
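
The out-of-order problem, and the "latest wins" behaviour that fixes it, can be sketched without Rx. The helper below is hypothetical (`createLatestGuard` is not an RxJS API) and it only ignores stale results, whereas switch also unsubscribes from the stale inner stream.

```javascript
// Hypothetical helper: wraps callbacks so that only the most recently
// issued one is allowed to run. Stale callbacks become no-ops.
function createLatestGuard() {
  let currentId = 0;
  return function guard(callback) {
    const id = ++currentId;
    return function (...args) {
      if (id === currentId) {
        callback(...args);
      }
    };
  };
}

const shown = [];
const guard = createLatestGuard();

// Two image "requests" are issued in quick succession.
const onFirstLoaded = guard(name => shown.push(name));
const onSecondLoaded = guard(name => shown.push(name));

// The responses arrive in the wrong order.
onSecondLoaded('second.png');
onFirstLoaded('first.png'); // ignored, because a newer request exists

// shown is ['second.png']
```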

Cleaning up the old canvases

When my observable stream has a map operation that produces resources that must be cleaned up, I tend to use scan because it can track the previous value for me (without me needing to go make a closure variable to hold the previous value):

canvasState
    .filter(function (c) { return !!c; })
    .scan({}, function (acc, canvas) {
        return { previous: acc.canvas, canvas: canvas };
    })
    .subscribe(function (v) {
        // remove the old canvas
        if (v.previous) { document.body.removeChild(v.previous); }

        // add the new one
        document.body.appendChild(v.canvas);
    });
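
To see what the accumulator produces, here is a small plain-JavaScript sketch of the same previous/current pairing. `scanArray` is a made-up stand-in for scan over an array, a Set stands in for document.body, and the strings stand in for canvas elements. The seed-first argument order mirrors the snippet above; as far as I know, later RxJS versions take the accumulator first and the seed second, so check the version you use.

```javascript
// Minimal stand-in for scan over an array: returns every intermediate
// accumulator value. Argument order (seed first) mirrors the answer above.
function scanArray(values, seed, accumulator) {
  const out = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulator(acc, value);
    out.push(acc);
  }
  return out;
}

// Stand-ins: strings instead of canvas elements, a Set instead of document.body.
const body = new Set();
const pairs = scanArray(['c1', 'c2', 'c3'], {}, (acc, canvas) => {
  return { previous: acc.canvas, canvas: canvas };
});

pairs.forEach(v => {
  // remove the old canvas
  if (v.previous) { body.delete(v.previous); }
  // add the new one
  body.add(v.canvas);
});

// body now contains only 'c3'
```

The first pair has no previous value, so nothing is removed on the first canvas.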