Given one IAsyncEnumerable<>, in general it works fine (even if it can be nonoptimal performance-wise) to call GetAsyncEnumerator more than once, getting a new IAsyncEnumerator<> each time. To give an explicit example (all code in this question is in C#), suppose I have this method:
static async IAsyncEnumerable<string> GetStr() {
    var item = await Task.FromResult("Bravo");
    yield return "Alfa";
    yield return item;
    yield return "Charlie";
}
then it works fine to do this:
IAsyncEnumerable<string> oneEnumerable = GetStr();
await foreach (var s in oneEnumerable) {
    Console.WriteLine(s);
}
Console.WriteLine("Between");
await foreach (var t in oneEnumerable) {
    Console.WriteLine(t);
}
Console.WriteLine("End");
If you want to be advanced, you can even have the two enumerators (from the same enumerable) live together, without disposing the first before you acquire the second one, say:
IAsyncEnumerable<string> oneEnumerable = GetStr();
IAsyncEnumerator<string> e = oneEnumerable.GetAsyncEnumerator();
IAsyncEnumerator<string> f = oneEnumerable.GetAsyncEnumerator();
bool c = await f.MoveNextAsync();
bool b = await e.MoveNextAsync();
Console.WriteLine($"b {b} with {e.Current}");
Console.WriteLine($"c {c} with {f.Current}");
await e.DisposeAsync();
await f.DisposeAsync();
The above code works the way you would expect: the two enumerators are independent and do not mix up their states.
So this was just an introduction; I want to ask about a case where acquiring two enumerators from the same enumerable leads to a weird result. So now consider this method based on Task.WhenEach:
static IAsyncEnumerable<Task<int>> GetTasks() {
    IEnumerable<Task<int>> source = [Task.FromResult(7), Task.FromResult(9), Task.FromResult(13)];
    return Task.WhenEach(source);
}
and use this code:
IAsyncEnumerable<Task<int>> oneEnumerable = GetTasks();
await foreach (var t in oneEnumerable) {
    Console.WriteLine(t);
}
Console.WriteLine("Between");
await foreach (var u in oneEnumerable) {
    Console.WriteLine(u);
}
Console.WriteLine("End");
This runs without exception. BUT: the second enumeration gives zero items (the body of the foreach with u runs zero times)!
My questions:
- Is this the expected behavior of Task.WhenEach?
- Is this behavior documented?
I find it very error-prone. If, for some technical reason, it is impossible to get more than one enumeration from an enumerable, it would be much nicer if the second call to GetAsyncEnumerator threw an exception (or, alternatively, if the first call to MoveNextAsync on the second enumerator threw, or if awaiting the ValueTask<bool> produced by MoveNextAsync threw).
(I was shown another thread where an IAsyncEnumerable<> produced by the GetRecordsAsync<> of a CsvReader had a similar issue.)
2 Answers
It doesn't appear to be documented yet, but the code comments explicitly call this out:
// The enumerable could have GetAsyncEnumerator called on it multiple times.
// As we're dealing with Tasks that only ever transition from non-completed
// to completed, re-enumeration doesn't have much benefit, so we take advantage
// of the optimizations possible by not supporting that and simply have the
// semantics that, no matter how many times the enumerable is enumerated, every
// task is yielded only once. The original GetAsyncEnumerator call will give back
// all the tasks, and all subsequent iterations will be empty.
if (waiter?.TryStart() is not true)
{
    yield break;
}
Also:
/// <summary>0 if this has never been used in an iteration; 1 if it has.</summary>
/// <remarks>This is used to ensure we only ever iterate through the tasks once.</remarks>
private int _enumerated;
/// <summary>Called at the beginning of the iterator to assume ownership of the state.</summary>
/// <returns>true if the caller owns the state; false if the caller should end immediately.</returns>
public bool TryStart() => Interlocked.Exchange(ref _enumerated, 1) == 0;
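To see that guard in isolation, here is a minimal sketch of the same pattern outside the runtime. The names OneShotState, OneShotDemo, OneShot, Iterate and CountAsync are hypothetical and only model the guard; this is not the actual Task.WhenEach implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the runtime's internal state object; only the guard is modeled.
sealed class OneShotState
{
    private int _enumerated; // 0 = never enumerated, 1 = already claimed

    // Atomically claims the state; only the very first caller gets true.
    public bool TryStart() => Interlocked.Exchange(ref _enumerated, 1) == 0;
}

static class OneShotDemo
{
    // The state is created once, outside the iterator, so every enumeration shares it.
    public static IAsyncEnumerable<int> OneShot(params int[] items)
    {
        var state = new OneShotState();
        return Iterate(state, items);
    }

    private static async IAsyncEnumerable<int> Iterate(OneShotState state, int[] items)
    {
        if (!state.TryStart())
        {
            yield break; // second and later enumerations end immediately, without an exception
        }
        foreach (int item in items)
        {
            await Task.Yield();
            yield return item;
        }
    }

    public static async Task<int> CountAsync(IAsyncEnumerable<int> source)
    {
        int count = 0;
        await foreach (int item in source)
        {
            count++;
        }
        return count;
    }

    public static async Task Main()
    {
        IAsyncEnumerable<int> source = OneShot(7, 9, 13);
        Console.WriteLine(await CountAsync(source)); // 3
        Console.WriteLine(await CountAsync(source)); // 0
    }
}
```

The iterator body does run again on the second enumeration, but the shared state object has already been claimed, so it exits before yielding anything.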
I agree that this appears non-obvious, and should be documented. Preferably it should allow multiple enumerations. I suggest you create a feature-request on GitHub for this.
You may be able to write your own iterator to do this though. Example:
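// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
public static async IAsyncEnumerable<T> WhenEach<T>(
    IEnumerable<T> tasks,
    [EnumeratorCancellation] CancellationToken cancellationToken = default) where T : Task
{
    ArgumentNullException.ThrowIfNull(tasks);
    // Task.WhenEach is called anew on every enumeration, so each pass gets a fresh sequence.
    await foreach (T task in Task.WhenEach(tasks)
        .WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        yield return task;
    }
}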
C# iterator methods create enumerables that invoke the body of the method each time a new enumeration starts.
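A small sketch can make that last point concrete. IteratorDemo, Numbers and BodyRuns are hypothetical names used only for illustration; the counter records how often the iterator body starts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class IteratorDemo
{
    // Counts how many times the iterator body has started executing.
    public static int BodyRuns;

    public static async IAsyncEnumerable<int> Numbers()
    {
        BodyRuns++;
        await Task.Yield();
        yield return 1;
        yield return 2;
    }

    public static async Task Main()
    {
        IAsyncEnumerable<int> numbers = Numbers();
        Console.WriteLine(BodyRuns); // 0: calling the method does not run the body yet

        await foreach (int n in numbers) { }
        await foreach (int n in numbers) { }
        Console.WriteLine(BodyRuns); // 2: the body ran once per enumeration
    }
}
```

This is why wrapping Task.WhenEach in an iterator method sidesteps the one-shot behavior: every enumeration of the wrapper calls Task.WhenEach again and therefore gets a fresh sequence.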
Charlieface's answer explains the behavior of Task.WhenEach, and includes a specialized solution. This answer is about a general solution for all IAsyncEnumerable<T>s that have the same quirk as Task.WhenEach. The System.Interactive.Async package includes the AsyncEnumerableEx.Defer method, with this signature:
/// <summary>
/// Returns an async-enumerable sequence that invokes the specified factory function
/// whenever a new observer subscribes.
/// </summary>
public static IAsyncEnumerable<TSource> Defer<TSource>(
Func<IAsyncEnumerable<TSource>> factory);
With the help of this method you can create a deferred enumerable sequence that creates a new IAsyncEnumerable<TSource> each time it is enumerated.
Usage example:
IAsyncEnumerable<Task<int>> oneEnumerable = AsyncEnumerableEx.Defer(() => GetTasks());
Here is an online demo. It produces this output:
7
9
13
Between
7
9
13
End
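If taking a dependency on System.Interactive.Async is not an option, a similar effect can probably be had with a few lines of your own. The following is a minimal sketch, assuming .NET 9 or later for Task.WhenEach; AsyncEnumerableSketch, its Defer method and CountAsync are hypothetical names, and this is not the package's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

static class AsyncEnumerableSketch
{
    // Hypothetical helper: invokes the factory each time the result is enumerated,
    // because an iterator method runs its body once per enumeration.
    public static async IAsyncEnumerable<T> Defer<T>(
        Func<IAsyncEnumerable<T>> factory,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (T item in factory()
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            yield return item;
        }
    }

    // Same method as in the question.
    public static IAsyncEnumerable<Task<int>> GetTasks()
    {
        IEnumerable<Task<int>> source = [Task.FromResult(7), Task.FromResult(9), Task.FromResult(13)];
        return Task.WhenEach(source);
    }

    public static async Task<int> CountAsync<T>(IAsyncEnumerable<T> source)
    {
        int count = 0;
        await foreach (T item in source)
        {
            count++;
        }
        return count;
    }

    public static async Task Main()
    {
        IAsyncEnumerable<Task<int>> oneEnumerable = Defer(() => GetTasks());
        Console.WriteLine(await CountAsync(oneEnumerable)); // 3
        Console.WriteLine(await CountAsync(oneEnumerable)); // 3
    }
}
```

Note that the factory, and with it anything the factory does (creating tasks, opening files, issuing requests), runs again on every enumeration, which may or may not be what you want.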
IEnumerable or IAsyncEnumerable is done with Where and other LINQ operators, or enumerator methods. The case with CsvReader has nothing to do with IAsyncEnumerable - the stream of data was consumed to the end. Unlike IEnumerable<>, which is an interface over a collection or enumerator method, IAsyncEnumerable<> represents a stream of events that can't just get restarted. It's not just that you can't start reading again from the start of the file or a network stream; the source may not provide the same data on the next run. – Panagiotis Kanavos Commented Feb 4 at 10:49
IAsyncEnumerable<T> represents a potentially infinite stream of data produced by some other source. It doesn't control that source, e.g. the gRPC server stream or SignalR messages. It can be used to create a pipeline of actions in which each one works on the stream of data produced by the previous one. It doesn't make much sense to iterate over the same IAsyncEnumerable. In a realistic scenario, multiple iterations will lead to weird results, as each iteration sees different messages. – Panagiotis Kanavos Commented Feb 4 at 11:07