Is there a technique to break out of an async loop if it does not complete within an expected time period? I have code like this:
// Skeleton from the question: drain the async source, then continue with
// whatever was gathered.
(async () => {
    for await (const chunk of asynDataStreamOrGenerator) {
        // some data processing
    }
    // Follow-up code that has to run on whatever data the async source
    // delivered within the given time period goes here.
})();
If this loop has not completed within a given timespan, I want to break out of the loop and process the request further.
Is there a technique to break out of an async loop if it does not complete within an expected time period? I have code like this:
// Skeleton from the question: drain the async source, then continue with
// whatever was gathered.
(async () => {
    for await (const chunk of asynDataStreamOrGenerator) {
        // some data processing
    }
    // Follow-up code that has to run on whatever data the async source
    // delivered within the given time period goes here.
})();
If this loop has not completed within a given timespan, I want to break out of the loop and process the request further.
Share Improve this question edited Oct 12, 2021 at 13:05 Heretic Monkey 12.1k7 gold badges61 silver badges131 bronze badges asked Oct 12, 2021 at 13:01 Anurag VohraAnurag Vohra 1,9931 gold badge17 silver badges34 bronze badges 11-
Seems like that should be the responsibility of
asyncDataStreamOrGenerator
to me... – Heretic Monkey Commented Oct 12, 2021 at 13:10 - what's wrong with simply check a flag at each iteration? (would not break current await, but is it really important?) – apple apple Commented Oct 12, 2021 at 13:12
-
2
const startTime = Date.now(); for(....) { if (Date.now() - startTime > XXXXXX) break;
– epascarello Commented Oct 12, 2021 at 13:12 -
1
Wrap it in something you do control, that you can cancel.
for await ... of
is really for async generators, so you should have the opportunity to not generate any more if the timeout lapses (using the technique proposed by @T.J.Crowder). – Heretic Monkey Commented Oct 12, 2021 at 13:23 - 1 What is the asynchronous code? How is the call being made? The better answer depends on the exact details which I asked for after my first comment. – epascarello Commented Oct 12, 2021 at 13:28
3 Answers
Reset to default 3 (See also the community wiki answer I posted with an alternative approach.)
In a comment you've said:
I am designing a consensus algorithm, where every source needs to send the response within a given time frame. If some of such participants are dead!, I mean they do not send values, the loop will be held for ever!
That sounds like a timeout to me. The usual way to implement a timeout is via Promise.race
with a promise wrapped around a timer mechanism (setTimeout
or similar). Promise.race
watches the promises you pass into it and settles as soon as any of them settles (passing on that fulfillment or rejection), disregarding how any of the others settle later.
To do that, you'll need to loop another way instead of for-await-of
and use the promise of the result object directly rather than indirectly. Let's say you have a utility function:
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
That returns a promise it fulfills X milliseconds later with whatever value you provide (if any).
Then:
// Consume `asynDataStreamOrGenerator`, logging "Timeout" whenever the next
// value takes longer than TIMEOUT ms, and keep waiting for it afterwards.
(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel the timer promise fulfills with
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    // The outstanding next() request. It is kept across timeouts: asking for a
    // new one while the previous is still unsettled would (for async
    // generators) queue behind it, and the value delivered to the abandoned
    // request would never be processed.
    let pending = null;
    try {
        while (true) {
            if (pending === null) {
                pending = it.next();
            }
            const result = await Promise.race([pending, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time; keep waiting on the same request
                console.log("Timeout");
                continue;
            }
            // Got a response: the request has been consumed
            pending = null;
            if (result.done) {
                // Iteration complete
                console.log("Iteration complete");
                break;
            }
            // ...some data processing on `result.value`...
            console.log(`Process ${result.value}`);
        }
    } finally {
        try {
            // Close the iterator if it needs closing. return() may hand back a
            // promise, so an asynchronous failure is ignored as well; it is not
            // awaited because a stalled source might never settle it.
            Promise.resolve(it.return?.()).catch(() => { });
        } catch { }
    }
})();
Live Example using random durations for the async iterator's work, but forcing a timeout on the third iteration:
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
/**
 * Demo source: yields 1 through 6, pausing before each value. The pause is a
 * random whole number of milliseconds below 100, except before the third
 * value, where it is 600 ms.
 */
async function* example() {
    let n = 0;
    while (n < 6) {
        n += 1;
        let pause;
        if (n === 3) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield n;
    }
}
// Demo driver: consume example(), logging a timeout whenever the next value
// takes longer than TIMEOUT ms, and keep waiting for it afterwards.
(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel the timer promise fulfills with
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    // The outstanding next() request, kept across timeouts. Issuing a fresh
    // next() while this one is unsettled would queue behind it, and the value
    // delivered to the abandoned request would be lost.
    let pending = null;
    try {
        while (true) {
            if (pending === null) {
                pending = it.next();
            }
            // `elapsed` measures this wait only, not the time since the
            // request was first issued.
            const start = Date.now();
            const result = await Promise.race([pending, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time; keep waiting on the same request
                console.log(`Got timeout in ${elapsed}ms`);
                continue;
            }
            // Got a response: the request has been consumed
            pending = null;
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete result in ${elapsed}ms`);
                break;
            }
            // ...some data processing on `result.value`...
            console.log(`Got result ${result.value} to process in ${elapsed}ms`);
        }
    } finally {
        try {
            // Close the iterator if it needs closing. return() may hand back a
            // promise, so an asynchronous failure is ignored as well.
            Promise.resolve(it.return?.()).catch(() => { });
        } catch { }
    }
})();
/* Styling for the snippet's output pane, not part of the JavaScript example:
   lets .as-console-wrapper grow to the full height of its container,
   overriding any other max-height rule via !important. */
.as-console-wrapper {
max-height: 100% !important;
}
Here's that example with the timeout on the first iteration, since you seemed concerned about that case:
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
/**
 * Demo source: yields 1 through 6, pausing before each value. The pause is a
 * random whole number of milliseconds below 100, except before the first
 * value, where it is 600 ms.
 */
async function* example() {
    let n = 0;
    while (n < 6) {
        n += 1;
        let pause;
        if (n === 1) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield n;
    }
}
// Demo driver: consume example(), logging a timeout whenever the next value
// takes longer than TIMEOUT ms, and keep waiting for it afterwards.
(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel the timer promise fulfills with
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    // The outstanding next() request, kept across timeouts. Issuing a fresh
    // next() while this one is unsettled would queue behind it, and the value
    // delivered to the abandoned request would be lost.
    let pending = null;
    try {
        while (true) {
            if (pending === null) {
                pending = it.next();
            }
            // `elapsed` measures this wait only, not the time since the
            // request was first issued.
            const start = Date.now();
            const result = await Promise.race([pending, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time; keep waiting on the same request
                console.log(`Got timeout in ${elapsed}ms`);
                continue;
            }
            // Got a response: the request has been consumed
            pending = null;
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete result in ${elapsed}ms`);
                break;
            }
            // ...some data processing on `result.value`...
            console.log(`Got result ${result.value} to process in ${elapsed}ms`);
        }
    } finally {
        try {
            // Close the iterator if it needs closing. return() may hand back a
            // promise, so an asynchronous failure is ignored as well.
            Promise.resolve(it.return?.()).catch(() => { });
        } catch { }
    }
})();
/* Styling for the snippet's output pane, not part of the JavaScript example:
   lets .as-console-wrapper grow to the full height of its container,
   overriding any other max-height rule via !important. */
