发布于 2026-01-06 4 阅读
0

Asynchronous coroutines with C# and IAsyncEnumerable Asynchronous coroutines with C# and IAsyncEnumerable

使用 C# 和 IAsyncEnumerable 实现异步协程

使用 C# 实现异步协程IAsyncEnumerable

使用 C# 实现异步协程IAsyncEnumerable

本文基于我最近在悉尼 Alt.Net 用户组聚会上发表的演讲

包含完整源代码的仓库可以在这里找到。

更新:在.NET Conf 2010上就此主题发表演讲是一次令人兴奋的机会和经历。我的演讲视频现已上传至 YouTube:

演讲幻灯片也已提供

介绍

TLDR,直接跳到现实生活中的例子

协程是协同执行的函数,这一概念已经存在了几十年。根据维基百科协程与线程非常相似。然而,协程是协同执行多任务的,而线程通常是抢占式执行多任务的

协程适用于类似脚本的场景,在这些场景中,代码执行流程可以在每个逻辑步骤之后暂停和恢复。在内部,协程使用某种编程语言语法糖来生成状态机方法。

在 C# 世界中,它们因Unity 游戏开发平台而流行起来,Unity 使用了IEnumerator-style 方法,并且yield return为此而使用。

在 C# 8 之前,无法在同一个方法中同时使用 `and`awaityield return`or`,这使得在协程中使用异步操作变得困难。现在,随着编译器对 `or` 的支持IAsyncEnumerable,这种操作可以自然而然地实现,我们将在本文中探讨此选项。

这里列出的代码的执行环境是 Windows Forms .NET Core 3.1 应用程序,但相同的技术可以用于任何运行 C# 代码的地方。

基于拉取的协程方法IEnumerable/IEnumerator

这种方法已经使用了十多年,自yieldC# 2.0 引入以来就一直存在。以下是如何在 Unity 视频游戏中以方法形式实现淡入淡出效果IEnumerator(摘自 Unity文档)。使用该方法yield return可以将for循环“分散”到多个帧渲染迭代中:



IEnumerator Fade() 
{
    for (float ft = 1f; ft >= 0; ft -= 0.1f) 
    {
        Color c = renderer.material.color;
        c.a = ft;
        renderer.material.color = c;
        yield return null;
    }
}


Enter fullscreen mode Exit fullscreen mode

传统上,我们使用yield return同步生成数据序列,以便使用 LINQ 操作符进行后续处理。相比之下,协程关注的是代码而非数据,我们将其yield return作为一种工具,将代码拆分成多个独立执行的块。

这很方便,因为我们可以使用所有常规的控制流语句(if`if`、for` whileif`、foreach`if`、` usingif` 等)
,而通常情况下,我们需要使用一连串的回调函数。不过,这里也有一个明显的限制:C# 不允许在代码块yield内使用try {}`if` 语句。

让我们创建自己的示例。我们希望CoroutineA这两个CoroutineB函数在主 UI 线程上协同执行。在实际应用中,它们可能负责绘制动画效果、进行后台拼写检查、语法高亮或其他特定的ViewModelUI 更新。

为了简单起见,这里我们只使用控制台来显示一些可视化的进度输出:



private static IEnumerable<int> CoroutineA()
{
    for (int i = 0; i < 80; i++)
    {
        Console.SetCursorPosition(0, 0);
        Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");
        yield return i;
    }
}

private static IEnumerable<int> CoroutineB()
{
    for (int i = 0; i < 80; i++)
    {
        Console.SetCursorPosition(0, 1);
        Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}");
        yield return i;
    }
}


Enter fullscreen mode Exit fullscreen mode

执行流程可以用下图表示:

协程流

为了协同运行这两个协程,我们需要一个调度器,也称为协程驱动程序。它的作用是推动每个协程的执行流程从一个协程跳转到下一个协程yield return。这可以通过定时器间隔、用户输入事件,甚至像IObservableReactiveX 工作流中的订阅之类的机制来实现。

在这个简单的示例中,我们将使用 Windows Forms 定时器。调度器会主动“拉取”后续操作的执行,方法是在每个事件yield发生时调用相应的函数。`completion`是一个辅助函数,用于将两个流合并为一个流。IEnumerator.MoveNextTickCoroutineCombinatorIEnumerable

