最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - How to implement time expiry hot observable in RxJS (or general in Reactive Extensions) - Stack Overflow

programmeradmin2浏览0评论

I'd like to implement Time Expiry cache with RxJs. Here is example of "normal" cache:

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);

//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);

cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

When subscribe on cachedData is called for the first time, "heavy duty job" is called, and after 2 seconds result is saved in cachedData (AsyncSubject). Any other subsequent subscribe on cachedData returns immediately with saved result (thus cache implementation).

What I'd like to achieve is to "spice" this up with time period within cachedData is valid, and when that time passes, I'd like to re-run "heavy duty job" for new data and cache this again for new time period, etc...

Desired behaviour:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);


//let's assume that all code is sequential from here

//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});

//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});

//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});


//....
//etc

I'm new to Rx(Js) and cannot figure out how to implement this hot observable with cooldown.

I'd like to implement Time Expiry cache with RxJs. Here is example of "normal" cache:

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);

//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);

cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

When subscribe on cachedData is called for the first time, "heavy duty job" is called, and after 2 seconds result is saved in cachedData (AsyncSubject). Any other subsequent subscribe on cachedData returns immediately with saved result (thus cache implementation).

What I'd like to achieve is to "spice" this up with time period within cachedData is valid, and when that time passes, I'd like to re-run "heavy duty job" for new data and cache this again for new time period, etc...

Desired behaviour:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);


//let's assume that all code is sequential from here

//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});

//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});

//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});


//....
//etc

I'm new to Rx(Js) and cannot figure out how to implement this hot observable with cooldown.

Share Improve this question edited Oct 21, 2014 at 11:01 Tomo asked Oct 20, 2014 at 21:34 TomoTomo 6,8971 gold badge24 silver badges32 bronze badges
Add a ment  | 

2 Answers 2

Reset to default 6

All you are missing is to schedule a task to replace your cachedData with a new AsyncSubject after a time period. Here's how to do it as a new Rx.Observable method:

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
    var source = this,
        cachedData = undefined;

    // Use timeout scheduler if scheduler not supplied
    scheduler = scheduler || Rx.Scheduler.timeout;

    return Rx.Observable.create(function (observer) {

        if (!cachedData) {
            // The data is not cached.
            // create a subject to hold the result
            cachedData = new Rx.AsyncSubject();

            // subscribe to the query
            source.subscribe(cachedData);

            // when the query pletes, start a timer which will expire the cache
            cachedData.subscribe(function () {
                scheduler.scheduleWithRelative(expirationMs, function () {
                    // clear the cache
                    cachedData = undefined;
                });
            });
        }

        // subscribe the observer to the cached data
        return cachedData.subscribe(observer);
    });
};

Usage:

// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);

// the cached query
var cachedData = data.cacheWithExpiration(15000);

// first observer must wait
cachedData.subscribe();

// wait 3 seconds

// second observer gets result instantly
cachedData.subscribe();

// wait 15 seconds

// observer must wait again
cachedData.subscribe();

A simple solution would be to create a custom pipeable operator to repeatWhen a duration has passed. Here's what I came up with:

 export const refreshAfter = (duration: number) => (source: Observable<any>) =>
                                  source.pipe(
                                     repeatWhen(obs => obs.pipe(delay(duration))),
                                     publishReplay(1), 
                                     refCount());

Then I'm using it like this:

const serverTime$ = this.environmentClient.getServer().pipe(map(s => s.localTime))
const cachedServerTime$ = serverTime.pipe(refreshAfter(5000)); // 5s cache

Important note: This uses publishReplay(1), refCount() because shareReplay(1) doesn't unsubscribe from the source observable so it'll keep hitting your server forever. Unfortunately this has the consequence that on error that error will be replayed from the publishReplay(1), refCount(). There's a 'new improved' shareReplay ing soon. See notes here on a similar question. Once this 'new' version is available this answer should be updated - but the beauty of custom operators is you can fix them in one place.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论