Using a single Observable in RxJS, every incoming value should be handled as follows:
- Keep each value on hold for N seconds.
- If, within those N seconds, another value arrives that matches a custom matcher, ignore the new value. That is, compare each new value with all values currently on hold; values already pushed to subscribers do not matter. I need a custom matcher function; plain distinct won't do, because I am dealing with objects and, for example, {ids: ['a','b']} should equal {ids: ['b','a']}.
- If, within those N seconds, another value arrives that meets some custom criteria, merge the new value and the original value into one value using a custom function. Again, only values on hold matter, not values already pushed to subscribers.
- For both cases, it should not matter whether different values arrive in between. That is, I cannot use operators that only compare with the previous value (like distinctUntilChanged).
- After N seconds have passed, push the value to subscribers.
How do I do that? (I tried to fiddle with scan, mergeScan, windowTime and some other operators but could not figure it out.)
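To make the matcher requirement concrete, here is a minimal sketch of the kind of order-insensitive comparison I mean. The names sameIds and matches are made up for illustration and are not part of RxJS; the sketch assumes ids is an array of strings.

```javascript
// Hypothetical helper: true when both arrays contain the same strings,
// regardless of order. Non-array input never counts as equal.
const sameIds = (a, b) => {
  if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) {
    return false;
  }
  const sortedA = [...a].sort();
  const sortedB = [...b].sort();
  return sortedA.every((id, i) => id === sortedB[i]);
};

// Hypothetical matcher: a held value and an incoming value "match"
// when their ids are the same set, in any order.
const matches = (held, incoming) => sameIds(held.ids, incoming.ids);

console.log(matches({ ids: ['a', 'b'] }, { ids: ['b', 'a'] }));
console.log(matches({ ids: ['a', 'b'] }, { ids: ['a', 'c'] }));
```

The first call logs true and the second false, which is the behaviour a plain reference or JSON comparison would not give for reordered arrays.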
Edit: As requested in the comments, here is my current code snippet, though I doubt it will help much:
const { from } = require('rxjs');
const { windowTime, concatMap, distinct, tap, filter, map, debounceTime, distinctUntilChanged, scan, mergeScan } = require('rxjs/operators');

const myInpExamples = [
  { userId: "userA", ids: ["aa"] },
  { userId: "userB", ids: ["cc"] },
  { userId: "userA", ids: ["aa"] },
  { userId: "userA", ids: ["aa"] },
  { userId: "userB", ids: ["bb"] },
  { otherIds: ['11'] },
  { otherIds: ['22'] }
];

const arrayDataObservable$ = from(myInpExamples);

// Split the stream into 1-second windows and, within each window,
// drop a value when it equals the one directly before it.
const dataPipeline = arrayDataObservable$.pipe(
  windowTime(1000),
  concatMap((obs) => obs.pipe(
    distinctUntilChanged(
      (prev, curr) => JSON.stringify(prev) === JSON.stringify(curr)
    )
  ))
);

const subscribeToDataPipeline = (subscriberName) => {
  return dataPipeline.subscribe((val) => {
    console.log(subscriberName + ' received: ' + JSON.stringify(val, null, 2));
  });
};

const handleSubscriptionToDataPipeline = () => {
  const subscription1 = subscribeToDataPipeline('Subscriber1');
};

handleSubscriptionToDataPipeline();
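To show why the snippet above falls short, here is a small plain-JavaScript sketch with no RxJS involved. distinctUntilChanged compares each value only with the last value it let through, so a duplicate separated by a different value still passes. The function names dedupeAdjacent and dedupeAgainstAll are made up for this illustration.

```javascript
// Same comparison as in the snippet above.
const sameJson = (a, b) => JSON.stringify(a) === JSON.stringify(b);

// Mimics distinctUntilChanged on a finite list:
// a value is dropped only when it equals the last value that was kept.
const dedupeAdjacent = (values, eq) =>
  values.reduce((kept, value) => {
    const last = kept[kept.length - 1];
    return kept.length > 0 && eq(last, value) ? kept : [...kept, value];
  }, []);

// What I am after within one hold period:
// a value is dropped when it equals ANY value already kept.
const dedupeAgainstAll = (values, eq) =>
  values.reduce(
    (kept, value) => (kept.some((k) => eq(k, value)) ? kept : [...kept, value]),
    []
  );

const input = [
  { userId: 'userA', ids: ['aa'] },
  { userId: 'userB', ids: ['cc'] },
  { userId: 'userA', ids: ['aa'] }, // duplicate, but not adjacent to the first
];

const adjacentResult = dedupeAdjacent(input, sameJson);
const againstAllResult = dedupeAgainstAll(input, sameJson);

console.log(adjacentResult.length);   // 3: the second userA value slips through
console.log(againstAllResult.length); // 2: the second userA value is dropped
```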
For myInpExamples, the expected outcome is that Subscriber1 receives these values:
- {userId: "userA", ids: ["aa"]}
- {userId: "userB", ids: ["cc"]}
- {userId: "userB", ids: ["bb"]}
- {otherIds: ['11']}
- {otherIds: ['22']}
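To pin down the semantics I have in mind, here is a minimal sketch in plain JavaScript, with time passed in explicitly so it can be checked without timers. HoldBuffer, add and release are hypothetical names, not RxJS APIs. Two points are assumptions on my part: a merged value keeps the release time of the value it was merged into, and a new value is merged into the first held value that meets the criterion.

```javascript
// Hypothetical helper: holds values and releases each one once holdMs
// milliseconds have passed since it first arrived.
class HoldBuffer {
  constructor({ holdMs, matches, shouldMerge, merge }) {
    this.holdMs = holdMs;
    this.matches = matches;         // (held, incoming) => boolean: ignore incoming
    this.shouldMerge = shouldMerge; // (held, incoming) => boolean: combine the two
    this.merge = merge;             // (held, incoming) => merged value
    this.held = [];                 // entries of { value, releaseAt }, in arrival order
  }

  // Offer a value that arrived at time `now` (milliseconds).
  add(value, now) {
    // Ignore the value if it matches anything currently on hold.
    if (this.held.some((entry) => this.matches(entry.value, value))) return;

    // Merge into the first held value that meets the merge criterion.
    // Assumption: the merged value keeps the original release time.
    const target = this.held.find((entry) => this.shouldMerge(entry.value, value));
    if (target) {
      target.value = this.merge(target.value, value);
      return;
    }

    // Otherwise start a new hold period for this value.
    this.held.push({ value, releaseAt: now + this.holdMs });
  }

  // Remove and return every value whose hold period has ended by `now`.
  release(now) {
    const due = this.held.filter((entry) => entry.releaseAt <= now);
    this.held = this.held.filter((entry) => entry.releaseAt > now);
    return due.map((entry) => entry.value);
  }
}

const sameJson = (a, b) => JSON.stringify(a) === JSON.stringify(b);
const buffer = new HoldBuffer({
  holdMs: 1000,
  matches: sameJson,
  shouldMerge: () => false, // the example input needs no merging
  merge: (held) => held,
});

const input = [
  { userId: 'userA', ids: ['aa'] },
  { userId: 'userB', ids: ['cc'] },
  { userId: 'userA', ids: ['aa'] },
  { userId: 'userA', ids: ['aa'] },
  { userId: 'userB', ids: ['bb'] },
  { otherIds: ['11'] },
  { otherIds: ['22'] },
];

// All values arrive at time 0, as they would from a synchronous source.
input.forEach((value) => buffer.add(value, 0));
const tooEarly = buffer.release(999);  // nothing is due yet
const released = buffer.release(1000); // the five distinct values
console.log(JSON.stringify(released));
```

With the example input this releases the same five values as listed above. One way to drive it from an Observable might be to call add(value, Date.now()) for each emission and release(Date.now()) on a periodic tick; that wiring is left out here.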