c# - Getting a 2nd `IAsyncEnumerator<>` from the same `IAsyncEnumerable<>` based on `Task.WhenEach`

Given one IAsyncEnumerable<>, in general it works fine (even if it can be nonoptimal performance-wise) to call GetAsyncEnumerator more than once, getting a new IAsyncEnumerator<> each time. To give an explicit example (all code in this question is in C#), suppose I have this method:

static async IAsyncEnumerable<string> GetStr() {
    var item = await Task.FromResult("Bravo");
    yield return "Alfa";
    yield return item;
    yield return "Charlie";
}

then it works fine to do this:

IAsyncEnumerable<string> oneEnumerable = GetStr();
await foreach (var s in oneEnumerable) {
    Console.WriteLine(s);
}
Console.WriteLine("Between");
await foreach (var t in oneEnumerable) {
    Console.WriteLine(t);
}
Console.WriteLine("End");

If you want to be advanced, you can even have the two enumerators (from the same enumerable) live together, without disposing the first before you acquire the second one, say:

IAsyncEnumerable<string> oneEnumerable = GetStr();
IAsyncEnumerator<string> e = oneEnumerable.GetAsyncEnumerator();
IAsyncEnumerator<string> f = oneEnumerable.GetAsyncEnumerator();
bool c = await f.MoveNextAsync();
bool b = await e.MoveNextAsync();

Console.WriteLine($"b {b} with {e.Current}");
Console.WriteLine($"c {c} with {f.Current}");
await e.DisposeAsync();
await f.DisposeAsync();

The above code works the way you would expect: the two enumerators are independent and do not mix up their states.


So this was just an introduction; I want to ask about a case where acquiring two enumerators from the same enumerable leads to a weird result. So now consider this method based on Task.WhenEach:

static IAsyncEnumerable<Task<int>> GetTasks() {
    IEnumerable<Task<int>> source = [Task.FromResult(7), Task.FromResult(9), Task.FromResult(13)];
    return Task.WhenEach(source);
}

and use this code:

IAsyncEnumerable<Task<int>> oneEnumerable = GetTasks();
await foreach (var t in oneEnumerable) {
    Console.WriteLine(t);
}
Console.WriteLine("Between");
await foreach (var u in oneEnumerable) {
    Console.WriteLine(u);
}
Console.WriteLine("End");

This runs without exception. BUT: The second enumeration yields zero items (the body of the foreach over u runs zero times)!


My questions:

  • Is this the expected behavior of Task.WhenEach?
  • Is this behavior documented?

I find it very error-prone. If, for some technical reason, it is impossible to get more than one enumeration from an enumerable, it would be much nicer if the second call to GetAsyncEnumerator threw an exception (or, alternatively, if the first call to MoveNextAsync on the second enumerator threw, or if awaiting the ValueTask<bool> produced by MoveNextAsync threw).

(I was shown another thread where an IAsyncEnumerable<> produced by the GetRecordsAsync<> of a CsvReader had a similar issue.)
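
To make the "throw on the second call" idea concrete, here is a minimal sketch of a wrapper that rejects a second GetAsyncEnumerator call. SingleUseAsyncEnumerable<T> is a hypothetical type made up for illustration (it is not part of the BCL), and Numbers() is only a stand-in for a source such as the result of Task.WhenEach.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

IAsyncEnumerable<int> guarded = new SingleUseAsyncEnumerable<int>(Numbers());

// First enumeration works as usual.
await foreach (int n in guarded)
{
    Console.WriteLine(n);
}

// Second enumeration fails loudly instead of being silently empty.
try
{
    await foreach (int n in guarded)
    {
        Console.WriteLine(n);
    }
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Second enumeration rejected: {ex.Message}");
}

// Stand-in source (assumption); any IAsyncEnumerable<T> could be wrapped.
static async IAsyncEnumerable<int> Numbers()
{
    yield return 7;
    await Task.Yield();
    yield return 9;
}

// Hypothetical wrapper: allows GetAsyncEnumerator to succeed exactly once.
sealed class SingleUseAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private int _enumerated; // 0 = not yet enumerated, 1 = already enumerated

    public SingleUseAsyncEnumerable(IAsyncEnumerable<T> source) =>
        _source = source ?? throw new ArgumentNullException(nameof(source));

    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        // Interlocked.Exchange makes the check safe even if two callers race.
        if (Interlocked.Exchange(ref _enumerated, 1) != 0)
        {
            throw new InvalidOperationException("This sequence can only be enumerated once.");
        }
        return _source.GetAsyncEnumerator(cancellationToken);
    }
}
```

Throwing from GetAsyncEnumerator surfaces the mistake at the earliest point; a variant could defer the exception to the first MoveNextAsync instead.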

asked Feb 4 at 10:40 by Jeppe Stig Nielsen, edited Feb 4 at 12:59 by Theodor Zoulias
  • Filtering an IEnumerable or IAsyncEnumerable is done with Where and other LINQ operators, or enumerator methods. The case with CsvReader has nothing to do with IAsyncEnumerable: the stream of data was consumed to the end. Unlike IEnumerable<>, which is an interface over a collection or enumerator method, IAsyncEnumerable<> represents a stream of events that can't simply be restarted. It's not only that you can't start reading again from the start of a file or a network stream; the source may not provide the same data on the next run. – Panagiotis Kanavos Commented Feb 4 at 10:49
  • What is the actual business requirement? The question describes the code, not the actual problem. An IAsyncEnumerable<T> represents a potentially infinite stream of data produced by some other source. It doesn't control that source, e.g. a gRPC server stream or SignalR messages. It can be used to create a pipeline of actions, each of which works on the stream of data produced by the previous one. It doesn't make much sense to iterate over the same IAsyncEnumerable twice. In a realistic scenario, multiple iterations will lead to weird results, as each iteration sees different messages. – Panagiotis Kanavos Commented Feb 4 at 11:07
  • Repro fiddle: dotnetfiddle/Oz23PW. I'd guess there is some sort of bug. – Charlieface Commented Feb 4 at 11:22

2 Answers

It doesn't appear to be documented yet, but the code comments explicitly call this out:

// The enumerable could have GetAsyncEnumerator called on it multiple times.
// As we're dealing with Tasks that only ever transition from non-completed
// to completed, re-enumeration doesn't have much benefit, so we take advantage
// of the optimizations possible by not supporting that and simply have the
// semantics that, no matter how many times the enumerable is enumerated, every
// task is yielded only once. The original GetAsyncEnumerator call will give back
// all the tasks, and all subsequent iterations will be empty.
if (waiter?.TryStart() is not true)
{
    yield break;
}

Also:

/// <summary>0 if this has never been used in an iteration; 1 if it has.</summary>
/// <remarks>This is used to ensure we only ever iterate through the tasks once.</remarks>
private int _enumerated;

/// <summary>Called at the beginning of the iterator to assume ownership of the state.</summary>
/// <returns>true if the caller owns the state; false if the caller should end immediately.</returns>
public bool TryStart() => Interlocked.Exchange(ref _enumerated, 1) == 0;
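
To illustrate how a guard of this kind leads to an empty second enumeration, here is a minimal, self-contained sketch. It is not the runtime's implementation: OnceOnly, State and CountAsync are made-up names, and the items are plain integers rather than tasks. The only part taken from the quoted code is the idea of a TryStart based on Interlocked.Exchange.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

IAsyncEnumerable<int> once = OnceOnly.Create(new[] { 7, 9, 13 });

Console.WriteLine(await CountAsync(once)); // 3
Console.WriteLine(await CountAsync(once)); // 0

// Helper that enumerates a sequence and counts its items.
static async Task<int> CountAsync<T>(IAsyncEnumerable<T> source)
{
    int count = 0;
    await foreach (T item in source)
    {
        count++;
    }
    return count;
}

static class OnceOnly
{
    // Shared state: one instance per enumerable, seen by every enumeration of it.
    private sealed class State
    {
        private int _enumerated; // 0 = never enumerated, 1 = enumerated

        public bool TryStart() => Interlocked.Exchange(ref _enumerated, 1) == 0;
    }

    public static IAsyncEnumerable<T> Create<T>(IEnumerable<T> items) =>
        Iterate(items, new State());

    // The iterator body runs again for each enumeration, but 'state' is the same object,
    // so only the first enumeration gets past the guard.
    private static async IAsyncEnumerable<T> Iterate<T>(IEnumerable<T> items, State state)
    {
        if (!state.TryStart())
        {
            yield break;
        }

        foreach (T item in items)
        {
            await Task.Yield();
            yield return item;
        }
    }
}
```

The second enumeration ends immediately without throwing, which mirrors the behavior observed in the question.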

I agree that this is non-obvious and should be documented. Preferably it should allow multiple enumerations. I suggest you create a feature request on GitHub for this.

You may be able to write your own iterator to do this though. Example:

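// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])

// Wrapping Task.WhenEach in an iterator method means Task.WhenEach is called
// again for every enumeration, so each enumeration yields all the tasks.
public static async IAsyncEnumerable<T> WhenEach<T>(
    IEnumerable<T> tasks,
    [EnumeratorCancellation] CancellationToken cancellationToken = default) where T : Task
{
    // Note: in an iterator this check runs when enumeration starts, not at the call site.
    ArgumentNullException.ThrowIfNull(tasks);
    await foreach (T task in Task.WhenEach(tasks)
        .WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        yield return task;
    }
}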

C# iterator methods create enumerables that invoke the body of the method each time a new enumeration starts.
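
A small sketch that makes this visible: the counter below is incremented at the top of the iterator body, so it shows how many times the body has started. IteratorDemo, BodyRuns and Letters are illustrative names only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

IAsyncEnumerable<string> letters = IteratorDemo.Letters();
Console.WriteLine(IteratorDemo.BodyRuns); // 0: calling the method does not run the body yet

await foreach (string s in letters) { }
await foreach (string s in letters) { }
Console.WriteLine(IteratorDemo.BodyRuns); // 2: one run of the body per enumeration

static class IteratorDemo
{
    // Counts how many times the iterator body has started executing.
    public static int BodyRuns;

    public static async IAsyncEnumerable<string> Letters()
    {
        BodyRuns++;
        yield return "Alfa";
        await Task.Yield();
        yield return "Bravo";
    }
}
```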

Charlieface's answer explains the behavior of Task.WhenEach, and includes a specialized solution. This answer is about a general solution for all IAsyncEnumerable<T>s that share this quirk with Task.WhenEach. The System.Interactive.Async package includes the AsyncEnumerableEx.Defer method, with this signature:

/// <summary>
/// Returns an async-enumerable sequence that invokes the specified factory function
/// whenever a new observer subscribes.
/// </summary>
public static IAsyncEnumerable<TSource> Defer<TSource>(
    Func<IAsyncEnumerable<TSource>> factory);

With the help of this method you can create a deferred enumerable sequence that creates a new IAsyncEnumerable<TSource> each time it is enumerated.

Usage example:

IAsyncEnumerable<Task<int>> oneEnumerable = AsyncEnumerableEx.Defer(() => GetTasks());

Here is an online demo. It produces this output:

7
9
13
Between
7
9
13
End
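
If adding the package is not an option, the same idea can be sketched with a plain iterator method. The code below is an assumption-laden illustration, not the System.Interactive.Async implementation: DeferSketch and Source are hypothetical names, and Source.GetOnce() merely imitates a sequence that is empty when enumerated a second time.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

int factoryCalls = 0;
IAsyncEnumerable<int> deferred = DeferSketch.Defer(() =>
{
    factoryCalls++;          // runs once per enumeration, not once per Defer call
    return Source.GetOnce(); // stand-in for GetTasks()
});

await foreach (int n in deferred) Console.WriteLine(n);
Console.WriteLine("Between");
await foreach (int n in deferred) Console.WriteLine(n);
Console.WriteLine($"Factory calls: {factoryCalls}"); // Factory calls: 2

static class DeferSketch
{
    // Hypothetical helper: the iterator body, and therefore the factory,
    // is invoked again each time the returned sequence is enumerated.
    public static async IAsyncEnumerable<T> Defer<T>(
        Func<IAsyncEnumerable<T>> factory,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (factory is null) throw new ArgumentNullException(nameof(factory));

        await foreach (T item in factory()
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            yield return item;
        }
    }
}

static class Source
{
    // Each call returns a fresh sequence that yields its items only on the first enumeration.
    public static IAsyncEnumerable<int> GetOnce() => Iterate(new int[1]);

    private static async IAsyncEnumerable<int> Iterate(int[] used)
    {
        if (Interlocked.Exchange(ref used[0], 1) != 0)
        {
            yield break; // already enumerated once
        }

        foreach (int value in new[] { 7, 9, 13 })
        {
            await Task.Yield();
            yield return value;
        }
    }
}
```

Both loops print 7, 9 and 13, because every enumeration of the deferred sequence asks the factory for a brand-new inner sequence.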
