I have two streams which I merge to get a URL to load an image: one stream for drop events and one for a file input change. On every new path I load the image and draw it to a canvas. This canvas is passed into another stream. It looks like this:
// prevent browsers default behavior for dropTargetElement
[ 'drop', 'dragover' ].forEach(function(eventName) {
    Rx.Observable.fromEvent(dropTargetElement, eventName).subscribe(function(event) {
        event.preventDefault();
    });
});
// file path stream merged from openFileInputs change and openFileDropTargets drop
Rx.Observable.merge(Rx.Observable.fromEvent(inputElement, 'change').map(function(event) {
    return event.target.value;
}), Rx.Observable.fromEvent(dropTargetElement, 'drop').map(function(event) {
    return event.dataTransfer.files[0].path;
})).map(function(path) {
    var image = new Image();
    image.src = path;
    // note: I return an Observable in this map function
    // is this even good practice? if yes, is mergeAll the best
    // way to get the "load" event?
    return Rx.Observable.fromEvent(image, 'load');
}).mergeAll().map(function(event) {
    return event.path[0];
}).subscribe(function(image) {
    var canvas = document.createElement('canvas');
    var context = canvas.getContext('2d');
    canvas.width = image.width;
    canvas.height = image.height;
    context.drawImage(image, 0, 0);
    canvasState.onNext(canvas);
});
(Side question: Is it "allowed" to return Observables in map?)
My canvasState looks like this:
var canvasState = new Rx.BehaviorSubject(undefined);
// draw image
canvasState.filter(function(canvas) {
    return !!canvas;
}).subscribe(function drawImage(canvas) {
    document.body.appendChild(canvas);
});
As you can see, I append the canvas to the body if my canvas value is truthy. But every time a new canvas comes in, I would like to remove the old one. What is the best way to achieve that? Is something like this possible?
// draw image
canvasState.filter(function(canvas) {
    return !!canvas;
}).beforeNext(function(oldCanvas) {
    // remove old one
}).subscribe(function drawImage(canvas) {
    document.body.appendChild(canvas);
});
asked Feb 1, 2015 at 14:24 by Pipo
- Note that you probably need to listen to the image's error event as well so you can take action if an image fails to load... – Brandon Commented Feb 3, 2015 at 4:35
- You're right! Thanks for the tip. – Pipo Commented Feb 3, 2015 at 6:16
- Interesting question with a good answer. Note that there are other resources you create and don't clean up: the event listeners themselves. I just wrote about this in github.com/Reactive-Extensions/RxJS/issues/1016. Found this question while searching how to clean up these subscriptions... :-) – PhiLho Commented Nov 4, 2015 at 15:49
1 Answer
Nested Observables
Yes, it is normal to have a map operation return an Observable. This gives you an observable stream of observable streams. There are 3 Rx operations to flatten that nested observable one level:
- mergeAll - subscribe to all of the inner streams as they arrive. You usually end up concurrently subscribed to multiple streams, and usually your final stream will have results from the different inner streams intermingled. You can supply a maxConcurrent parameter to limit the number of concurrent inner subscriptions. When the limit is hit, new inner observables get queued and do not get subscribed until a previous inner observable completes.
- concatAll - subscribe to each inner observable stream one at a time, in order. Your resulting stream will produce items in a predictable order. Concat is just Merge with maxConcurrent set to 1.
- switch - subscribe to the first inner stream. Then, when a new stream arrives, "switch" to it. Basically, unsubscribe from the previous inner stream and subscribe to the latest inner stream. Use switch when you only want to listen to the most recent inner stream.
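To make the ordering differences concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS: it is a toy timeline model in which each inner stream is a hypothetical object `{ arrive, duration, items }` (times in ms, inner streams listed in arrival order), and the three `simulate*` functions are illustrative names, not library calls. It assumes cold inner streams, so an inner stream only starts producing once it is subscribed.

```javascript
// Toy timeline model (NOT RxJS). Each inner stream is described as
// { arrive, duration, items: [{ delay, value }] }, with delays measured
// from the moment the inner stream is subscribed.
function emissionsFrom(inner, subscribedAt) {
    return inner.items.map(function (item) {
        return { at: subscribedAt + item.delay, value: item.value };
    });
}

function valuesInTimeOrder(emissions) {
    return emissions
        .sort(function (a, b) { return a.at - b.at; })
        .map(function (e) { return e.value; });
}

// mergeAll: subscribe to every inner stream as soon as it arrives.
function simulateMergeAll(inners) {
    var out = [];
    inners.forEach(function (inner) {
        out = out.concat(emissionsFrom(inner, inner.arrive));
    });
    return valuesInTimeOrder(out);
}

// concatAll: subscribe to the next inner stream only after the
// previous one has completed.
function simulateConcatAll(inners) {
    var out = [];
    var freeAt = 0;
    inners.forEach(function (inner) {
        var subscribedAt = Math.max(inner.arrive, freeAt);
        out = out.concat(emissionsFrom(inner, subscribedAt));
        freeAt = subscribedAt + inner.duration;
    });
    return valuesInTimeOrder(out);
}

// switch: drop whatever the current inner stream would have produced
// once the next inner stream arrives.
function simulateSwitch(inners) {
    var out = [];
    inners.forEach(function (inner, i) {
        var next = inners[i + 1];
        var cutoff = next ? next.arrive : Infinity;
        emissionsFrom(inner, inner.arrive).forEach(function (e) {
            if (e.at < cutoff) { out.push(e); }
        });
    });
    return valuesInTimeOrder(out);
}

var inners = [
    { arrive: 0, duration: 30, items: [{ delay: 10, value: 'a1' }, { delay: 30, value: 'a2' }] },
    { arrive: 20, duration: 15, items: [{ delay: 5, value: 'b1' }, { delay: 15, value: 'b2' }] }
];

var merged = simulateMergeAll(inners);        // a1, b1, a2, b2
var concatenated = simulateConcatAll(inners); // a1, a2, b1, b2
var switched = simulateSwitch(inners);        // a1, b1, b2
```

In this model the second stream arrives while the first is still running, so merging interleaves the two, concatenating delays the second until the first is done, and switching discards the tail of the first.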
(In my code, I've also implemented a fourth flattening operator: concatLatest, which is like concat, but when an inner stream completes, instead of jumping to the next stream in the queue, it jumps straight to the most recent sequence in the queue, throwing away any inner streams skipped. It is sort of in between concat and switch in behavior, and I find it very useful for handling backpressure while still producing results.)
So, it is very common to have .map(...).mergeAll() or .map(...).concatAll() or .map(...).switch(). It is so common that there are Rx methods for these 3 cases:
- source.flatMap(selector) is equivalent to source.map(selector).mergeAll()
- source.concatMap(selector) is equivalent to source.map(selector).concatAll()
- source.flatMapLatest(selector) is equivalent to source.map(selector).switch()
Your Code
With the above information, I'd suggest you use switch instead of mergeAll (that is, flatMapLatest in place of map(...).mergeAll()). As your code stands now, if someone changes the value quickly, you could have 2 image requests out simultaneously, and if they complete in the wrong order you will end up with the wrong final canvas. switch will cancel the stale image request and switch over to the new one.
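The race described here can be reproduced without Rx. The following is a hand-rolled sketch of the "only the latest request wins" idea that switch gives you; `createLatestOnly`, `begin`, and the returned completion callback are hypothetical names made up for this illustration, not part of RxJS.

```javascript
// Hand-rolled "latest only" guard (NOT RxJS). Every call to begin()
// starts a new request and invalidates all earlier ones.
function createLatestOnly(onResult) {
    var currentId = 0;
    return function begin() {
        var id = ++currentId;
        // Call this when the request finishes. The result is delivered
        // only if no newer request has been started in the meantime.
        return function complete(result) {
            if (id !== currentId) { return false; } // stale, ignore
            onResult(result);
            return true;
        };
    };
}

var shown = [];
var begin = createLatestOnly(function (image) { shown.push(image); });

var finishFirst = begin();   // user picks image A
var finishSecond = begin();  // user quickly picks image B

// The loads complete in the "wrong" order: B first, then A.
var secondDelivered = finishSecond('B');
var firstDelivered = finishFirst('A');
// shown contains only 'B'; the late result for A was dropped.
```

With a merge-style approach both results would be delivered and the late arrival of A would overwrite B. One difference from a real switch is that this guard only ignores the stale result, whereas switch also unsubscribes from the stale inner stream.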
Note that if the user keeps changing the value faster than the image requests can complete, then they could never see any images, since you'll keep switching to the new one. This is where I usually use my concatLatest instead, because it will complete some intermediate requests now and then while it is attempting to keep up. YMMV.
Cleaning up the old canvases
When my observable stream has a map operation that produces resources that must be cleaned up, I tend to use scan because it can track the previous value for me (without me needing to go make a closure variable to hold the previous value):
canvasState
    .filter(function (c) { return !!c; })
    .scan({}, function (acc, canvas) {
        return { previous: acc.canvas, canvas: canvas };
    })
    .subscribe(function (v) {
        // remove the old canvas
        if (v.previous) { document.body.removeChild(v.previous); }
        // add the new one
        document.body.appendChild(v.canvas);
    });
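To see what the accumulator hands to the subscriber at each step, here is a small sketch that runs the same accumulator function over a plain array. `scanArray` is a hypothetical helper written for this illustration (it mimics scan by collecting every intermediate accumulator), and the strings stand in for canvas elements.

```javascript
// Same accumulator as in the scan call: remember the canvas from the
// previous step alongside the new one.
function trackPrevious(acc, canvas) {
    return { previous: acc.canvas, canvas: canvas };
}

// Hypothetical array analogue of scan (NOT RxJS): returns every
// intermediate accumulator value instead of only the final one.
function scanArray(values, seed, accumulator) {
    var acc = seed;
    return values.map(function (value) {
        acc = accumulator(acc, value);
        return acc;
    });
}

// Strings stand in for canvas elements.
var steps = scanArray(['canvas1', 'canvas2', 'canvas3'], {}, trackPrevious);
// steps[0] has no previous canvas, so nothing needs to be removed.
// steps[1].previous is 'canvas1' and steps[2].previous is 'canvas2'.
```

Because the seed is an empty object, the first step sees `previous` as undefined, which is why the subscriber checks `v.previous` before calling removeChild.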