以下是调度程序代码



private static async ValueTask RunCoroutinesAsync(CancellationToken token)
{
    // combine two IEnumerable sequences into one and get an IEnumerator for it
    using var combined = CoroutineCombinator<int>.Combine(
        CoroutineA, 
        CoroutineB)
        .GetEnumerator();

    var tcs = new TaskCompletionSource<bool>();
    using var rego = token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: true);

    using var timer = new System.Windows.Forms.Timer { Interval = 50 };
    timer.Tick += (s, e) =>
    {
        try
        {
            // upon each timer tick,
            // pull/execute the next slice 
            // of the combined coroutine code flow
            if (!combined.MoveNext())
            {
                tcs.TrySetResult(true);
            }
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    };

    timer.Start();
    await tcs.Task;
}


Enter fullscreen mode Exit fullscreen mode

运行它:

运行基于拉取的协程

与其使用IEnumerable/ yield return,我们也可以尝试用async/来实现同样的效果await



private static async Task CoroutineA()
{
    for (int i = 0; i < 80; i++)
    {
        Console.SetCursorPosition(0, 0);
        Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");
        // Task.Yield behavior depends, see https://stackoverflow.com/a/23441833
        await Task.Yield(); 
    }
}


Enter fullscreen mode Exit fullscreen mode

然而,这样做语义上会有所不同。我们只会收到整个方法完成的通知,而不会收到中间步骤的通知(与之前不同yield return)。因此,我们将无法精确控制执行流程在特定点的暂停和恢复方式await。虽然可以实现自定义的 `await`TaskScheduler或C#SynchronizationContext`awaitable`来控制它,但这会增加代码的复杂性和运行时开销。

理想情况下,我们应该使用async/await来等待实际异步 API 的结果,而不是用于暂停执行流程,而应该使用yield return来暂停执行流程。

在 C# 8 之前,无法在同一个方法中将两者结合起来,但现在我们可以做到了。

基于推送的协程方法(或异步拉取)IAsyncEnumerable/IAsyncIEnumerator

2018 年,C# 8.0 引入了对异步流的支持,并带来了一些新的语言和运行时特性,例如:

如果您不熟悉异步流的概念,我强烈建议您阅读Stephen Toub所著的《在 C# 8 中使用异步枚举进行迭代》

简而言之,类似于IEnumerable使用 `get()` 生成数据流以供 `prugs()` 函数使用IEnumerator.MoveNext, `get()` 函数IAsyncEnumerable用于生成事件流,这些事件流可以通过等待 `get()` 函数的结果来异步使用IAsyncEnumerator.MoveNextAsync。我有一篇相关的博客文章:“使用 ReactiveX 或 Channels 将 C# 事件作为异步流”

因此,类比于IEnumerable,我们可以使用IAsyncEnumerable-methods 来实现内部带有异步调用的协程。

在深入探讨实际示例之前,让我们先`time` 重现一下之前基于IEnumerable`-based`CoroutineA`-time` 的操作。我们仍然希望按照固定的定时器间隔运行延续操作,但同时也要确保在 UI 线程上执行任何微任务之前,UI 线程的消息队列中没有待处理的用户输入。以下代码的作用就在于此:CoroutineBIAsyncEnumerableyield returninputIdler.Yield()



private static async IAsyncEnumerable<int> CoroutineA(
    [EnumeratorCancellation] CancellationToken token)
{
    var inputIdler = new InputIdler();
    for (int i = 0; i < 80; i++)
    {
        // yield to the event loop to process any keyboard/mouse input first
        await inputIdler.Yield(token);

        // now we could use Task.Run for this to offload it to ThreadPool,
        // but let's pretend this code must execute on the UI thread 
        Console.SetCursorPosition(0, 0);
        Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");

        yield return i;
    }
}


Enter fullscreen mode Exit fullscreen mode

让我们也放慢速度CoroutineB,引入异步机制Delay,因为现在我们可以这样做了:



private static async IAsyncEnumerable<int> CoroutineB(
    [EnumeratorCancellation] CancellationToken token)
{
    var inputIdler = new InputIdler();
    for (int i = 0; i < 80; i++)
    {
        // yield to the event loop to process any keyboard/mouse input first
        await inputIdler.Yield(token);

        Console.SetCursorPosition(0, 1);
        Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}");

        // slow down
        await Task.Delay(25, token);
        yield return i;
    }
}


