I have an array of objects. For each object I need to trigger an asynchronous request (HTTP call), but I only want a certain maximum number of requests running at the same time. Also, it would be nice (but not necessary) to have a single synchronization point after all requests have finished, where I can execute some code.
I've tried suggestions from:
Limit number of requests at a time with RxJS
How to limit the concurrency of flatMap?
Fire async request in parallel but get result in order using rxjs
and many more... I even tried making my own operators.
Either the answers on those pages are too old to work with my code or I can't figure out how to put everything together so all types fit nicely.
This is what I have so far:
for (const obj of objects) {
  this.myService.updateObject(obj).subscribe(value => {
    this.anotherService.set(obj);
  });
}
EDIT 1: Ok, I think we're getting there! With the answers of Julius and pschild (both seem to work equally) I managed to limit the number of requests. But now it will only fire the first batch of 4 and never fire the rest. So now I have:
const concurrentRequests = 4;
from(objects)
  .pipe(
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(result => this.anotherService.set(result))
  ).subscribe();
Am I doing something wrong with the subscribe()?
Btw: The mergeMap with resultSelector parameter is deprecated, so I used mergeMap without it.
Also, the obj of the mergeMap is not visible in the tap, so I had to use tap's parameter.
EDIT 2:
Make sure your observables complete! (It cost me a whole day)
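A minimal sketch of why completion matters here: with a concurrency limit, mergeMap only subscribes to the next inner observable once a running one completes, so inner observables that emit but never complete block every later item. The function `updateWithoutComplete` is a hypothetical stand-in for a service call with that behavior, and `take(1)` is just one possible way to force completion after the first value.

```typescript
import { from, Observable } from 'rxjs';
import { mergeMap, take, toArray } from 'rxjs/operators';

// Hypothetical stand-in for a service call that emits one value
// but never calls complete().
function updateWithoutComplete(id: number): Observable<number> {
  return new Observable<number>(subscriber => {
    const handle = setTimeout(() => subscriber.next(id), 10);
    return () => clearTimeout(handle); // teardown on unsubscribe
  });
}

const ids = [1, 2, 3, 4, 5, 6];
const concurrentRequests = 2;

// Variant 1: the inner observables never complete, so only the first
// `concurrentRequests` items are ever subscribed to.
const stalled: number[] = [];
const stalledSub = from(ids)
  .pipe(mergeMap(id => updateWithoutComplete(id), concurrentRequests))
  .subscribe(id => stalled.push(id));

// Variant 2: take(1) completes each inner observable after its first
// value, which frees a slot for the next item.
const completed: Promise<number[]> = new Promise(resolve => {
  from(ids)
    .pipe(
      mergeMap(id => updateWithoutComplete(id).pipe(take(1)), concurrentRequests),
      toArray() // emits once, when the whole sequence has completed
    )
    .subscribe(all => resolve(all));
});
```

If the service wraps something like a Subject, completing it at the source is usually cleaner than adding `take(1)` at every call site.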
Asked May 18, 2019 at 8:14 by Genome Prime; edited May 19, 2019 at 11:18.
- Yes, you're right with the deprecation :-) I updated my answer accordingly. Could you have a look at my example at Stackblitz? I cannot reproduce the errors you got... maybe you could also create an example showing the errors? – pschild Commented May 18, 2019 at 21:35
- I cannot reproduce the problem in Stackblitz. I don't think it has anything to do with angular/electron/nodejs... at least I hope. Anyway here is a Stackblitz which is more similar to my code: https://stackblitz.com/edit/rxjs-dawwsl?file=index.ts I swear I checked the code character for character, but still.. The only thing I can say is that my code doesn't hit the finalize method. But the first 4 requests all go through the pipe nicely. – Genome Prime Commented May 18, 2019 at 22:29
- Oh and also, there are no errors on the console. How can I find out where and why it's stuck? I've tried with the catchError operator - no luck. – Genome Prime Commented May 18, 2019 at 22:36
- I think I'm losing my mind.. I copied the stackblitz code into my app and it works... so I can rule out my stack electron/angular etc. I've created another stackblitz which comes even closer to my code: https://stackblitz.com/edit/angular-7-master-yf1cik And I've tried the error function inside the subscribe method... nothing – Genome Prime Commented May 19, 2019 at 10:19
- Nice! Glad I could help! – pschild Commented May 19, 2019 at 11:23
3 Answers
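You can use the concurrent parameter of mergeMap (the third parameter in the deprecated signature with resultSelector, the second one when resultSelector is omitted) to limit the number of concurrent inner subscriptions. Use finalize to execute something after all requests have finished: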
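import { from } from 'rxjs';
import { finalize, mergeMap, tap } from 'rxjs/operators';

const concurrentRequests = 5;
from(objects)
  .pipe(
    // at most `concurrentRequests` inner requests are subscribed at once
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(res => this.anotherService.set(res)),
    finalize(() => console.log('Sequence complete'))
  )
  .subscribe(); // nothing runs until the pipeline is subscribed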
See the example on Stackblitz.
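For readers without access to the Stackblitz, here is a minimal standalone sketch of the same idea. It assumes nothing about the real services: `fakeUpdate` is a hypothetical stand-in for `this.myService.updateObject`, and the `inFlight` and `peak` counters exist only to make the concurrency limit observable.

```typescript
import { from, Observable } from 'rxjs';
import { finalize, mergeMap, tap } from 'rxjs/operators';

let inFlight = 0; // requests currently running
let peak = 0;     // highest number of simultaneous requests seen
const results: number[] = [];

// Hypothetical stand-in for an HTTP call: emits once after 5 ms, then completes.
function fakeUpdate(id: number): Observable<number> {
  return new Observable<number>(subscriber => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    const handle = setTimeout(() => {
      inFlight--;
      subscriber.next(id * 10);
      subscriber.complete(); // completion frees a slot in mergeMap
    }, 5);
    return () => clearTimeout(handle);
  });
}

const concurrentRequests = 3;

const done: Promise<void> = new Promise(resolve => {
  from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .pipe(
      mergeMap(id => fakeUpdate(id), concurrentRequests),
      tap(res => results.push(res)),
      // single synchronization point: runs once the sequence terminates
      finalize(() => resolve())
    )
    .subscribe();
});

done.then(() => console.log(`handled ${results.length} results, peak concurrency ${peak}`));
```

Note that finalize also runs when the subscription is unsubscribed or the stream errors, so code that must run only on success may fit better in the complete callback of subscribe.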
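from(objects).pipe(
  bufferCount(10), // group the objects into batches of 10
  concatMap(objs => forkJoin(objs.map(obj =>
    this.myService.updateObject(obj).pipe(
      tap(value => this.anotherService.set(obj))
    )
  ))), // the next batch starts only after the current one has completed
  finalize(() => console.log('all requests are done'))
).subscribe();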
The code is not tested, but you get the idea. Let me know if you hit any errors or need an explanation.
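A small runnable sketch of this batching approach, under the assumption that each request emits once and completes (forkJoin emits nothing for a batch otherwise). `fakeUpdate` is a hypothetical placeholder for the real service call, and the batch size of 3 is arbitrary.

```typescript
import { forkJoin, from, Observable, timer } from 'rxjs';
import { bufferCount, concatMap, map, toArray } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP call: emits the id after 5 ms, then completes.
function fakeUpdate(id: number): Observable<number> {
  return timer(5).pipe(map(() => id));
}

const batchSize = 3;

const batches: Promise<number[][]> = new Promise(resolve => {
  from([1, 2, 3, 4, 5, 6, 7])
    .pipe(
      bufferCount(batchSize), // [1,2,3], [4,5,6], [7]
      // run one batch in parallel, wait for all of it, then start the next
      concatMap(batch => forkJoin(batch.map(id => fakeUpdate(id)))),
      toArray() // collect the per-batch results when everything is done
    )
    .subscribe(all => resolve(all));
});

batches.then(all => console.log(JSON.stringify(all)));
```

Compared with mergeMap and a concurrency limit, this keeps results grouped and in order, but every batch waits for its slowest request before the next batch starts, so fewer requests are in flight on average.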
I had the same issue once, when I tried to load multiple images from a server. I had to send the HTTP requests one after another. I achieved the desired outcome using an awaited promise. Here is the sample code:
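async ngOnInit() {
  for (const number of this.numbers) {
    // Wait for each request to finish before starting the next one.
    await new Promise<void>((resolve, reject) => {
      this.http.get(`https://jsonplaceholder.typicode.com/todos/${number}`).subscribe(
        data => {
          this.responses.push(data);
          console.log(data);
          resolve();
        },
        err => reject(err) // otherwise a failed request leaves the loop waiting forever
      );
    });
  }
}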
The main idea here is to resolve the promise once you get the response. With this technique you can come up with custom logic to execute a method once all the requests have finished.
Here is the stackblitz. Open up the console to see it in action. :)
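The same pattern, reduced to plain TypeScript without Angular, might look like the sketch below. `fakeGet` is a hypothetical replacement for the HTTP call; the point is only that code placed after the loop runs once every request has finished.

```typescript
// Hypothetical stand-in for an HTTP GET that resolves after a short delay.
function fakeGet(id: number): Promise<string> {
  return new Promise<string>(resolve => {
    setTimeout(() => resolve(`todo ${id}`), 5);
  });
}

async function loadSequentially(ids: number[]): Promise<string[]> {
  const responses: string[] = [];
  for (const id of ids) {
    // Each request starts only after the previous one has resolved.
    responses.push(await fakeGet(id));
  }
  // Synchronization point: reached only after all requests have finished.
  return responses;
}

const loaded: Promise<string[]> = loadSequentially([1, 2, 3]);
loaded.then(responses => console.log(responses));
```

This runs strictly one request at a time, so it corresponds to a concurrency limit of 1 rather than a configurable maximum.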