
javascript - Batch requests in Node.js - Stack Overflow


My program is communicating with a web service that only accepts ~10 requests per second. From time to time, my program sends 100+ concurrent requests to the web service, causing my program to crash.

How do I limit my program to 5 concurrent requests per second in Node.js? I'm using the request library.

 // IF EVENT AND SENDER
    if(data.sender[0].events && data.sender[0].events.length > 0) {


        // FIND ALL EVENTS
        for(var i = 0; i < data.sender[0].events.length; i++) {

            // IF TYPE IS "ADDED"
            if(data.sender[0].events[i].type == "added") {
                switch (data.sender[0].events[i].link.rel) {
                    case "contact" :
                        batch("added", data.sender[0].events[i].link.href);
                        //_initContacts(data.sender[0].events[i].link.href);
                        break;
                } 
            // IF TYPE IS "UPDATED"
            } else if(data.sender[0].events[i].type == "updated") {

                switch (data.sender[0].events[i].link.rel){                     
                    case "contactPresence" :
                        batch("updated", data.sender[0].events[i].link.href);
                        //_getContactPresence(data.sender[0].events[i].link.href);
                        break;
                    case "contactNote" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactNote(data.sender[0].events[i].link.href);
                        break;
                    case "contactLocation" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactLocation(data.sender[0].events[i].link.href);
                        break;
                    case "presenceSubscription" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _extendPresenceSubscription(data.sender[0].events[i].link.href);
                        break;
                }
            }
        }
    }

And then the homegrown batch method:

var updated = [];
var added = [];

var batch = function(type, url){
    console.log("batch called");


    if (type === "added"){
        console.log("Added batched");
        added.push(url);
        if (added.length > 5) {
            setTimeout(added.forEach(function(req){
                _initContacts(req);
            }), 2000);
            added = [];
        }
    } 
    else if (type === "updated"){
        console.log("Updated batched");
        updated.push(url);
        console.log("Updated length is : ", updated.length);
        if (updated.length > 5){
            console.log("Over 5 updated events");
            updated.forEach(function(req){
                setTimeout(_getContactLocation(req), 2000);
            });
            updated = [];
        }
    }       
};

And an example of the actual request:

var _getContactLocation = function(url){
    r.get(baseUrl + url, 
    { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, 
        function(err, res, body){
            if(err)
                console.log(err);
            else {
                var data = JSON.parse(body);
                self.emit("data.contact", data);
            }
        }
    );
};

asked Aug 19, 2013 at 17:40 by mupersan82 (edited Aug 19, 2013 at 18:10)
  • Would you consider using the async library? – George Commented Aug 19, 2013 at 17:47
  • We'd need some of your code posted to provide a truly helpful answer. You might try: setTimeout(makeARequest, 200); – dc5 Commented Aug 19, 2013 at 17:48
  • Will this not just cause all requests to be sent simultaneously 200 ms later? – mupersan82 Commented Aug 19, 2013 at 18:03
  • Async looks interesting. Will look into it. – mupersan82 Commented Aug 19, 2013 at 18:03
  • That's why the pseudocode was makeARequest :) @GeorgeP's updated answer below is a good start. – dc5 Commented Aug 19, 2013 at 19:17

2 Answers

Using the async library, the mapLimit function does exactly what you want. I can't provide an example for your specific use case as you did not provide any code.

From the readme:


mapLimit(arr, limit, iterator, callback)

The same as map only no more than "limit" iterators will be simultaneously running at any time.

Note that the items are not processed in batches, so there is no guarantee that the first "limit" iterator functions will complete before any others are started.

Arguments

  • arr - An array to iterate over.
  • limit - The maximum number of iterators to run at any time.
  • iterator(item, callback) - A function to apply to each item in the array. The iterator is passed a callback(err, transformed) which must be called once it has completed with an error (which can be null) and a transformed item.
  • callback(err, results) - A callback which is called after all the iterator functions have finished, or an error has occurred. Results is an array of the transformed items from the original array.

Example

async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){
    // results is now an array of stats for each file
});
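
For instance, here is a minimal, untested sketch of applying mapLimit to a list of hrefs with a concurrency cap of 5. It assumes the r (request module), baseUrl, and accessToken variables from the question, and a hypothetical hrefs array of link.href values:

// Hedged sketch: cap in-flight requests at 5 using async.mapLimit.
// Assumes r, baseUrl, and accessToken exist as in the question's code;
// hrefs is a hypothetical array of link.href values.
var async = require("async");

function fetchAll(hrefs) {
    async.mapLimit(hrefs, 5, function (href, callback) {
        r.get(baseUrl + href,
            { "strictSSL": false, "headers": { "Authorization": "Bearer " + accessToken } },
            function (err, res, body) {
                if (err) return callback(err);
                var parsed;
                try {
                    parsed = JSON.parse(body);
                } catch (e) {
                    return callback(e);
                }
                callback(null, parsed);
            });
    }, function (err, results) {
        if (err) return console.log(err);
        // results holds one parsed response per href, in the original order
        console.log("fetched", results.length, "resources");
    });
}

Note that mapLimit caps how many requests run at the same time rather than enforcing a requests-per-second rate, which matches the readme description above.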


EDIT: Now that you provided code, I see that your use is a bit different from what I assumed. The async library is more useful when you know all the tasks to run up front. I don't know of a library offhand that will easily solve this for you. The above note is likely still relevant to people searching this topic so I'll leave it in.

Sorry, I don't have time to restructure your code, but this is an (untested) example of a function that makes asynchronous requests while throttling itself to at most 5 concurrent requests. I would highly recommend working off of this to come up with a more general solution that fits your code base.

var throttledRequest = (function () {
    // queue holds URLs waiting to be requested; running counts requests in flight
    var queue = [], running = 0;

    // Start queued requests until the queue is empty or 5 are already in flight
    function sendPossibleRequests() {
        var url;
        while (queue.length > 0 && running < 5) {
            url = queue.shift();
            running++;
            r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
                running--;
                sendPossibleRequests();

                if(err)
                    console.log(err);
                else {
                    var data = JSON.parse(body);
                    self.emit("data.contact", data);
                }
            });
        }
    }

    return function (url) {
        queue.push(url);
        sendPossibleRequests();
    };
})();

Basically, you keep a queue of all the data to be processed asynchronously (here, the URLs to be requested), and then after each request's callback fires you try to launch as many of the remaining requests as the concurrency limit allows.
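
For example, a hypothetical usage (the /contacts/ paths below are made up, not from the question): queueing 20 URLs at once would still keep at most 5 requests in flight.

// Hypothetical usage: queue 20 URLs at once; only 5 requests will be in
// flight at any moment, the rest wait in the internal queue until a slot frees up.
for (var i = 0; i < 20; i++) {
    throttledRequest("/contacts/" + i); // illustrative path, not from the question
}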

This is precisely what Node's Agent class is designed to address. Have you done something silly like require('http').globalAgent.maxSockets = Number.MAX_VALUE or passed agent: false as a request option?

With Node's default behavior, your program will not send more than 5 concurrent requests at a time. Additionally, the Agent provides optimizations that a simple queue cannot (namely HTTP keepalives).

If you try to make many requests (for example, issue 100 requests from a loop), the first 5 will begin and the Agent will queue the remaining 95. As requests complete, it starts the next.

What you probably want to do is create an Agent for your web service requests, and pass it in to every call to request (rather than mixing requests in with the global agent).

var http = require('http'), svcAgent = new http.Agent();

request({ ... , agent: svcAgent });
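
As a rough, untested sketch of that idea (the /some/href path is made up; baseUrl and accessToken come from the question's code, and maxSockets is set explicitly because newer Node versions default it to Infinity rather than 5):

// Hedged sketch: a dedicated agent whose maxSockets caps open connections at 5.
var http = require("http");
var request = require("request");

var svcAgent = new http.Agent();
svcAgent.maxSockets = 5; // Node's historical default; set explicitly for clarity

request.get(baseUrl + "/some/href", {   // "/some/href" is an illustrative path
    "agent": svcAgent,
    "strictSSL": false,
    "headers": { "Authorization": "Bearer " + accessToken }
}, function (err, res, body) {
    if (err) return console.log(err);
    console.log("status:", res.statusCode);
});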