I'm trying to use RxJS for a simple short poll. It needs to make a request once every delay
seconds to the location path
on the server, ending once one of two conditions are reached: either the callback isComplete(data)
returns true or it has tried the server more than maxTries
. Here's the basic code:
newShortPoll(path, maxTries, delay, isComplete) {
return Observable.interval(delay)
.take(maxTries)
.flatMap((tryNumber) => http.get(path))
.doWhile((data) => !isComplete(data));
}
However, doWhile doesn't exist in RxJS 5.0, so the condition where it can only try the server maxTries
works, thanks to the take() call, but the isComplete
condition does not work. How can I make it so the observable will next() values until isComplete returns true, at which point it will next() that value and plete().
I should note that takeWhile()
does not work for me here. It does not return the last value, which is actually the most important, since that's when we know it's done.
Thanks!
I'm trying to use RxJS for a simple short poll. It needs to make a request once every delay
seconds to the location path
on the server, ending once one of two conditions are reached: either the callback isComplete(data)
returns true or it has tried the server more than maxTries
. Here's the basic code:
newShortPoll(path, maxTries, delay, isComplete) {
return Observable.interval(delay)
.take(maxTries)
.flatMap((tryNumber) => http.get(path))
.doWhile((data) => !isComplete(data));
}
However, doWhile doesn't exist in RxJS 5.0, so the condition where it can only try the server maxTries
works, thanks to the take() call, but the isComplete
condition does not work. How can I make it so the observable will next() values until isComplete returns true, at which point it will next() that value and plete().
I should note that takeWhile()
does not work for me here. It does not return the last value, which is actually the most important, since that's when we know it's done.
Thanks!
Share Improve this question asked Mar 2, 2016 at 20:48 Colton VoegeColton Voege 1231 silver badge5 bronze badges 2- Possibly a duplicate of: stackoverflow./questions/36007911/… – Brian Vanderbusch Commented Apr 25, 2016 at 21:02
-
It's not a duplicate in the sense that the question is asking for a replacement of
doWhile
. – Bjorn Commented Nov 10, 2016 at 19:06
4 Answers
Reset to default 3We can create a utility function to create a second Observable that emits every item that the inner Observable emits; however, we will call the onCompleted function once our condition is met:
function takeUntilInclusive(inner$, predicate) {
return Rx.Observable.create(observer => {
var subscription = inner$.subscribe(item => {
observer.onNext(item);
if (predicate(item)) {
observer.onCompleted();
}
}, observer.onError, observer.onCompleted);
return () => {
subscription.dispose();
}
});
}
And here's a quick snippet using our new utility method:
const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));
// >> 0
// >> 1
// >> 2
// >> 3
This answer is based off: RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after
You can achieve this by using retry and first operators.
// helper observable that can return inplete/plete data or fail.
var server = Rx.Observable.create(function (observer) {
var x = Math.random();
if(x < 0.1) {
observer.next(true);
} else if (x < 0.5) {
observer.error("error");
} else {
observer.next(false);
}
observer.plete();
return function () {
};
});
function isComplete(data) {
return data;
}
var delay = 1000;
Rx.Observable.interval(delay)
.switchMap(() => {
return server
.do((data) => {
console.log('Server returned ' + data);
}, () => {
console.log('Server threw');
})
.retry(3);
})
.first((data) => isComplete(data))
.subscribe(() => {
console.log('Got pleted value');
}, () => {
console.log('Got error');
});
<script src="https://cdnjs.cloudflare./ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
It's an old question, but I also had to poll an endpoint and arrived at this question. Here's my own doWhile
operator I ended up creating:
import { pipe, from } from 'rxjs';
import { switchMap, takeWhile, filter, map } from 'rxjs/operators';
export function doWhile<T>(shouldContinue: (a: T) => boolean) {
return pipe(
switchMap((data: T) => from([
{ data, continue: true },
{ data, continue: shouldContinue(data), exclude: true }
])),
takeWhile(message => message.continue),
filter(message => !message.exclude),
map(message => message.data)
);
}
It's a little weird, but it works for me so far. You could use it with the take
like you were trying.
i was googling to find a do while behavior, i found this question. and then i found out that doWhile takes in a second param inclusive
boolean. so maybe you can do?:
takeWhile((data) => !isComplete(data), true)