I played around with Angular 2 and got stuck after a while.
Using http.get works fine for a single request, but I want to poll live data every 4 seconds. After tinkering for quite a while and reading a lot of ReactiveX material, I ended up with:
Observable.timer(0, 4000)
  .flatMap(
    () => this._http.get(this._url)
      .share()
      .map(this.extractData)
      .catch(this.handleError)
  )
  .share();
Is there a simple way to start a (4-second) interval only after the http.get observable has emitted the result of the request? (Or will I end up in observable-hell?)
Timeline I want:
Time(s): 0 - - - - - 1 - - - - - 2 - - - - - 3 - - - - - 4 - - - - - 5 - - - - - 6
Action: Request - - Response - - - - - - - - - - - - - - - - - - - -Request-...
Wait: | wait for 4 seconds -------------------------> |
asked Jun 21, 2016 at 8:02 by SpazzMarticus; edited Jan 24, 2019 at 18:06 by frido
4 Answers
Update to RxJS 6
import { timer } from 'rxjs';
import { concatMap, map, expand, catchError } from 'rxjs/operators';

const pollData$ = this._http.get(this._url)
  .pipe(
    map(this.extractData),
    catchError(this.handleError)
  );

pollData$.pipe(
  expand(_ => timer(4000).pipe(concatMap(_ => pollData$)))
).subscribe();
I'm using RxJS 5 and I'm not sure what the RxJS 4 equivalent operators are. Anyway here is my RxJS 5 solution, hope it helps:
var pollData = this._http.get(this._url)
  .map(this.extractData)
  .catch(this.handleError);

pollData.expand(
  () => Observable.timer(4000).concatMap(() => pollData)
).subscribe();
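The expand operator emits each value and passes it to the projection function, which starts a new Observable for every emission. Here, each response starts a 4-second timer that then triggers the next request, so the wait begins only after the previous response has arrived.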
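To see how this differs from the timer(0, 4000) version in the question, here is a small virtual-time sketch in plain TypeScript (no RxJS involved). It only computes when each request would start under the two strategies. The function names and the fixed 1000 ms latency are assumptions made up for the illustration, and it assumes every request emits exactly one response.

```typescript
// Virtual-time sketch: all values are milliseconds, nothing is actually scheduled.
// `latency` is a hypothetical, constant response time for every request.

/** Fixed grid, as with Observable.timer(0, period): latency does not matter. */
function fixedTimerStarts(period: number, count: number): number[] {
  const starts: number[] = [];
  for (let i = 0; i < count; i++) {
    starts.push(i * period);
  }
  return starts;
}

/** Chained, as with expand + timer(delay): wait `delay` after each response. */
function afterResponseStarts(latency: number, delay: number, count: number): number[] {
  const starts: number[] = [];
  let now = 0;
  for (let i = 0; i < count; i++) {
    starts.push(now);
    now += latency + delay; // the response arrives after `latency`, then the wait begins
  }
  return starts;
}

const fixedStarts = fixedTimerStarts(4000, 3);               // [0, 4000, 8000]
const chainedStarts = afterResponseStarts(1000, 4000, 3);    // [0, 5000, 10000]
```

With the fixed timer a slow backend can end up with overlapping requests, while the chained version always leaves the full 4 seconds between a response and the next request.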
I managed to do it myself, with the only downside being that http.get can't be repeated more easily.
pollData(): Observable<any> {
  // Create a subject that will emit every response
  var pollSubject = new Subject<any>();
  // Subscribe to a new http.get observable and forward its result to pollSubject (see _pollLiveData() below)
  var subscribeToNewRequestObservable = () => {
    this._pollLiveData()
      .subscribe(
        (res) => { pollSubject.next(res) }
      );
  };
  // Each emission of the subject triggers a new request after a delay of 4000 ms
  pollSubject.delay(4000).subscribe(subscribeToNewRequestObservable);
  // Execute the first request
  subscribeToNewRequestObservable();
  // Return the subject as a plain observable
  return pollSubject.asObservable();
}
private _pollLiveData() {
  var url = 'http://localhost:4711/poll/';
  return this._http.get(url)
    .map(
      (res) => { return res.json(); }
    );
}
Here is why you can't use the more straightforward subscription:
var subscribeToNewRequestObservable = () => {
  this._pollLiveData()
    .subscribe(pollSubject);
};
The completion of the http.get observable would also complete your subject and prevent it from emitting further items.
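The rule behind this can be shown with a toy model. The sketch below is plain TypeScript, not RxJS: ToySubject and oneShot are hypothetical stand-ins that only mimic the "no more values after complete" contract, with oneShot playing the role of an http.get that emits once and then completes.

```typescript
// Toy stand-in for a Subject. NOT the RxJS class, just the same
// "closed after complete" rule in a few lines.
class ToySubject<T> {
  private closed = false;
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    if (this.closed) return; // values after completion are dropped
    this.listeners.forEach(listener => listener(value));
  }
  complete(): void {
    this.closed = true;
  }
}

// A one-shot source such as an HTTP request: one value, then completion.
function oneShot<T>(value: T, observer: { next(v: T): void; complete(): void }): void {
  observer.next(value);
  observer.complete();
}

const received: string[] = [];

// Passing the subject itself also forwards completion, so the second response is lost.
const direct = new ToySubject<string>();
direct.subscribe(v => received.push("direct:" + v));
oneShot("first", direct);
oneShot("second", direct);

// Forwarding only `next` keeps the subject open for later requests.
const forwarded = new ToySubject<string>();
forwarded.subscribe(v => received.push("forwarded:" + v));
oneShot("first", { next: v => forwarded.next(v), complete: () => {} });
oneShot("second", { next: v => forwarded.next(v), complete: () => {} });
```

Here `received` holds one "direct" entry but two "forwarded" entries, which is the reason the answer's code passes a next-only callback instead of the subject itself.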
This is still a cold observable, so unless you subscribe to it no requests will be made.
this._pollService.pollData().subscribe(
  (res) => { this.count = res.count; }
);
A minor rework of the answer from Can Nguyen, in case you want polling delay to depend on previous request completion status.
var pollData = () => request() // make request
.do(handler, errorHandler) // handle response data or error
.ignoreElements() // ignore request progress notifications
.materialize(); // wrap error/complete notif-ns into Notification
pollData() // get our Observable<Notification>...
.expand( // ...and recursively map...
(n) => Rx.Observable // ...each Notification object...
.timer(n.error ? 1000 : 5000) // ...(with delay depending on previous completion status)...
.concatMap(() => pollData())) // ...to new Observable<Notification>
.subscribe();
Plunk.
Or alternatively:
var pollData = () => request() // make request
.last() // take last progress value
.catch(() => Rx.Observable.of(null)); // replace error with null-value
pollData()
.expand(
(data) => Rx.Observable
.timer(data ? 5000 : 1000) // delay depends on a value
.concatMap(() => pollData()))
.subscribe((d) => {console.log(d);}); // can subscribe to the value stream at the end
Plunk.
You can try using interval if that is more convenient. Calling subscribe gives you a Subscription that lets you cancel the polling after some time.
let source = Observable.interval(1000 * 4);
let subscription = source.subscribe(x => {
  this._http.get(this._url)
    .share()
    .map(this.extractData)
    .catch(this.handleError)
    .subscribe(); // http.get is cold: without a subscriber no request is sent
});
....
// if you no longer need to poll:
subscription.unsubscribe();
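Note that interval fires on a fixed grid, regardless of when the response arrives. If cancellation plus "wait only after the response" is needed, the idea can be sketched without RxJS as below. This is a minimal sketch under assumptions: startPolling, Schedule and Request are hypothetical names, and the scheduler is passed in so the demo can use a fake one and finish immediately.

```typescript
// Hypothetical helper types for this sketch; none of these names come from RxJS or Angular.
type Schedule = (task: () => void, delayMs: number) => void;
type Request<T> = (onResponse: (result: T) => void) => void;

/**
 * Runs request -> wait -> request ... and returns a function that stops the loop.
 * The wait is scheduled only once a response has arrived.
 */
function startPolling<T>(
  request: Request<T>,
  onResult: (result: T) => void,
  delayMs: number,
  schedule: Schedule // in an app, e.g. (task, ms) => { setTimeout(task, ms); }
): () => void {
  let stopped = false;
  const run = (): void => {
    if (stopped) return;
    request(result => {
      if (stopped) return; // drop responses that arrive after cancellation
      onResult(result);
      schedule(run, delayMs);
    });
  };
  run();
  return () => { stopped = true; };
}

// Demo with a fake scheduler: queued waits are released by hand.
const queue: Array<() => void> = [];
const fakeSchedule: Schedule = (task) => { queue.push(task); };
let requestCount = 0;
const fakeRequest: Request<number> = (onResponse) => onResponse(++requestCount);

const results: number[] = [];
const stop = startPolling(fakeRequest, r => results.push(r), 4000, fakeSchedule);
queue.shift()!(); // first wait elapses, second request runs
queue.shift()!(); // second wait elapses, third request runs
stop();
queue.shift()!(); // a wait that ends after stop() triggers nothing
```

The returned `stop` function plays the same role as subscription.unsubscribe() above.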
observable-hell? – null canvas, Jun 21, 2016 at 8:17