I'm still learning RxJS, and I'm a little stuck on how to write a custom compare function for the operator distinctUntilChanged.
I looked into using distinctUntilKeyChanged, which works great for a single key, but I have two keys that I need to compare.
It seems I might need to incorporate the scan operator to compare the current value emitted to the last value emitted?
OK, here is my code. I'm streaming map center changes from Google Maps. I don't need the map center GeoLocation to be very precise, so I'm rounding off most of the decimals returned by Google Maps.
searchStream$
  .map((value) => {
    return {
      lat: round(value[1].lat, 1),
      lng: round(value[1].lng, 1)
    };
  })
  .distinctUntilKeyChanged('lat')
  .do((position) => { console.log(position); })
  .subscribe((position) => {
    this._store.dispatch(new QueryUpdateGeoPositionAPIAction({ latitude: position.lat, longitude: position.lng }));
  });
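For reference, round is not a built-in and its definition isn't shown here. A minimal sketch of such a helper, assuming it rounds a number to a given count of decimal places (the body below is hypothetical, not the original implementation), might look like this:

```javascript
// Hypothetical stand-in for the `round` helper used in the stream above.
// Assumption: it rounds `value` to `decimals` decimal places.
function round(value, decimals) {
  const factor = Math.pow(10, decimals);
  return Math.round(value * factor) / factor;
}

console.log(round(37.7749, 1));   // 37.8
console.log(round(-122.4194, 1)); // -122.4
```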
So back to my question: how can I compare both properties (lat & lng) to ensure that it only emits values when either one of them changes?
Thanks for the help!
asked Jun 20, 2017 at 18:08 by calbear47
There is an example in the docs: reactivex.io/rxjs/class/es6/… – cartant, Jun 20, 2017 at 18:59
2 Answers
From the RxJS section of the distinct documentation:
In RxJS, the distinct operator has two optional parameters:
- a function that accepts an item emitted by the source Observable and returns a key which will be used instead of the item itself when comparing two items for distinctness
- a function that accepts two items (or two keys) and compares them for distinctness, returning false if they are distinct (an equality function is the default if you do not supply your own function here)
So it looks to me (with no testing) that you could simply pass something like
(a, b) => a.lat === b.lat && a.lng === b.lng
I don't know the RxJS conventions well enough to say exactly how this (second optional) parameter should be passed.
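For what it's worth, in RxJS 5 distinctUntilChanged (unlike distinct) takes the comparison function as its first argument, and the function should return true when the two values count as equal. Below is a rough sketch of the idea. The samePosition name and the array-based distinctUntilChangedArray stand-in are illustrative assumptions, written so the sketch runs without RxJS; they are not part of the library.

```javascript
// Comparison function: returns true when two positions count as "the same",
// which is what tells distinctUntilChanged to suppress the new value.
const samePosition = (a, b) => a.lat === b.lat && a.lng === b.lng;

// Illustrative stand-in for distinctUntilChanged(compare) over a plain array.
// This is NOT RxJS; it only mimics the "drop consecutive duplicates" behavior.
function distinctUntilChangedArray(values, compare) {
  const result = [];
  for (const value of values) {
    const last = result[result.length - 1];
    if (result.length === 0 || !compare(last, value)) {
      result.push(value);
    }
  }
  return result;
}

const positions = [
  { lat: 37.8, lng: -122.4 },
  { lat: 37.8, lng: -122.4 }, // same as previous, dropped
  { lat: 37.8, lng: -122.5 }, // lng changed, kept
  { lat: 37.9, lng: -122.5 }, // lat changed, kept
];

const emitted = distinctUntilChangedArray(positions, samePosition);
console.log(emitted.length); // 3

// With RxJS 5 the comparator would be passed directly, for example:
// searchStream$.map(/* ... */).distinctUntilChanged(samePosition)
```

In the question's stream, this would mean replacing .distinctUntilKeyChanged('lat') with .distinctUntilChanged(samePosition).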
I was having the same problem and this situation isn't covered by the docs.
This will add the .distinctUntilKeysChanged operator. Just pass it any keys to "watch" for:
const {
  Observable,
  Subject
} = Rx;
Observable.prototype.distinctUntilKeysChanged = function(...keys) {
  return this.distinctUntilChanged((old, current) =>
    // if no value changed,
    // the array will only have true values,
    // includes(false) will be false,
    // convert to opposite (!),
    // returns true;
    // => true = do nothing
    // if any value changed,
    // the array will include some false,
    // includes(false) will be true,
    // convert to opposite (!),
    // returns false;
    // => false = emit
    !keys
      .map(key => old[key] === current[key]) // converts to array of booleans
      .includes(false) // true if any value changed
  );
};
const stream = new Subject();
stream
  .distinctUntilKeysChanged('prop', 'prop2')
  .subscribe(obj => console.log(obj));
// should log
stream.next({
  prop: 42,
  prop2: 54,
});
// should log
stream.next({
  prop: 12346,
  prop2: 54,
});
// shouldn't log because neither prop nor prop2 changed
stream.next({
  prop: 12346,
  prop2: 54,
});
// should log
stream.next({
  prop: 12346,
  prop2: 5454665654645,
});
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
The only downside to this is that you can't specify a custom comparison function. If you do want to specify one, you could use this instead (which is more similar to the implementation of .distinctUntilKeyChanged, where the first argument is the key and the second is the comparison function). Notice that this one takes an array of keys, where the first one took the keys as separate arguments:
const {
  Observable,
  Subject
} = Rx;
Observable.prototype.distinctUntilKeysChanged = function(keys, compare) {
  return this.distinctUntilChanged((old, current) =>
    // if no value changed,
    // the array will only have true values,
    // includes(false) will be false,
    // convert to opposite (!),
    // returns true;
    // => true = do nothing
    // if any value changed,
    // the array will include some false,
    // includes(false) will be true,
    // convert to opposite (!),
    // returns false;
    // => false = emit
    !keys
      // use the custom comparison if one was given, else strict equality
      .map(key => compare ? compare(old[key], current[key]) : old[key] === current[key]) // converts to array of booleans
      .includes(false) // true if any value changed
  );
};
const stream = new Subject();
stream
  .distinctUntilKeysChanged(['prop', 'prop2'])
  .subscribe(obj => console.log(obj));
// should log
stream.next({
  prop: 42,
  prop2: 54,
});
// should log
stream.next({
  prop: 12346,
  prop2: 54,
});
// shouldn't log because neither prop nor prop2 changed
stream.next({
  prop: 12346,
  prop2: 54,
});
// should log
stream.next({
  prop: 12346,
  prop2: 5454665654645,
});
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
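If patching Observable.prototype feels too invasive, the same logic can be kept as a plain function that builds a comparator for distinctUntilChanged. This is only a sketch: sameKeys and closeEnough are hypothetical names of my own, not part of RxJS, and the 0.05 tolerance is an arbitrary value chosen to show a custom comparison function in use.

```javascript
// Hypothetical helper (not part of RxJS): builds a comparison function that
// treats two objects as equal when every listed key matches.
// `compare` is optional and defaults to strict equality.
function sameKeys(keys, compare = (a, b) => a === b) {
  return (previous, current) =>
    keys.every(key => compare(previous[key], current[key]));
}

// Example custom comparison: numbers within 0.05 of each other count as unchanged.
const closeEnough = (a, b) => Math.abs(a - b) < 0.05;

const strict = sameKeys(['lat', 'lng']);
const tolerant = sameKeys(['lat', 'lng'], closeEnough);

const previous = { lat: 37.8, lng: -122.4 };
const current = { lat: 37.82, lng: -122.4 };

console.log(strict(previous, current));   // false: lat differs, so the value would be emitted
console.log(tolerant(previous, current)); // true: within tolerance, so it would be suppressed

// Assumed usage with RxJS 5:
// stream.distinctUntilChanged(sameKeys(['lat', 'lng'], closeEnough))
```

Using every instead of map plus includes(false) also stops at the first key that differs.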
Hope you find it useful.