最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - Keep values on hold and compare new values with those on hold - Stack Overflow

programmeradmin1浏览0评论

Using one Observable in RxJS, all incoming values should be treated like such:

  • Keep each value on hold for N seconds.

  • If within those N seconds, another value matching a custom matcher comes in, then ignore the new value. I.e. compare each new value with all values currently on hold. Values already pushed to the subscribers do not matter. I need a customer matcher function, plain distinct won't do (dealing with objects here and e.g. {ids:['a','b']} should equal {ids: ['b','a']}

  • If within those N seconds, another value matching some custom criteria comes in, then merge the new value and the original value to one value, using a custom function. Again: Only care about values on hold, do not care about previous values already pushed to subscribers.

  • For both cases: It should not matter if in between different values come in. I.e. I cannot use operators which simply compare previous values (like distinctUntilChanged)

  • After N seconds have passed, push to subscribers.

How do I do that? (Tried to fiddle with scan, mergeScan, windowTime and some other operators but could not figure it out.)

Edit: As per comment request here's the current code-snippet, but it won't help much I guess:

const { from } = require('rxjs');
const { windowTime, concatMap, distinct, tap, filter, map, debounceTime, distinctUntilChanged, scan, mergeScan } = require('rxjs/operators');

const myInpExamples = [
    {userId: "userA", ids: ["aa"]},
    {userId: "userB", ids: ["cc"]},
    {userId: "userA", ids: ["aa"]},
    {userId: "userA", ids: ["aa"]},
    {userId: "userB", ids: ["bb"]},
    {otherIds: ['11']},
    {otherIds: ['22']}
]

const arrayDataObservable$ = from(myInpExamples);

const dataPipeline = arrayDataObservable$.pipe(
    windowTime(1000),
    concatMap((obs) => obs.pipe(distinctUntilChanged(    
        (prev, curr) => {
            return JSON.stringify(prev) === JSON.stringify(curr)
        }    
        )))
)

const subscribeToDataPipeline = subscriberName => {
    return dataPipeline.subscribe(val => {
        console.log(subscriberName + ' received: ' + JSON.stringify(val, null, 2));
    })
}

const handleSubscriptionToDataPipeline = () => {
    const subscription1 = subscribeToDataPipeline('Subscriber1');
}

handleSubscriptionToDataPipeline();

Expected outcome here is that Subscriber1 gets those values:

  • {userId: "userA", ids: ["aa"]}
  • {userId: "userB", ids: ["cc"]}
  • {userId: "userB", ids: ["bb"]}
  • {otherIds: ['11']}
  • {otherIds: ['22']}
发布评论

评论列表(0)

  1. 暂无评论