.as-console-wrapper {
max-height: 100% !important;
}
If you don't want the processing to hold up collection of the next value, you could not await
the processing that you do (perhaps build up an array of the promises for completion of that processing and Promise.all
them at the end).
Or if you want to bail out of the entire operation:
// Collect values from `asynDataStreamOrGenerator` until it finishes or one
// value takes longer than TIMEOUT ms, then process what was collected.
(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel the timer promise fulfills with
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log("Timeout");
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log("Iteration complete");
                break;
            }
            console.log(`Got ${result.value}`);
            results.push(result.value);
        }
    } finally {
        try {
            // Best-effort close. return() may hand back a promise, so an
            // asynchronous failure is ignored as well; it is not awaited
            // because a stalled source might never settle it.
            Promise.resolve(it.return?.()).catch(() => { });
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
Live Example:
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
/**
 * Demo source: yields 1 through 6, pausing before each value. The pause is a
 * random whole number of milliseconds below 100, except before the third
 * value, where it is 600 ms.
 */
async function* example() {
    let n = 0;
    while (n < 6) {
        n += 1;
        let pause;
        if (n === 3) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield n;
    }
}
// Demo driver: collect values from example() until it finishes or one value
// takes longer than TIMEOUT ms, then process what was collected.
(async () => {
    const asynDataStreamOrGenerator = example(); // For the example
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel the timer promise fulfills with
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            // Best-effort close. return() may hand back a promise, so an
            // asynchronous failure is ignored as well; it is not awaited
            // because a stalled source might never settle it.
            Promise.resolve(it.return?.()).catch(() => { });
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
/* Styling for the snippet's output pane, not part of the JavaScript example:
   lets .as-console-wrapper grow to the full height of its container,
   overriding any other max-height rule via !important. */
.as-console-wrapper {
max-height: 100% !important;
}
And again where it times out on the first pass but not every pass (since this bails on the first timeout, we don't see subsequent ones):
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
/**
 * Demo source: yields 1 through 6, pausing before each value. The pause is a
 * random whole number of milliseconds below 100, except before the first
 * value, where it is 600 ms.
 */
async function* example() {
    let n = 0;
    while (n < 6) {
        n += 1;
        let pause;
        if (n === 1) {
            pause = 600;
        } else {
            pause = Math.floor(Math.random() * 100);
        }
        await delay(pause);
        yield n;
    }
}
// Demo driver: collect values from example() until it finishes or one value
// takes longer than TIMEOUT ms, then process what was collected.
(async () => {
    const asynDataStreamOrGenerator = example(); // For the example
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Unique sentinel the timer promise fulfills with
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            // Best-effort close. return() may hand back a promise, so an
            // asynchronous failure is ignored as well; it is not awaited
            // because a stalled source might never settle it.
            Promise.resolve(it.return?.()).catch(() => { });
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();
/* Styling for the snippet's output pane, not part of the JavaScript example:
   lets .as-console-wrapper grow to the full height of its container,
   overriding any other max-height rule via !important. */
.as-console-wrapper {
max-height: 100% !important;
}
Or some combination of the two. You'll need to tweak this based on what you're really doing, but that's a direction that seems reasonable.
In all of the above:
- Replace `it.return?.();` with `if (it.return) { it.return(); }` if your environment doesn't support optional chaining yet.
- Replace `catch { }` with `catch (e) { }` if your environment doesn't support optional `catch` bindings yet.
You can use a timeout Promise
(timer
in the code)
and use Promise.race
on each iteration.
The code below would print up to around 30 while the generator can generate more.
/**
 * Pause for `ms` milliseconds.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @returns {Promise<undefined>} Promise fulfilled (with no value) after the wait.
 */
async function wait(ms) {
    const pause = new Promise((done) => {
        setTimeout(done, ms);
    });
    return pause;
}
/**
 * Demo source: yields the integers 0 through 99, calling `wait(30)` before
 * each one.
 */
async function* asynDataStreamOrGenerator() {
    let next = 0;
    while (next < 100) {
        await wait(30);
        yield next;
        next += 1;
    }
}
/**
 * Re-yield the values of `generator` until it finishes or until `timeout`
 * milliseconds have passed since iteration of this wrapper started, whichever
 * comes first. The deadline covers the whole iteration, not each value.
 *
 * @param {{ next: function(): Promise<{done: boolean, value: *}> }} generator -
 *   Async iterator to pull values from.
 * @param {number} timeout - Overall deadline in milliseconds.
 * @yields {*} Each value produced by `generator` before the deadline.
 * @throws {string} The string "TimeOut" when the deadline passes first.
 */
async function* iterate_until(generator, timeout) {
    let timerId;
    // One deadline for the whole iteration; every race below shares it.
    const deadline = new Promise((_, reject) => {
        timerId = setTimeout(reject, timeout, "TimeOut");
    });
    try {
        for (;;) {
            const result = await Promise.race([deadline, generator.next()]);
            if (result.done) break;
            yield result.value;
        }
    } finally {
        // Cancel the deadline timer once iteration ends (normally, by timeout,
        // or because the consumer stopped early) so it does not stay live for
        // the rest of `timeout`.
        clearTimeout(timerId);
    }
}
{
    // Demo driver: print values from the generator for at most 1000 ms.
    const run = async () => {
        try {
            for await (const value of iterate_until(asynDataStreamOrGenerator(), 1000)) {
                console.log(value);
            }
        } catch (e) {
            /* catch timeout, rethrow if needed */
        }
    };
    run();
}
JSFiddle Link
In the comments on the question, Heretic Monkey suggested wrapping the async iterable in another one that implements a timeout. That's a very good idea, because then the code using that wrapper can use for-await-of
.
If you want to keep going after a timeout, it looks something like this:
/**
 * Wrap an async iterable so that a slow value is reported instead of blocking:
 * whenever the next value has not arrived within `timeoutDuration`
 * milliseconds, `timeoutValue` is yielded and the wrapper keeps waiting for
 * that same value.
 *
 * @param {AsyncIterable<*>} asyncIterable - Source to pull values from.
 * @param {number} timeoutDuration - Longest wait per race, in milliseconds.
 * @param {*} timeoutValue - Marker yielded on timeout; use something the
 *   source can never yield (e.g. a fresh object) so it can be told apart.
 * @yields {*} Every value from the source, interleaved with `timeoutValue`
 *   for each wait that timed out.
 */
async function* timeoutWrapper(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    // The outstanding next() request, kept across timeouts. Issuing a fresh
    // next() while this one is unsettled would (for async generators) queue
    // behind it, and the value delivered to the abandoned request would be lost.
    let pending = null;
    try {
        while (true) {
            if (pending === null) {
                pending = it.next();
            }
            let timer;
            const timeout = new Promise((resolve) => {
                timer = setTimeout(resolve, timeoutDuration, timeoutValue);
            });
            let result;
            try {
                result = await Promise.race([pending, timeout]);
            } finally {
                // The race is decided; don't leave a live timer behind.
                clearTimeout(timer);
            }
            if (result === timeoutValue) {
                yield timeoutValue;
                continue;
            }
            pending = null; // The request has been consumed
            if (result.done) {
                break;
            }
            yield result.value;
        }
    } finally {
        // Close the source if it supports closing
        it.return?.();
    }
}
Using it:
// Consume the wrapped stream; GOT_TIMEOUT entries mark waits that timed out.
for await (const item of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT, GOT_TIMEOUT)) {
    if (item === GOT_TIMEOUT) {
        // Didn't get a response in time
        console.log("Timeout");
        continue;
    }
    // Got a response
    // ...some data processing on the received value...
    console.log(`Process ${item}`);
}
console.log("Done");
Live Example:
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
/**
 * Demo source: yields 1 through 6, pausing before each value for a time that
 * grows with it (300 ms before the first, 800 ms before the last).
 */
async function* example() {
    let n = 0;
    while (n < 6) {
        n += 1;
        const pause = 200 + n * 100;
        await delay(pause);
        yield n;
    }
}
/**
 * Wrap an async iterable so that a slow value is reported instead of blocking:
 * whenever the next value has not arrived within `timeoutDuration`
 * milliseconds, `timeoutValue` is yielded and the wrapper keeps waiting for
 * that same value.
 *
 * @param {AsyncIterable<*>} asyncIterable - Source to pull values from.
 * @param {number} timeoutDuration - Longest wait per race, in milliseconds.
 * @param {*} timeoutValue - Marker yielded on timeout; use something the
 *   source can never yield (e.g. a fresh object) so it can be told apart.
 * @yields {*} Every value from the source, interleaved with `timeoutValue`
 *   for each wait that timed out.
 */
async function* timeoutWrapper(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    // The outstanding next() request, kept across timeouts. Issuing a fresh
    // next() while this one is unsettled would (for async generators) queue
    // behind it, and the value delivered to the abandoned request would be lost.
    let pending = null;
    try {
        while (true) {
            if (pending === null) {
                pending = it.next();
            }
            let timer;
            const timeout = new Promise((resolve) => {
                timer = setTimeout(resolve, timeoutDuration, timeoutValue);
            });
            let result;
            try {
                result = await Promise.race([pending, timeout]);
            } finally {
                // The race is decided; don't leave a live timer behind.
                clearTimeout(timer);
            }
            if (result === timeoutValue) {
                yield timeoutValue;
                continue;
            }
            pending = null; // The request has been consumed
            if (result.done) {
                break;
            }
            yield result.value;
        }
    } finally {
        // Close the source if it supports closing
        it.return?.();
    }
}
// Demo driver: run example() through timeoutWrapper and log each outcome.
(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {}; // Marker yielded by the wrapper on a timeout
    const asynDataStreamOrGenerator = example();
    const wrapped = timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT, GOT_TIMEOUT);
    for await (const item of wrapped) {
        if (item === GOT_TIMEOUT) {
            // Didn't get a response in time
            console.log("Timeout");
            continue;
        }
        // Got a response
        // ...some data processing on the received value...
        console.log(`Process ${item}`);
    }
    console.log("Done");
})();
If you don't want to keep going after a timeout, you could have it throw an error (terminating the iteration) instead:
/**
 * Wrap an async iterable so that iteration fails instead of blocking: if the
 * next value has not arrived within `timeoutDuration` milliseconds, an error
 * is thrown and the iteration ends.
 *
 * @param {AsyncIterable<*>} asyncIterable - Source to pull values from.
 * @param {number} timeoutDuration - Longest wait per value, in milliseconds.
 * @yields {*} Each value from the source that arrived in time.
 * @throws {Error} `Timeout after <timeoutDuration>ms` when a value is late.
 */
async function* timeoutWrapper(asyncIterable, timeoutDuration) {
    const TIMEOUT_VALUE = {}; // Private sentinel the timer promise fulfills with
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            let timer;
            const timeout = new Promise((resolve) => {
                timer = setTimeout(resolve, timeoutDuration, TIMEOUT_VALUE);
            });
            let result;
            try {
                result = await Promise.race([it.next(), timeout]);
            } finally {
                // The race is decided; don't leave a live timer behind.
                clearTimeout(timer);
            }
            if (result === TIMEOUT_VALUE) {
                throw new Error(`Timeout after ${timeoutDuration}ms`);
            }
            if (result.done) {
                break;
            }
            yield result.value;
        }
    } finally {
        // Close the source if it supports closing
        it.return?.();
    }
}
Using it:
// Consume the wrapped stream; a late value surfaces here as a thrown error.
try {
    const wrapped = timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT);
    for await (const item of wrapped) {
        // Got a response
        console.log(`Process ${item}`);
    }
    console.log("Done");
} catch (err) {
    console.error(err.message);
}
Live Example:
/**
 * Build a promise that is fulfilled with `value` once `ms` milliseconds have passed.
 *
 * @param {number} ms - How long to wait, in milliseconds.
 * @param {*} [value] - Fulfillment value; `undefined` when omitted.
 * @returns {Promise<*>} Promise fulfilled with `value` after the delay.
 */
const delay = (ms, value) => {
    return new Promise((fulfil) => {
        setTimeout(() => fulfil(value), ms);
    });
};
/**
 * Demo source: yields 1 through 6, pausing before each value for a time that
 * grows with it (300 ms before the first, 800 ms before the last).
 */
async function* example() {
    let n = 0;
    while (n < 6) {
        n += 1;
        const pause = 200 + n * 100;
        await delay(pause);
        yield n;
    }
}
/**
 * Wrap an async iterable so that iteration fails instead of blocking: if the
 * next value has not arrived within `timeoutDuration` milliseconds, an error
 * is thrown and the iteration ends.
 *
 * @param {AsyncIterable<*>} asyncIterable - Source to pull values from.
 * @param {number} timeoutDuration - Longest wait per value, in milliseconds.
 * @yields {*} Each value from the source that arrived in time.
 * @throws {Error} `Timeout after <timeoutDuration>ms` when a value is late.
 */
async function* timeoutWrapper(asyncIterable, timeoutDuration) {
    const TIMEOUT_VALUE = {}; // Private sentinel the timer promise fulfills with
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            let timer;
            const timeout = new Promise((resolve) => {
                timer = setTimeout(resolve, timeoutDuration, TIMEOUT_VALUE);
            });
            let result;
            try {
                result = await Promise.race([it.next(), timeout]);
            } finally {
                // The race is decided; don't leave a live timer behind.
                clearTimeout(timer);
            }
            if (result === TIMEOUT_VALUE) {
                throw new Error(`Timeout after ${timeoutDuration}ms`);
            }
            if (result.done) {
                break;
            }
            yield result.value;
        }
    } finally {
        // Close the source if it supports closing
        it.return?.();
    }
}
// Demo driver: run example() through the throwing timeoutWrapper and report
// either every value or the timeout error.
(async () => {
    const TIMEOUT = 500; // Milliseconds
    const asynDataStreamOrGenerator = example();
    try {
        const wrapped = timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT);
        for await (const item of wrapped) {
            // Got a response
            console.log(`Process ${item}`);
        }
        console.log("Done");
    } catch (err) {
        console.error(err.message);
    }
})();