Enter fullscreen mode Exit fullscreen mode

两个协程现在并发运行(仍然在同一个 UI 线程上)。我们不希望仅仅因为某个协程异步等待另一个协程CoroutineA就将其暂停。如果需要,仍然可以进行同步,稍后我会展示。CoroutineBTask.Delay

以下是调度程序代码



private static async ValueTask RunCoroutinesAsync<T>(
    int intervalMs,
    CancellationToken token,
    params Func<CancellationToken, IAsyncEnumerable<T>>[] coroutines)
{
    var tasks = coroutines.Select(async c => 
    {
        var interval = new Interval();
        await foreach (var item in c(token).WithCancellation(token))
        {
            await interval.Delay(intervalMs, token);
        }
    });

    await Task.WhenAll(tasks); 
}


Enter fullscreen mode Exit fullscreen mode

运行它:

运行基于推送的协程

同步异步协程的流程。

那么,如果CoroutineA需要与另一个进程同步呢CoroutineB?下面是一个虚构但简单的示例,其中CoroutineA只有在另一个CoroutineB进程已经完成自身工作流程一半时,另一个进程才会开始运行。此时,另一个CoroutineB进程会等待CoroutineA另一个进程赶上,然后它们各自继续运行到最后。

我们借助自定义的AsyncCoroutineProxy辅助类来实现这一点,该辅助类封装了 .NET Channel,用作来自的异步进度通知IAsyncEnumerator.MoveNextAsync队列CoroutineB

AChannel就像一根管道,我们可以将对象推入管道的一端(使用Channel.Writer.WriteAsync),并从另一端以异步流的形式获取它们(使用Channel.Reader.ReadAllAsync)。

协程流

CoroutineA



private static async IAsyncEnumerable<int> CoroutineA(
    IAsyncCoroutineProxy<int> coroutineProxy,
    [EnumeratorCancellation] CancellationToken token)
{
    var coroutineB = await coroutineProxy.AsAsyncEnumerable(token);
    var interval = new Interval();

    // await for coroutineB to advance by 40 steps
    await foreach (var stepB in coroutineB)
    {
        if (stepB >= 40) break;
        Console.SetCursorPosition(0, 0);
        // display a throber
        Console.Write($"{nameof(CoroutineA)}: {@"-\|/"[stepB % 4]}"); 
        await interval.Delay(intervalMs, token);
    }

    // now do our own thing
    for (int i = 0; i < 80; i++)
    {
        Console.SetCursorPosition(0, 0);
        Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}"); 

        await interval.Delay(intervalMs, token);
        yield return i;
    }
}


Enter fullscreen mode Exit fullscreen mode

CoroutineB



