How do I write a method that limits Q promise concurrency?
For instance, I have a method spawnProcess. It returns a Q promise.
I want no more than 5 processes spawned at a time, but transparently to the calling code.
What I need to implement is a function with signature
function limitConcurrency(promiseFactory, limit)
that I can call like
spawnProcess = limitConcurrency(spawnProcess, 5);
// use spawnProcess as usual
I already started working on my version, but I wonder if anyone has a concise implementation that I can check against.
asked Dec 12, 2013 at 14:38 by Dan Abramov
- Are you writing code for the browser, or for Node? If it's the former, there is no concurrency... – Matt Ball Commented Dec 12, 2013 at 14:40
- @Matt: I'm writing for Node. I don't mean concurrency as in threading, I mean concurrency as in “promises running at the same time”. – Dan Abramov Commented Dec 12, 2013 at 14:41
- What did you try? Using a waiting queue and buffering requests shouldn't be too hard. – schlingel Commented Dec 12, 2013 at 14:43
- @schlingel: I'm not saying it's hard. As soon as I'm ready, I'll post the complete code. I'm just trying to figure out how to properly chain promises so the next one starts as soon as the previous one has finished. – Dan Abramov Commented Dec 12, 2013 at 14:46
- If the request can be executed instantly, you can return the deferred that is resolved by the process. If you have to queue it, you will have to use two separate deferreds: one whose handler decreases the counter and additionally resolves the second deferred, and a second one that is returned by the spawnProcess function. – schlingel Commented Dec 12, 2013 at 14:50
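The counter-plus-queue idea from the comments can be sketched roughly as follows. This is only an illustration, not anyone's posted solution: it uses the native Promise constructor instead of Q (an assumption, so that it runs without dependencies), and demoStarted, demoJob and demoResults are hypothetical names used for the demonstration.

```javascript
// Sketch only: native Promise is used here instead of Q.
function limitConcurrency(promiseFactory, limit) {
  var running = 0; // jobs currently in flight
  var queue = [];  // jobs waiting for a free slot

  // Start queued jobs while free slots remain.
  function next() {
    while (running < limit && queue.length > 0) {
      var job = queue.shift();
      running++;
      job();
    }
  }

  return function () {
    var self = this;
    var args = arguments;
    // The outer promise is what the caller sees, whether or not the job had to wait.
    return new Promise(function (resolve, reject) {
      queue.push(function () {
        var result;
        try {
          result = Promise.resolve(promiseFactory.apply(self, args));
        } catch (err) {
          result = Promise.reject(err);
        }
        // Free the slot, let the next job start, then settle the caller's promise.
        result.then(
          function (value) { running--; next(); resolve(value); },
          function (error) { running--; next(); reject(error); }
        );
      });
      next();
    });
  };
}

// Hypothetical demo: five jobs, at most two in flight at a time.
var demoStarted = 0;
var demoJob = limitConcurrency(function (n) {
  demoStarted++;
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(n); }, 10);
  });
}, 2);
var demoResults = Promise.all([1, 2, 3, 4, 5].map(function (n) {
  return demoJob(n);
}));
```

Because the check and the increment happen inside the same synchronous loop, no other job can claim a slot between them. To hand Q promises back to callers, the returned value could be wrapped with Q(...).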
4 Answers
I have a library that does this for you: https://github.com/ForbesLindesay/throat
You can use it via browserify or download the standalone build from brcdn (https://www.brcdn.org/?module=throat&version=latest) and add it as a script tag.
Then (assuming the Promise constructor is polyfilled or implemented in your environment) you can do:
// remove this line if using the standalone build
var throat = require('throat');

function limitConcurrency(promiseFactory, limit) {
  var fn = throat(promiseFactory, limit);
  return function () {
    // wrap the result so callers get a Q promise
    return Q(fn.apply(this, arguments));
  };
}
You could just call throat(promiseFactory, limit) directly, but that would return a promise from the promise library rather than a Q promise.
I also really like using it with array.map.
// only allow 3 parallel downloads
var downloadedItems = Q.all(items.map(throat(download, 3)));
This seems to be working for me.
I'm not sure if I could simplify it. The recursion in scheduleNextJob is necessary so that the running < limit check and running++ always execute in the same tick.
Also available as a gist.
'use strict';
var Q = require('q');
/**
* Constructs a function that proxies to promiseFactory
* limiting the count of promises that can run simultaneously.
* @param promiseFactory function that returns promises.
* @param limit how many promises are allowed to be running at the same time.
* @returns function that returns a promise that eventually proxies to promiseFactory.
*/
function limitConcurrency(promiseFactory, limit) {
  var running = 0,
      semaphore;

  // Resolves once a slot is free; the slot is claimed in the same tick as the check.
  function scheduleNextJob() {
    if (running < limit) {
      running++;
      return Q();
    }

    if (!semaphore) {
      semaphore = Q.defer();
    }

    // Re-check after waking up: several waiters may share one semaphore.
    return semaphore.promise
      .finally(scheduleNextJob);
  }

  // Releases a slot and wakes up any waiting jobs.
  function processScheduledJobs() {
    running--;

    if (semaphore && running < limit) {
      semaphore.resolve();
      semaphore = null;
    }
  }

  return function () {
    var self = this,
        args = arguments;

    function runJob() {
      // Use the captured `this`; inside runJob, `this` is not the caller's.
      return promiseFactory.apply(self, args);
    }

    return scheduleNextJob()
      .then(runJob)
      .finally(processScheduledJobs);
  };
}

module.exports = {
  limitConcurrency: limitConcurrency
};
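One way to check a limiter like this is to count how many jobs are in flight at once. The helper below is a hypothetical sketch, not part of the gist: measurePeakConcurrency and noLimit are made-up names, and it uses native promises and timers so that it runs on its own.

```javascript
// Hypothetical helper: runs jobCount fake jobs through wrap(factory, limit)
// and resolves with the highest number of jobs observed in flight at once.
function measurePeakConcurrency(wrap, limit, jobCount) {
  var inFlight = 0;
  var peak = 0;

  // A fake job that stays "running" for about 10 ms.
  function fakeJob() {
    inFlight++;
    peak = Math.max(peak, inFlight);
    return new Promise(function (resolve) {
      setTimeout(function () {
        inFlight--;
        resolve();
      }, 10);
    });
  }

  var limited = wrap(fakeJob, limit);
  var pending = [];
  for (var i = 0; i < jobCount; i++) {
    pending.push(limited());
  }
  return Promise.all(pending).then(function () {
    return peak;
  });
}

// Pass-through wrapper that applies no limit at all. It is here only so the
// sketch runs without dependencies; its peak equals the number of jobs.
function noLimit(factory) {
  return factory;
}

var unlimitedPeak = measurePeakConcurrency(noLimit, 2, 4);
```

Passing limitConcurrency instead of noLimit, as in measurePeakConcurrency(limitConcurrency, 2, 4), should resolve with a value no greater than 2 if the limiter behaves as intended.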
The Deferred promise implementation has a gate function that works exactly that way:
spawnProcess = deferred.gate(spawnProcess, 5);
I wrote a little library to do this: https://github.com/suprememoocow/qlimit
It's extremely easy to use and is specifically designed to work with Q promises:
var qlimit = require('qlimit');
var limit = qlimit(2); // 2 being the maximum concurrency
// Using the same example as above
return Q.all(items.map(limit(function(item, index, collection) {
  return performOperationOnItem(item);
})));
It can also be used to limit concurrency to a specific resource, like this:
var qlimit = require('qlimit');
var limit = qlimit(2); // 2 being the maximum concurrency
var fetchSomethingFromEasilyOverwhelmedBackendServer = limit(function(id) {
  // Emulating the backend service
  return Q.delay(1000)
    .thenResolve({ hello: 'world' });
});