Is there a good way to check if not pleted Observable is empty at that exact time?
let cache = new ReplaySubject<number>(1);
...
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value.
cache.isEmpty().subscribe(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
});
// but that code does not work until cacheplete()
Is there a good way to check if not pleted Observable is empty at that exact time?
let cache = new ReplaySubject<number>(1);
...
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value.
cache.isEmpty().subscribe(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
});
// but that code does not work until cache.plete()
Share
Improve this question
asked Jan 11, 2017 at 8:29
vdshbvdshb
1,9992 gold badges29 silver badges43 bronze badges
1
-
An "empty"
Observable
isn't really a meaningful concept in this context, I think you are conflating concerns between theObserver
andObservable
. What is it you are trying to acplish by this? – paulpdaniels Commented Jan 11, 2017 at 9:00
4 Answers
Reset to default 4Actually, it's not that simple and the accepted answer is not very universal. You want to check whether ReplaySubject
is empty at this particular point in time.
However, if you want to make this truly patible with ReplaySubject
you need to take into account also windowTime
parameter that specifies "time to live" for each value that goes through this object. This means that whether your cache
is empty or not will change in time.
ReplaySubject
has method _trimBufferThenGetEvents
that does what you need. Unfortunately, this method is private so you need to make a little "hack" in JavaScript and extend its prototype
directly.
import { ReplaySubject } from 'rxjs';
// Tell the piler there's a isNowEmpty() method
declare module "rxjs/ReplaySubject" {
interface ReplaySubject<T> {
isNowEmpty(): boolean;
}
}
ReplaySubject.prototype['isNowEmpty'] = function() {
let events = this._trimBufferThenGetEvents();
return events.length > 0;
};
Then using this ReplaySubject
is simple:
let s = new ReplaySubject<number>(1, 100);
s.next(3);
console.log(s.isNowEmpty());
s.next(4);
setTimeout(() => {
s.next(5);
s.subscribe(val => console.log('cached:', val));
console.log(s.isNowEmpty());
}, 200);
setTimeout(() => {
console.log(s.isNowEmpty());
}, 400);
Note that some calls to isNowEmpty()
return true
, while others return false
. For example the last one returns false
because the value was invalidated in the meantime.
This example prints:
true
cached: 5
true
false
See live demo: https://jsbin./sutaka/3/edit?js,console
You could use .scan()
to accumulate your count, and map that to a boolean whether it's nonzero. (It takes a second parameter for a seed value which would make it start with a 0, so it always reflects the current count.)
I've also added a .filter() instead of an if statement to make it cleaner:
let cache = new ReplaySubject<number>(1);
cache
.map((object: T) => 1)
.scan((count: number, ining: number) => count + ining, 0)
.map((sum) => sum == 0)
.filter((isEmpty: boolean) => isEmpty)
.subscribe((isEmpty: boolean) => {
console.log("I want to be here!!!");
cache.next(0);
});
You could use takeUntil()
:
Observable.of(true)
.takeUntil(cache)
.do(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
})
.subscribe();
However this will just work once.
Another way would be to "null" the cache and initialize it as empty by using a BehaviorSubject
:
let cache = new BehaviorSubject<number>(null as any);
...
cache
.do(content => {
if (content == null) {
console.log("I want to be here!!!");
cache.next(0);
}
})
.subscribe();
And of course you could initialize the cache with some default value right away.
startWith
let cache = new ReplaySubject<number>(1);
isEmpty$ = cache.pipe(mapTo(false), startWith(true));
This says:
- Whatever the value is emitted by cache - map it to
false
. (because it isn't empty after an emission) - Start with
true
if nothing has been emitted yet (because that means it's empty)