private static async IAsyncEnumerable<int> CoroutineB(
    IAsyncCoroutineProxy<int> coroutineProxy,
    [EnumeratorCancellation] CancellationToken token)
{
    var coroutineA = await coroutineProxy.AsAsyncEnumerable(token);
    var interval = new Interval();

    for (int i = 0; i < 80; i++)
    {
        Console.SetCursorPosition(0, 1);
        Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}");

        await interval.Delay(intervalMs, token);
        yield return i;

        if (i == 40)
        {
            // await for CoroutineA to catch up
            await foreach (var stepA in coroutineA)
            {
                if (stepA >= 40) break;
                Console.SetCursorPosition(0, 1);
                // display a throber
                Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}{@"-\|/"[stepA % 4]}");
                await interval.Delay(intervalMs, token);
            }
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

调度器代码AsyncCoroutineProxyCoroutineB异步遍历(使用)的输出时await foreach,它会将接收到的项目写入Channel.Writer,然后CoroutineA从中读取它们Channel.Reader



public async Task RunAsync(Func<CancellationToken, IAsyncEnumerable<T>> coroutine, CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    var channel = Channel.CreateUnbounded<T>();
    var writer = channel.Writer;
    var proxy = channel.Reader.ReadAllAsync(token);
    _proxyTcs.SetResult(proxy); 

    try
    {
        await foreach (var item in coroutine(token).WithCancellation(token))
        {
            await writer.WriteAsync(item, token);
        }
        writer.Complete();
    }
    catch (Exception ex)
    {
        writer.Complete(ex);
        throw;
    }
}


Enter fullscreen mode Exit fullscreen mode

同时运行这两个协程



private static async ValueTask RunCoroutinesAsync(CancellationToken token)
{
    var proxyA = new AsyncCoroutineProxy<int>();
    var proxyB = new AsyncCoroutineProxy<int>();

    // start both coroutines
    await Task.WhenAll(
        proxyA.RunAsync(token => CoroutineA(proxyB, token), token),
        proxyB.RunAsync(token => CoroutineB(proxyA, token), token));
}


Enter fullscreen mode Exit fullscreen mode

运行它:

运行基于推送的协程

真实案例

使用CoroutineProxyCoroutineA它们CoroutineB可以作为异步生产者/消费者相互操作,并且可以交换这些角色。

实际上,我就是这样用它们进行自动化 UI 测试的。最近,我开发了一个 Windows 桌面应用程序#DevComrade,一个旨在默认启用无格式粘贴并提升其他一些效率的业余项目。它使用Win32 模拟输入 API,将无格式文本逐个字符地异步输入到当前活动窗口中,就像人输入一样。

我需要一个自动化测试来模拟这种情况。以下是我编写的代码(完整源代码在此)。

这里有一个前台线程,它运行着一个包含TextBox控件的窗体;还有一个后台线程,它会迭代地调用某个控件SendInput。十年前,我可能会使用类似 ` ManualResetEventtf.runtime` 这样的阻塞机制WaitOne(),在测试工作流程的关键节点同步这两个线程。而现在,我可以使用异步协程来实现这一点。

对于前景线程



private enum ForegroundEvents
{
    Ready,
    TextReceived,
    Finish
}

/// <summary>
/// A foreground test workflow that creates a UI form
/// </summary>
private static async IAsyncEnumerable<(ForegroundEvents, object)> ForegroundCoroutine(
    ICoroutineProxy<(BackgroundEvents, object)> backgroundCoroutineProxy,
    [EnumeratorCancellation] CancellationToken token)
{
    Assert.IsInstanceOfType(SynchronizationContext.Current, typeof(WindowsFormsSynchronizationContext));

    // create a test form with TextBox inside
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);

    using var form = new Form
    {
        Text = nameof(KeyboardInputTest),
        Left = 10,
        Top = 10,
        Width = 640,
        Height = 480,
        ShowInTaskbar = false
    };

    using var formClosedHandlerScope = EventHandlerScope<FormClosedEventHandler>.Create(
        (s, e) => cts.Cancel(),
        handler => form.FormClosed += handler,
        handler => form.FormClosed -= handler);

    // add a textbox 
    var textBox = new TextBox { Dock = DockStyle.Fill, Multiline = true };
    form.Controls.Add(textBox);
    form.Show();

    // show
    form.Activate();
    textBox.Focus();

    // coordinate further execution steps with the background coroutine
    await using var backgroundCoroutine =
        await backgroundCoroutineProxy.AsAsyncEnumerator(cts.Token);

    // notify the background coroutine that we're ready
    yield return (ForegroundEvents.Ready, DBNull.Value);

    // await for the background coroutine to also be ready
    var (foregroundEvent, _) = await backgroundCoroutine.GetNextAsync(cts.Token);
    Assert.IsTrue(foregroundEvent == BackgroundEvents.Ready);

    // await for the background coroutine to have fed some keystrokes
    (foregroundEvent, _) = await backgroundCoroutine.GetNextAsync(cts.Token);
    Assert.IsTrue(foregroundEvent == BackgroundEvents.TextSent);

    // await for idle input
    await InputHelpers.InputYield(delay: INPUT_IDLE_CHECK_INTERVAL, token: cts.Token);

    // notify the background coroutine about the text we've actually received
    var text = textBox.Text.Replace(Environment.NewLine, "\n");
    yield return (ForegroundEvents.TextReceived, text);
}


Enter fullscreen mode Exit fullscreen mode

关于背景讨论



private enum BackgroundEvents
{
    Ready,
    TextSent,
    Finish
}

