
javascript - Limit concurrency of pending promises - Stack Overflow


I'm looking for a promise function wrapper that can limit / throttle calls to a given promise-returning function so that only a set number of those calls are running at any given time.

In the case below, delayPromise should never run concurrently; the calls should all run one at a time, in first-come-first-served order.

import Promise from 'bluebird'

function _delayPromise (seconds, str) {
  console.log(str)
  return Promise.delay(seconds)
}

let delayPromise = limitConcurrency(_delayPromise, 1)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(100, "a:b")
  await delayPromise(100, "a:c")
}

async function b() {
  await delayPromise(100, "b:a")
  await delayPromise(100, "b:b")
  await delayPromise(100, "b:c")
}

a().then(() => console.log('done'))

b().then(() => console.log('done'))

Any ideas on how to get a queue like this set up?

I have a "debounce" function from the wonderful Benjamin Gruenbaum. I need to modify this to throttle a promise based on its own execution and not the delay.

export function promiseDebounce (fn, delay, count) {
  let working = 0
  let queue = []
  function work () {
    if ((queue.length === 0) || (working === count)) return
    working++
    Promise.delay(delay).tap(function () { working-- }).then(work)
    var next = queue.shift()
    next[2](fn.apply(next[0], next[1]))
  }
  return function debounced () {
    var args = arguments
    return new Promise(function (resolve) {
      queue.push([this, args, resolve])
      if (working < count) work()
    }.bind(this))
  }
}

Asked Aug 4, 2016 at 23:05 by ThomasReggi; edited Feb 12, 2023 at 10:08 by trincot.
  • async.js and queue.js both support configurable concurrency. – nrabinowitz Commented Aug 4, 2016 at 23:07
  • For arrays or managing the state of a given function and its instances? – ThomasReggi Commented Aug 4, 2016 at 23:08
  • "when a given promise is running so that only a set number of that promise is running at a given time" - the code has 6 promises, each will run exactly once - concurrently, but a given promise is run exactly once - the question is poorly worded at best – Jaromanda X Commented Aug 4, 2016 at 23:38
  • Just to be clear here. Promises don't "run". A promise is a proxy for the result of an async operation that is already running. – jfriend00 Commented Aug 5, 2016 at 0:52
  • es6-promise-pool? promise-limit? cwait? Not relevant, or not known? – Lee Goddard Commented Nov 7, 2016 at 7:46

6 Answers

I don't think there are any libraries to do this, but it's actually quite simple to implement yourself:

function sequential(fn) { // limitConcurrency(fn, 1)
    let q = Promise.resolve();
    return function(x) {
        const p = q.then(() => fn(x));
        q = p.reflect();
        return p;
    };
}
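
Note that .reflect() is a Bluebird method; native promises do not have it. A minimal sketch of the same chaining idea with native promises might look like the following. It assumes fn returns a promise, and the name sequentialNative is made up for this example:

```javascript
// Hypothetical native-promise variant of sequential(); not part of the original answer.
function sequentialNative(fn) {
  let tail = Promise.resolve(); // settles once the most recent call has finished
  return function (...args) {
    // Start fn only after every earlier call has settled.
    const p = tail.then(() => fn.apply(this, args));
    // Keep the chain alive even if p rejects; the caller still sees the rejection via p.
    tail = p.catch(() => {});
    return p;
  };
}
```

Swallowing the rejection on tail plays the role that .reflect() plays in the Bluebird version: a failed call should not block the calls queued behind it.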

For multiple concurrent requests it gets a little trickier, but can be done as well.

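function limitConcurrency(fn, n) {
    if (n == 1) return sequential(fn); // optimisation
    let q = Promise.resolve();
    const active = new Set();
    const fst = t => t[0];
    const snd = t => t[1];
    return function(x) {
        // Starts fn and returns [promise for "a slot became free", result promise].
        // fn must return a Bluebird promise, because .reflect() is used below.
        function put() {
            const p = fn(x);
            const a = p.reflect().then(() => {
                active.delete(a);
            });
            active.add(a);
            return [Promise.race(active), p];
        }
        if (active.size < n) {
            // A slot is free: start immediately.
            const r = put();
            q = fst(r);
            return snd(r);
        } else {
            // All slots are busy: start once the queue promise settles.
            const r = q.then(put);
            q = r.then(fst);
            return r.then(snd);
        }
    };
}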
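
The same wrapper can also be sketched with an explicit FIFO queue and a counter, using only native promises. This is an alternative take, not the code above rewritten; it assumes fn returns a promise (or throws), and the name limitConcurrencyFifo is made up here:

```javascript
// Hypothetical helper: a queue-based limiter built on native promises.
function limitConcurrencyFifo(fn, limit) {
  let running = 0;    // number of calls currently in flight
  const waiting = []; // calls that have not started yet, oldest first

  function next() {
    if (running >= limit || waiting.length === 0) return;
    running++;
    const { self, args, resolve, reject } = waiting.shift();
    // Promise.resolve().then(...) turns a synchronous throw in fn into a rejection.
    Promise.resolve()
      .then(() => fn.apply(self, args))
      .then(resolve, reject)
      .finally(() => {
        running--;
        next(); // a slot is free again, so start the oldest waiting call
      });
  }

  return function limited(...args) {
    return new Promise((resolve, reject) => {
      waiting.push({ self: this, args, resolve, reject });
      next();
    });
  };
}
```

With limit set to 1 this behaves like sequential(): calls start strictly in the order they were made, which is the first-come-first-served behaviour the question asks for.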

By the way, you might want to have a look at the actor model and CSP. They can simplify dealing with such things, and there are a few JS libraries for them out there as well.

Example

import Promise from 'bluebird'

function sequential(fn) {
  var q = Promise.resolve();
  return (...args) => {
    const p = q.then(() => fn(...args))
    q = p.reflect()
    return p
  }
}

async function _delayPromise (seconds, str) {
  console.log(`${str} started`)
  await Promise.delay(seconds)
  console.log(`${str} ended`)
  return str
}

let delayPromise = sequential(_delayPromise)

async function a() {
  await delayPromise(100, "a:a")
  await delayPromise(200, "a:b")
  await delayPromise(300, "a:c")
}

async function b() {
  await delayPromise(400, "b:a")
  await delayPromise(500, "b:b")
  await delayPromise(600, "b:c")
}

a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done

Use the throttled-promise module:

https://www.npmjs.com/package/throttled-promise

var ThrottledPromise = require('throttled-promise'),
    promises = [
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... })
    ];

// Run promises, but only 2 parallel
ThrottledPromise.all(promises, 2)
.then( ... )
.catch( ... );

I had the same problem, so I wrote a library to implement it. Code is here. I created a queue to hold all the promises. When you push promises onto the queue, the first few at the head of the queue are popped and started. As soon as one of them is done, the next promise in the queue is popped and started, and so on until the queue is empty. You can check the code for details. I hope this library helps.
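
The queue behaviour described above can be sketched roughly as follows. This is not the library's code; the name runQueue and its signature are assumptions for illustration. The queue holds functions that return promises rather than promises themselves, because an operation is already running by the time its promise exists:

```javascript
// Hypothetical helper: drains an array of task functions with at most `limit` running.
// tasks: array of functions that each return a promise when called.
async function runQueue(tasks, limit) {
  const results = new Array(tasks.length);
  let nextIndex = 0; // index of the task currently at the head of the queue

  // Each worker pops the next task, waits for it, then pops another.
  async function worker() {
    while (nextIndex < tasks.length) {
      const i = nextIndex++;
      results[i] = await tasks[i]();
    }
  }

  // Start at most `limit` workers; together they drain the queue.
  const workers = Array.from({ length: Math.min(limit, tasks.length) }, worker);
  await Promise.all(workers);
  return results; // same order as the tasks array
}
```

In this sketch a rejected task rejects the promise returned by runQueue, while the other workers keep draining the queue; whether that is the right policy depends on the use case.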

Advantages

  • You can define the number of concurrent promises (near-simultaneous requests).
  • Consistent flow: as soon as one promise resolves, another request starts, so there is no need to guess the server's capacity.
  • Robust against data choke: if the server stalls for a moment, it just waits, and the next tasks do not start merely because a timer allowed it.
  • It does not rely on a third-party module; it is vanilla Node.js.

First, wrap https in a promise so we can use await to retrieve the data (removed from the example). Second, create a promise scheduler that submits another request as soon as any promise resolves. Third, make the calls.

Limiting requests by limiting the number of concurrent promises

const https = require('https')

// Promise wrapper around https.request; `body` must be a string (or null).
function httpRequest(method, path, body = null) {
  const reqOpt = {
    method: method,
    path: path,
    hostname: 'dbase.ez-mn.net',
    headers: {
      "Content-Type": "application/json",
      "Cache-Control": "no-cache"
    }
  }
  if (method == 'GET') reqOpt.path = path + '&max=20000'
  if (body) reqOpt.headers['Content-Length'] = Buffer.byteLength(body);
  return new Promise((resolve, reject) => {
    const clientRequest = https.request(reqOpt, incomingMessage => {
      let response = {
        statusCode: incomingMessage.statusCode,
        headers: incomingMessage.headers,
        body: []
      };
      let chunks = ""
      incomingMessage.on('data', chunk => { chunks += chunk; });
      incomingMessage.on('end', () => {
        if (chunks) {
          try {
            response.body = JSON.parse(chunks);
          } catch (error) {
            // Stop here so a parse failure is not followed by resolve()
            return reject(error)
          }
        }
        console.log(response)
        resolve(response);
      });
    });
    clientRequest.on('error', error => { reject(error); });
    if (body) { clientRequest.write(body) }
    clientRequest.end();
  });
}

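const asyncLimit = (fn, n) => {
  const pendingPromises = new Set();

  return async function(...args) {
    // Wait until fewer than n calls are in flight
    while (pendingPromises.size >= n) {
      await Promise.race(pendingPromises);
    }

    const p = fn.apply(this, args);
    const r = p.catch(() => {}); // errors are ignored for bookkeeping only
    pendingPromises.add(r);
    await r;
    pendingPromises.delete(r);
    return p; // the caller still sees the original result or rejection
  };
};

// httpRequest is the function whose number of concurrent requests we want to limit.
// In this case, at most 8 requests run at once, without blocking other tasks.
let ratedhttpRequest = asyncLimit(httpRequest, 8);

// This is our dataset and caller (named run to avoid shadowing Node's global process)
let run = async () => {
  const patchData = [
      {path: '/rest/slots/80973975078587', body:{score:3}},
      {path: '/rest/slots/809739750DFA95', body:{score:5}},
      {path: '/rest/slots/AE0973750DFA96', body:{score:5}}]

  // Start every request; the limiter decides when each one actually runs.
  // The body is serialized because httpRequest writes it to the socket as a string.
  const requests = patchData.map(({ path, body }) =>
    ratedhttpRequest('PATCH', path, JSON.stringify(body)))
  await Promise.all(requests) // without this, 'completed' would be logged immediately
  console.log('completed')
}

run().catch(console.error)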
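
To check the limiter without touching the network, one option is to swap httpRequest for a stand-in that only waits and counts. The sketch below repeats asyncLimit so that it runs on its own; fakeRequest, inFlight, peak and demo are hypothetical names introduced for this check:

```javascript
// asyncLimit repeated from the answer above so this sketch is self-contained.
const asyncLimit = (fn, n) => {
  const pendingPromises = new Set();
  return async function (...args) {
    while (pendingPromises.size >= n) {
      await Promise.race(pendingPromises);
    }
    const p = fn.apply(this, args);
    const r = p.catch(() => {});
    pendingPromises.add(r);
    await r;
    pendingPromises.delete(r);
    return p;
  };
};

// Hypothetical stand-in for httpRequest: resolves after a short delay and
// records the highest number of calls that were in flight at the same time.
let inFlight = 0;
let peak = 0;
const fakeRequest = async (id) => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 20));
  inFlight--;
  return id;
};

const limited = asyncLimit(fakeRequest, 3);
const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// Fire all ten calls at once and report the observed peak when they finish.
const demo = Promise.all(ids.map((id) => limited(id))).then((done) => {
  console.log('peak concurrency:', peak);
  return done;
});
```

If the limiter works as intended, the reported peak should not exceed the limit passed to asyncLimit.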

I wrote @watchable/nevermore to meet this requirement, as well as permitting combinations of concurrency with rate-limiting, timeout, backoff and so on.

You could achieve your requirement with a limiter function like so...

import { createExecutorStrategy } from "@watchable/nevermore";

const { createExecutor } = createExecutorStrategy({
  "concurrency": 1,
})

const _delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const delay = createExecutor(_delay);

However, you could also go further and have rate limits, retries and so on...

const { createExecutor } = createExecutorStrategy({
  concurrency: 1,
  intervalMs: 100,
  backoffMs: 1000,
  timeoutMs: 3000,
  retries: 3,
}); 

Full API documentation is at https://watchable.dev/api/modules/_watchable_nevermore.html and install from npm at https://www.npmjs.com/package/@watchable/nevermore

The classic way of running async processes in series is to use async.js and its async.series(). If you prefer promise-based code, there is a promise version of async.js: async-q

With async-q you can once again use series:

async.series([
    function(){return delayPromise(100, "a:a")},
    function(){return delayPromise(100, "a:b")},
    function(){return delayPromise(100, "a:c")}
])
.then(function(){
    console.log('done');
});

Running two of them at the same time will run a and b concurrently but within each they will be sequential:

// these two will run concurrently but each will run
// their array of functions sequentially:
async.series(a_array).then(()=>console.log('a done'));
async.series(b_array).then(()=>console.log('b done'));

If you want to run b after a then put it in the .then():

async.series(a_array)
.then(()=>{
    console.log('a done');
    return async.series(b_array);
})
.then(()=>{
    console.log('b done');
});
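
The snippets above use a_array and b_array without defining them. Presumably they are arrays of functions that each return a promise, like the array passed to async.series() earlier. A small sketch under that assumption, with a plain-promise series function standing in for the library call (this is not async-q's implementation, and delay is a made-up helper):

```javascript
// Hypothetical helper: resolves with `label` after `ms` milliseconds.
const delay = (ms, label) =>
  new Promise((resolve) => setTimeout(() => resolve(label), ms));

// Assumed shape of a_array: functions that start work only when called.
const a_array = [
  () => delay(10, 'a:a'),
  () => delay(10, 'a:b'),
  () => delay(10, 'a:c'),
];

// Plain-promise sketch of "run in series".
async function series(tasks) {
  const results = [];
  for (const task of tasks) {
    results.push(await task()); // the next task starts only after this one resolves
  }
  return results;
}

const seriesDemo = series(a_array).then((results) => {
  console.log('a done', results);
  return results;
});
```

The entries have to be functions rather than promises: calling delayPromise(...) directly while building the array would start all of the operations immediately.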

If instead of running each sequentially you want to limit each to run a set number of processes concurrently then you can use parallelLimit():

// Run two promises at a time:
async.parallelLimit(a_array,2)
.then(()=>console.log('done'));

Read up on the async-q docs: https://github.com/dbushong/async-q/blob/master/READJSME.md
