c# - `IAsyncEnumerable` behaviour at end of stream - Stack Overflow

IAsyncEnumerable 'end of stream' seems to behave differently depending on where the stream comes from...

If you're consuming an IAsyncEnumerable yielded from a network resource like a gRPC stream, you get an error if you try to consume the stream twice, which is what I'd expect.

So in the following code I'd similarly expect the second foreach to throw, but it doesn't. It yields the complete stream twice!

var stream = GetStream();

await foreach (var item in stream)
{
    Console.WriteLine(item);
}

await foreach (var item in stream)
{
    Console.WriteLine(item);
}

async IAsyncEnumerable<string> GetStream()
{
    await Task.CompletedTask;
    yield return "yes";
    yield return "no";
    yield return "maybe";
}

What's causing this stream to reset to the beginning?

I came across this problem while writing an integration test for code that consumes an IAsyncEnumerable. In production the stream comes from a gRPC call, but for testing I was creating it as above. The production code was mistakenly reading the stream twice and throwing, but that didn't show up in my integration test because of this behaviour.

How could I get my in-memory stream to behave like a network stream?

Asked Nov 19, 2024 at 17:39 by Richard Hunt
  • Related: Restricting the enumerations of LINQ queries to One Only (my very first question on StackOverflow). – Theodor Zoulias, commented Nov 20, 2024 at 0:14

2 Answers

What's causing this stream to reset to the beginning?

The fact that you're basically calling GetAsyncEnumerator() twice (which happens implicitly for a foreach loop). Each time you call GetAsyncEnumerator(), it effectively starts a new state machine at the start of the code.

Note that this is not specific to async enumerables at all - you'd see the same thing with an iterator method returning an IEnumerable<string>.
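
To illustrate the point about synchronous iterators, here is a minimal sketch written as a top-level program. The names GetItems and starts are made up for the illustration; the counter simply records how many times the iterator body begins executing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical counter: how many times the iterator body starts running.
int starts = 0;

IEnumerable<string> GetItems()
{
    starts++;               // executes on the first MoveNext() of each enumeration
    yield return "yes";
    yield return "no";
    yield return "maybe";
}

var items = GetItems();     // the body has not run yet, so starts is still 0

// Each foreach calls GetEnumerator(), which runs the body from the top again.
foreach (var item in items) Console.WriteLine(item);
foreach (var item in items) Console.WriteLine(item);

Console.WriteLine($"Iterator body started {starts} time(s)"); // 2
```

Both loops print all three items, so the "reset" is a property of compiler-generated iterators in general rather than of IAsyncEnumerable.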

How could I get my in-memory stream to behave like a network stream?

You could implement IAsyncEnumerable directly yourself, potentially using an iterator method returning IAsyncEnumerator so that you could still use yield return. Here's an example:

// General purpose code
public class OneShotAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    private readonly IAsyncEnumerator<T> iterator;
    private bool consumed;

    public OneShotAsyncEnumerable(IAsyncEnumerator<T> iterator)
    {
        this.iterator = iterator;
    }

    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        if (consumed)
        {
            throw new InvalidOperationException($"Can only call {nameof(GetAsyncEnumerator)} once");
        }
        consumed = true;
        return iterator;
    }
}

// Usage of the code
var stream = new OneShotAsyncEnumerable<string>(GetStream());

await foreach (var item in stream)
{
    Console.WriteLine(item);
}

await foreach (var item in stream)
{
    Console.WriteLine(item);
}

async IAsyncEnumerator<string> GetStream()
{
    await Task.CompletedTask;
    yield return "yes";
    yield return "no";
    yield return "maybe";
}

Alternatively, you could write an extension method to wrap an IAsyncEnumerable<T>, e.g.

// General purpose code
public static class Extensions
{
    public static IAsyncEnumerable<T> IterateOnlyOnce<T>(this IAsyncEnumerable<T> iterable) =>
        new OneShotAsyncEnumerable<T>(iterable);

    private class OneShotAsyncEnumerable<T> : IAsyncEnumerable<T>
    {
        private readonly IAsyncEnumerable<T> iterable;
        private bool consumed;

        public OneShotAsyncEnumerable(IAsyncEnumerable<T> iterable)
        {
            this.iterable = iterable;
        }

        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            if (consumed)
            {
                throw new InvalidOperationException($"Can only call {nameof(GetAsyncEnumerator)} once");
            }
            consumed = true;
            return iterable.GetAsyncEnumerator(cancellationToken);
        }
    }
}

// Usage of the code
var stream = GetStream().IterateOnlyOnce();

await foreach (var item in stream)
{
    Console.WriteLine(item);
}

await foreach (var item in stream)
{
    Console.WriteLine(item);
}

async IAsyncEnumerable<string> GetStream()
{
    await Task.CompletedTask;
    yield return "yes";
    yield return "no";
    yield return "maybe";
}

Yes, different IAsyncEnumerable<>s can behave differently in this way, because that aspect of the behavior is not defined by the interface itself. This is the case for pretty much any Stream-like construct, including IEnumerable<>, IQueryable<>, IAsyncEnumerable<>, IObservable<>, and in some senses even Streams.

For example, there are "hot" and "cold" observables, and Eager and Lazy IEnumerables.

The presence of yield return in a C# method creates a lazily-evaluated ("cold") enumerable: an object that defers execution of the method body until an enumerator obtained from GetEnumerator() or GetAsyncEnumerator() is first advanced with MoveNext() or MoveNextAsync().

The compiled code generated by the foreach syntax calls GetEnumerator() or GetAsyncEnumerator(), which is somewhat analogous to calling Subscribe() on an observable. This is why your code appears to "reset" the stream for each loop: it's actually creating two separate "streams" that iterate separately.
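
A small sketch of the "two separate streams" idea, assuming a top-level program: it calls GetAsyncEnumerator() by hand, as the compiler does for await foreach, and advances the two enumerators independently. The bodyStarts counter is an addition for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical counter: how many times the iterator body starts running.
int bodyStarts = 0;

async IAsyncEnumerable<string> GetStream()
{
    bodyStarts++;
    await Task.CompletedTask;
    yield return "yes";
    yield return "no";
    yield return "maybe";
}

var stream = GetStream();
Console.WriteLine(bodyStarts); // 0: calling GetStream() runs none of the body

// Two enumerators over the same IAsyncEnumerable, each with its own position.
await using var first = stream.GetAsyncEnumerator();
await using var second = stream.GetAsyncEnumerator();

await first.MoveNextAsync();
await first.MoveNextAsync();
await second.MoveNextAsync();

Console.WriteLine($"{first.Current} / {second.Current}"); // no / yes
```

The second enumerator starts from "yes" even though the first has already moved on to "no", which is the same thing the two await foreach loops in the question are doing.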

The IAsyncEnumerable<> in your live environment is coming from a gRPC stream that's already been opened. To simulate this, you'll need to keep track of state specific to the call to GetStream() in a context which does not defer execution via yield return. This is relatively easy thanks to Local Functions.

IAsyncEnumerable<string> GetStream()
{
    // Captured by the local function, so the flag is shared by every
    // enumeration of the IAsyncEnumerable returned from this one call.
    bool alreadyCalled = false;
    return GetStreamInternal();

    async IAsyncEnumerable<string> GetStreamInternal()
    {
        if (alreadyCalled)
        {
            throw new InvalidOperationException("This IAsyncEnumerable is intended to simulate a gRPC stream, and should not be iterated over twice.");
        }
        alreadyCalled = true;
        yield return "yes";
        yield return "no";
        yield return "maybe";
    }
}
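
One detail that may matter in a test: with this closure-based version the check lives inside the iterator body, so the exception should surface when the second enumeration is first advanced (the first MoveNextAsync), not when GetAsyncEnumerator() is called. The sketch below, a top-level program, is one way to check that. The shortened exception message and the await Task.CompletedTask line (added only to avoid the "async method lacks await" warning) are assumptions, not part of the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

IAsyncEnumerable<string> GetStream()
{
    bool alreadyCalled = false;
    return GetStreamInternal();

    async IAsyncEnumerable<string> GetStreamInternal()
    {
        if (alreadyCalled)
        {
            // Shortened message, for the sketch only.
            throw new InvalidOperationException("Stream was already enumerated.");
        }
        alreadyCalled = true;
        await Task.CompletedTask;
        yield return "yes";
        yield return "no";
        yield return "maybe";
    }
}

var stream = GetStream();

// First pass: consumes the stream normally.
int count = 0;
await foreach (var item in stream) count++;

// Second pass: getting the enumerator succeeds, because the body has not run yet.
var secondPass = stream.GetAsyncEnumerator();
bool threwOnMoveNext = false;
try
{
    await secondPass.MoveNextAsync();
}
catch (InvalidOperationException)
{
    threwOnMoveNext = true;
}
finally
{
    await secondPass.DisposeAsync();
}

Console.WriteLine($"items: {count}, second pass threw: {threwOnMoveNext}"); // items: 3, second pass threw: True
```

For code that consumes the stream with await foreach the difference is invisible, since the loop calls MoveNextAsync straight away.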

While I was writing this, Jon Skeet updated his answer with something that does the same thing, but separates out the "one-shot" aspect into a separate, composable class rather than using a closure. That approach can be helpful if you need to do this in multiple places, and don't want to repeat the pattern.

If you feel objects are a poor man's closures, here's a "functional" approach to the same pattern:

public static class Extensions
{
    public static IAsyncEnumerable<T> SimulateGRpcOneShotBehavior<T>(this IAsyncEnumerable<T> source)
    {
        bool alreadyCalled = false;
        return GetOneShotEnumerable();
        async IAsyncEnumerable<T> GetOneShotEnumerable()
        {
            if (alreadyCalled)
            {
                throw new InvalidOperationException("This IAsyncEnumerable is intended to simulate a gRPC stream, and should not be iterated over twice.");
            }
            alreadyCalled = true;
            await foreach (var element in source)
            {
                yield return element;
            }
        }
    }
}

// Usage of the code
IAsyncEnumerable<string> GetStream()
{
    return GetStreamInternal().SimulateGRpcOneShotBehavior();
    async IAsyncEnumerable<string> GetStreamInternal()
    {
        yield return "yes";
        yield return "no";
        yield return "maybe";
    }    
}