/// <summary>
/// A background test workflow that sends keystrokes
/// </summary>
private static async IAsyncEnumerable<(BackgroundEvents, object)> BackgroundCoroutine(
    ICoroutineProxy<(ForegroundEvents, object)> foregroundCoroutineProxy,
    [EnumeratorCancellation] CancellationToken token)
{
    Assert.IsTrue(SynchronizationContext.Current is WindowsFormsSynchronizationContext);

    await using var foregroundCoroutine = await foregroundCoroutineProxy.AsAsyncEnumerator(token);

    // notify the foreground coroutine that we're ready
    yield return (BackgroundEvents.Ready, DBNull.Value);

    // await for the foreground coroutine to also be ready
    var (foregroundEvent, _) = await foregroundCoroutine.GetNextAsync(token);
    Assert.IsTrue(foregroundEvent == ForegroundEvents.Ready);

    // feed some text to the foreground window
    using var threadInputScope = AttachedThreadInputScope.Create();
    Assert.IsTrue(threadInputScope.IsAttached);

    using (WaitCursorScope.Create())
    {
        await KeyboardInput.WaitForAllKeysReleasedAsync(token);
        await KeyboardInput.FeedTextAsync(TEXT_TO_FEED, token);
    }

    // notify the foreground coroutine that we've fed some text
    yield return (BackgroundEvents.TextSent, DBNull.Value);

    // await for the foreground coroutine to reply with the text
    object text;
    (foregroundEvent, text) = await foregroundCoroutine.GetNextAsync(token);
    Assert.IsTrue(foregroundEvent == ForegroundEvents.TextReceived);
    Assert.AreEqual(text, TEXT_TO_FEED);
}


Enter fullscreen mode Exit fullscreen mode

执行测试本身的调度程序代码



[TestMethod]
public async Task Feed_text_to_TextBox_and_verify_it_was_consumed()
{
    using var cts = new CancellationTokenSource(); // TODO: test cancellation

    var foregroundCoroutineProxy = new CoroutineProxy<(ForegroundEvents, object)>();
    var backgroundCoroutineProxy = new CoroutineProxy<(BackgroundEvents, object)>();

    await using var foregroundApartment = new WinFormsApartment();
    await using var backgroundApartment = new WinFormsApartment();

    // start both coroutine, each in its own WinForms thread

    var foregroundTask = foregroundCoroutineProxy.Run(
        foregroundApartment, 
        token => ForegroundCoroutine(backgroundCoroutineProxy, token),
        cts.Token);

    var backgroundTask = backgroundCoroutineProxy.Run(
        backgroundApartment,
        token => BackgroundCoroutine(foregroundCoroutineProxy, token),
        cts.Token);

    await Task.WhenAll(foregroundTask, backgroundTask).WithAggregatedExceptions();
}


Enter fullscreen mode Exit fullscreen mode

这件事的奇怪之处在于,我们创建了foregroundCoroutineProxy(为了ForegroundCoroutine
传递给BackgroundCoroutine,以及backroundCoroutineProxy(为了BackgroundCoroutine)传递给ForegroundCoroutine

所以它看起来有点像相互异步递归,但实际上并非如此。真正的反压是由循环产生的,它驱动每个协CoroutineProxy.RunAsync的执行await foreach

注意它们如何BackgroundCoroutine相互同步彼此的状态。所有操作都是异步的,没有阻塞调用。两个协程在两个不同的线程上执行,并且实际上是并行运行的。在我们之前的示例中,我们只处理了在同一线程上的并发执行ForegroundCoroutineyield returnawait GetNextAsync()

结论

我认为,异步协程可以优雅地解决某些特定的生产者/消费者场景,尤其是在生产者和消费者角色划分不明确的情况下。当然,像Reactive ExtensionsDataflow这样成熟强大的框架也能解决类似的问题。不过,使用 Channels 的学习曲线IAsyncEnumerable应该非常低。

非常感谢您的反馈。请随时在此留言或在推特上私信我

参考

PS

在撰写本文的过程中,我还学到了一个很有用的技巧:如何使用新的IValueTaskSource接口来实现轻量级对象的源ValueTask。这有助于在等待ValueTask热异步循环时大幅减少内存分配。例如,可以查看 `<a>`、`<b>`SimpleValueTaskSource` <b>` 的源代码
InputIdlerTimerSource

文章来源:https://dev.to/noseratio/asynchronous-coroutines-with-c-8-0-and-iasyncenumerable-2e04