使用 C# 和 IAsyncEnumerable 实现异步协程
使用 C# 实现异步协程IAsyncEnumerable
使用 C# 实现异步协程IAsyncEnumerable
本文基于我最近在悉尼 Alt.Net 用户组聚会上发表的演讲。
包含完整源代码的仓库可以在这里找到。
更新:在.NET Conf 2010上就此主题发表演讲是一次令人兴奋的机会和经历。我的演讲视频现已上传至 YouTube:
演讲幻灯片也已提供。
介绍
TLDR,直接跳到现实生活中的例子。
协程是协同执行的函数,这一概念已经存在了几十年。根据维基百科,协程与线程非常相似。然而,协程是协同执行多任务的,而线程通常是抢占式执行多任务的。
协程适用于类似脚本的场景,在这些场景中,代码执行流程可以在每个逻辑步骤之后暂停和恢复。在内部,协程使用某种编程语言语法糖来生成状态机方法。
在 C# 世界中,它们因Unity 游戏开发平台而流行起来,Unity 使用了IEnumerator-style 方法,并且yield return为此而使用。
在 C# 8 之前,无法在同一个方法中同时使用 `and`await和yield return`or`,这使得在协程中使用异步操作变得困难。现在,随着编译器对 `or` 的支持IAsyncEnumerable,这种操作可以自然而然地实现,我们将在本文中探讨此选项。
这里列出的代码的执行环境是 Windows Forms .NET Core 3.1 应用程序,但相同的技术可以用于任何运行 C# 代码的地方。
基于拉取的协程方法IEnumerable/IEnumerator
这种方法已经使用了十多年,自yieldC# 2.0 引入以来就一直存在。以下是如何在 Unity 视频游戏中以方法形式实现淡入淡出效果IEnumerator(摘自 Unity文档)。使用该方法yield return可以将for循环“分散”到多个帧渲染迭代中:
IEnumerator Fade()
{
for (float ft = 1f; ft >= 0; ft -= 0.1f)
{
Color c = renderer.material.color;
c.a = ft;
renderer.material.color = c;
yield return null;
}
}
传统上,我们使用yield return同步生成数据序列,以便使用 LINQ 操作符进行后续处理。相比之下,协程关注的是代码而非数据,我们将其yield return作为一种工具,将代码拆分成多个独立执行的块。
这很方便,因为我们可以使用所有常规的控制流语句(if`if`、for` whileif`、foreach`if`、` usingif` 等)
,而通常情况下,我们需要使用一连串的回调函数。不过,这里也有一个明显的限制:C# 不允许在代码块yield内使用try {}`if` 语句。
让我们创建自己的示例。我们希望CoroutineA这两个CoroutineB函数在主 UI 线程上协同执行。在实际应用中,它们可能负责绘制动画效果、进行后台拼写检查、语法高亮或其他特定的ViewModelUI 更新。
为了简单起见,这里我们只使用控制台来显示一些可视化的进度输出:
private static IEnumerable<int> CoroutineA()
{
for (int i = 0; i < 80; i++)
{
Console.SetCursorPosition(0, 0);
Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");
yield return i;
}
}
private static IEnumerable<int> CoroutineB()
{
for (int i = 0; i < 80; i++)
{
Console.SetCursorPosition(0, 1);
Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}");
yield return i;
}
}
执行流程可以用下图表示:
为了协同运行这两个协程,我们需要一个调度器,也称为协程驱动程序。它的作用是推动每个协程的执行流程从一个协程跳转到下一个协程yield return。这可以通过定时器间隔、用户输入事件,甚至像IObservableReactiveX 工作流中的订阅之类的机制来实现。
在这个简单的示例中,我们将使用 Windows Forms 定时器。调度器会主动“拉取”后续操作的执行,方法是在每个事件yield发生时调用相应的函数。`completion`是一个辅助函数,用于将两个流合并为一个流。IEnumerator.MoveNextTickCoroutineCombinatorIEnumerable
以下是调度程序代码:
private static async ValueTask RunCoroutinesAsync(CancellationToken token)
{
// combine two IEnumerable sequences into one and get an IEnumerator for it
using var combined = CoroutineCombinator<int>.Combine(
CoroutineA,
CoroutineB)
.GetEnumerator();
var tcs = new TaskCompletionSource<bool>();
using var rego = token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: true);
using var timer = new System.Windows.Forms.Timer { Interval = 50 };
timer.Tick += (s, e) =>
{
try
{
// upon each timer tick,
// pull/execute the next slice
// of the combined coroutine code flow
if (!combined.MoveNext())
{
tcs.TrySetResult(true);
}
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
};
timer.Start();
await tcs.Task;
}
运行它:
与其使用IEnumerable/ yield return,我们也可以尝试用async/来实现同样的效果await:
private static async Task CoroutineA()
{
for (int i = 0; i < 80; i++)
{
Console.SetCursorPosition(0, 0);
Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");
// Task.Yield behavior depends, see https://stackoverflow.com/a/23441833
await Task.Yield();
}
}
然而,这样做语义上会有所不同。我们只会收到整个方法完成的通知,而不会收到中间步骤的通知(与之前不同yield return)。因此,我们将无法精确控制执行流程在特定点的暂停和恢复方式await。虽然可以实现自定义的 `await`TaskScheduler或C#SynchronizationContext的`awaitable`来控制它,但这会增加代码的复杂性和运行时开销。
理想情况下,我们应该使用async/await来等待实际异步 API 的结果,而不是用于暂停执行流程,而应该使用yield return来暂停执行流程。
在 C# 8 之前,无法在同一个方法中将两者结合起来,但现在我们可以做到了。
基于推送的协程方法(或异步拉取)IAsyncEnumerable/IAsyncIEnumerator
2018 年,C# 8.0 引入了对异步流的支持,并带来了一些新的语言和运行时特性,例如:
如果您不熟悉异步流的概念,我强烈建议您阅读Stephen Toub所著的《在 C# 8 中使用异步枚举进行迭代》。
简而言之,类似于IEnumerable使用 `get()` 生成数据流以供 `prugs()` 函数使用IEnumerator.MoveNext, `get()` 函数IAsyncEnumerable用于生成事件流,这些事件流可以通过等待 `get()` 函数的结果来异步使用IAsyncEnumerator.MoveNextAsync。我有一篇相关的博客文章:“使用 ReactiveX 或 Channels 将 C# 事件作为异步流”。
因此,类比于IEnumerable,我们可以使用IAsyncEnumerable-methods 来实现内部带有异步调用的协程。
在深入探讨实际示例之前,让我们先用`time` 重现一下之前基于IEnumerable`-based`CoroutineA和`-time` 的操作。我们仍然希望按照固定的定时器间隔运行延续操作,但同时也要确保在 UI 线程上执行任何微任务之前,UI 线程的消息队列中没有待处理的用户输入。以下代码的作用就在于此:CoroutineBIAsyncEnumerableyield returninputIdler.Yield()
private static async IAsyncEnumerable<int> CoroutineA(
[EnumeratorCancellation] CancellationToken token)
{
var inputIdler = new InputIdler();
for (int i = 0; i < 80; i++)
{
// yield to the event loop to process any keyboard/mouse input first
await inputIdler.Yield(token);
// now we could use Task.Run for this to offload it to ThreadPool,
// but let's pretend this code must execute on the UI thread
Console.SetCursorPosition(0, 0);
Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");
yield return i;
}
}
让我们也放慢速度CoroutineB,引入异步机制Delay,因为现在我们可以这样做了:
private static async IAsyncEnumerable<int> CoroutineB(
[EnumeratorCancellation] CancellationToken token)
{
var inputIdler = new InputIdler();
for (int i = 0; i < 80; i++)
{
// yield to the event loop to process any keyboard/mouse input first
await inputIdler.Yield(token);
Console.SetCursorPosition(0, 1);
Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}");
// slow down
await Task.Delay(25, token);
yield return i;
}
}
两个协程现在并发运行(仍然在同一个 UI 线程上)。我们不希望仅仅因为某个协程异步等待另一个协程CoroutineA就将其暂停。如果需要,仍然可以进行同步,稍后我会展示。CoroutineBTask.Delay
以下是调度程序代码:
private static async ValueTask RunCoroutinesAsync<T>(
int intervalMs,
CancellationToken token,
params Func<CancellationToken, IAsyncEnumerable<T>>[] coroutines)
{
var tasks = coroutines.Select(async c =>
{
var interval = new Interval();
await foreach (var item in c(token).WithCancellation(token))
{
await interval.Delay(intervalMs, token);
}
});
await Task.WhenAll(tasks);
}
运行它:
同步异步协程的流程。
那么,如果CoroutineA需要与另一个进程同步呢CoroutineB?下面是一个虚构但简单的示例,其中CoroutineA只有在另一个CoroutineB进程已经完成自身工作流程一半时,另一个进程才会开始运行。此时,另一个CoroutineB进程会等待CoroutineA另一个进程赶上,然后它们各自继续运行到最后。
我们借助自定义的AsyncCoroutineProxy辅助类来实现这一点,该辅助类封装了 .NET Channel,用作来自的异步进度通知IAsyncEnumerator.MoveNextAsync队列CoroutineB。
AChannel就像一根管道,我们可以将对象推入管道的一端(使用Channel.Writer.WriteAsync),并从另一端以异步流的形式获取它们(使用Channel.Reader.ReadAllAsync)。
CoroutineA:
private static async IAsyncEnumerable<int> CoroutineA(
IAsyncCoroutineProxy<int> coroutineProxy,
[EnumeratorCancellation] CancellationToken token)
{
var coroutineB = await coroutineProxy.AsAsyncEnumerable(token);
var interval = new Interval();
// await for coroutineB to advance by 40 steps
await foreach (var stepB in coroutineB)
{
if (stepB >= 40) break;
Console.SetCursorPosition(0, 0);
// display a throber
Console.Write($"{nameof(CoroutineA)}: {@"-\|/"[stepB % 4]}");
await interval.Delay(intervalMs, token);
}
// now do our own thing
for (int i = 0; i < 80; i++)
{
Console.SetCursorPosition(0, 0);
Console.Write($"{nameof(CoroutineA)}: {new String('A', i)}");
await interval.Delay(intervalMs, token);
yield return i;
}
}
CoroutineB:
private static async IAsyncEnumerable<int> CoroutineB(
IAsyncCoroutineProxy<int> coroutineProxy,
[EnumeratorCancellation] CancellationToken token)
{
var coroutineA = await coroutineProxy.AsAsyncEnumerable(token);
var interval = new Interval();
for (int i = 0; i < 80; i++)
{
Console.SetCursorPosition(0, 1);
Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}");
await interval.Delay(intervalMs, token);
yield return i;
if (i == 40)
{
// await for CoroutineA to catch up
await foreach (var stepA in coroutineA)
{
if (stepA >= 40) break;
Console.SetCursorPosition(0, 1);
// display a throber
Console.Write($"{nameof(CoroutineB)}: {new String('B', i)}{@"-\|/"[stepA % 4]}");
await interval.Delay(intervalMs, token);
}
}
}
}
当调度器代码AsyncCoroutineProxyCoroutineB异步遍历(使用)的输出时await foreach,它会将接收到的项目写入Channel.Writer,然后CoroutineA从中读取它们Channel.Reader:
public async Task RunAsync(Func<CancellationToken, IAsyncEnumerable<T>> coroutine, CancellationToken token)
{
token.ThrowIfCancellationRequested();
var channel = Channel.CreateUnbounded<T>();
var writer = channel.Writer;
var proxy = channel.Reader.ReadAllAsync(token);
_proxyTcs.SetResult(proxy);
try
{
await foreach (var item in coroutine(token).WithCancellation(token))
{
await writer.WriteAsync(item, token);
}
writer.Complete();
}
catch (Exception ex)
{
writer.Complete(ex);
throw;
}
}
同时运行这两个协程:
private static async ValueTask RunCoroutinesAsync(CancellationToken token)
{
var proxyA = new AsyncCoroutineProxy<int>();
var proxyB = new AsyncCoroutineProxy<int>();
// start both coroutines
await Task.WhenAll(
proxyA.RunAsync(token => CoroutineA(proxyB, token), token),
proxyB.RunAsync(token => CoroutineB(proxyA, token), token));
}
运行它:
真实案例
使用CoroutineProxy,CoroutineA它们CoroutineB可以作为异步生产者/消费者相互操作,并且可以交换这些角色。
实际上,我就是这样用它们进行自动化 UI 测试的。最近,我开发了一个 Windows 桌面应用程序#DevComrade,一个旨在默认启用无格式粘贴并提升其他一些效率的业余项目。它使用Win32 模拟输入 API,将无格式文本逐个字符地异步输入到当前活动窗口中,就像人输入一样。
我需要一个自动化测试来模拟这种情况。以下是我编写的代码(完整源代码在此)。
这里有一个前台线程,它运行着一个包含TextBox控件的窗体;还有一个后台线程,它会迭代地调用某个控件SendInput。十年前,我可能会使用类似 ` ManualResetEventtf.runtime` 这样的阻塞机制WaitOne(),在测试工作流程的关键节点同步这两个线程。而现在,我可以使用异步协程来实现这一点。
对于前景线程:
private enum ForegroundEvents
{
Ready,
TextReceived,
Finish
}
/// <summary>
/// A foreground test workflow that creates a UI form
/// </summary>
private static async IAsyncEnumerable<(ForegroundEvents, object)> ForegroundCoroutine(
ICoroutineProxy<(BackgroundEvents, object)> backgroundCoroutineProxy,
[EnumeratorCancellation] CancellationToken token)
{
Assert.IsInstanceOfType(SynchronizationContext.Current, typeof(WindowsFormsSynchronizationContext));
// create a test form with TextBox inside
using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
using var form = new Form
{
Text = nameof(KeyboardInputTest),
Left = 10,
Top = 10,
Width = 640,
Height = 480,
ShowInTaskbar = false
};
using var formClosedHandlerScope = EventHandlerScope<FormClosedEventHandler>.Create(
(s, e) => cts.Cancel(),
handler => form.FormClosed += handler,
handler => form.FormClosed -= handler);
// add a textbox
var textBox = new TextBox { Dock = DockStyle.Fill, Multiline = true };
form.Controls.Add(textBox);
form.Show();
// show
form.Activate();
textBox.Focus();
// coordinate further execution steps with the background coroutine
await using var backgroundCoroutine =
await backgroundCoroutineProxy.AsAsyncEnumerator(cts.Token);
// notify the background coroutine that we're ready
yield return (ForegroundEvents.Ready, DBNull.Value);
// await for the background coroutine to also be ready
var (foregroundEvent, _) = await backgroundCoroutine.GetNextAsync(cts.Token);
Assert.IsTrue(foregroundEvent == BackgroundEvents.Ready);
// await for the background coroutine to have fed some keystrokes
(foregroundEvent, _) = await backgroundCoroutine.GetNextAsync(cts.Token);
Assert.IsTrue(foregroundEvent == BackgroundEvents.TextSent);
// await for idle input
await InputHelpers.InputYield(delay: INPUT_IDLE_CHECK_INTERVAL, token: cts.Token);
// notify the background coroutine about the text we've actually received
var text = textBox.Text.Replace(Environment.NewLine, "\n");
yield return (ForegroundEvents.TextReceived, text);
}
关于背景讨论:
private enum BackgroundEvents
{
Ready,
TextSent,
Finish
}
/// <summary>
/// A background test workflow that sends keystrokes
/// </summary>
private static async IAsyncEnumerable<(BackgroundEvents, object)> BackgroundCoroutine(
ICoroutineProxy<(ForegroundEvents, object)> foregroundCoroutineProxy,
[EnumeratorCancellation] CancellationToken token)
{
Assert.IsTrue(SynchronizationContext.Current is WindowsFormsSynchronizationContext);
await using var foregroundCoroutine = await foregroundCoroutineProxy.AsAsyncEnumerator(token);
// notify the foreground coroutine that we're ready
yield return (BackgroundEvents.Ready, DBNull.Value);
// await for the foreground coroutine to also be ready
var (foregroundEvent, _) = await foregroundCoroutine.GetNextAsync(token);
Assert.IsTrue(foregroundEvent == ForegroundEvents.Ready);
// feed some text to the foreground window
using var threadInputScope = AttachedThreadInputScope.Create();
Assert.IsTrue(threadInputScope.IsAttached);
using (WaitCursorScope.Create())
{
await KeyboardInput.WaitForAllKeysReleasedAsync(token);
await KeyboardInput.FeedTextAsync(TEXT_TO_FEED, token);
}
// notify the foreground coroutine that we've fed some text
yield return (BackgroundEvents.TextSent, DBNull.Value);
// await for the foreground coroutine to reply with the text
object text;
(foregroundEvent, text) = await foregroundCoroutine.GetNextAsync(token);
Assert.IsTrue(foregroundEvent == ForegroundEvents.TextReceived);
Assert.AreEqual(text, TEXT_TO_FEED);
}
执行测试本身的调度程序代码:
[TestMethod]
public async Task Feed_text_to_TextBox_and_verify_it_was_consumed()
{
using var cts = new CancellationTokenSource(); // TODO: test cancellation
var foregroundCoroutineProxy = new CoroutineProxy<(ForegroundEvents, object)>();
var backgroundCoroutineProxy = new CoroutineProxy<(BackgroundEvents, object)>();
await using var foregroundApartment = new WinFormsApartment();
await using var backgroundApartment = new WinFormsApartment();
// start both coroutine, each in its own WinForms thread
var foregroundTask = foregroundCoroutineProxy.Run(
foregroundApartment,
token => ForegroundCoroutine(backgroundCoroutineProxy, token),
cts.Token);
var backgroundTask = backgroundCoroutineProxy.Run(
backgroundApartment,
token => BackgroundCoroutine(foregroundCoroutineProxy, token),
cts.Token);
await Task.WhenAll(foregroundTask, backgroundTask).WithAggregatedExceptions();
}
这件事的奇怪之处在于,我们创建了foregroundCoroutineProxy(为了ForegroundCoroutine)
传递给BackgroundCoroutine,以及backroundCoroutineProxy(为了BackgroundCoroutine)传递给ForegroundCoroutine。
所以它看起来有点像相互异步递归,但实际上并非如此。真正的反压是由循环产生的,它驱动每个协程CoroutineProxy.RunAsync的执行。await foreach
注意它们如何BackgroundCoroutine相互同步彼此的状态。所有操作都是异步的,没有阻塞调用。两个协程在两个不同的线程上执行,并且实际上是并行运行的。在我们之前的示例中,我们只处理了在同一线程上的并发执行ForegroundCoroutine。yield returnawait GetNextAsync()
结论
我认为,异步协程可以优雅地解决某些特定的生产者/消费者场景,尤其是在生产者和消费者角色划分不明确的情况下。当然,像Reactive Extensions或Dataflow这样成熟强大的框架也能解决类似的问题。不过,使用 Channels 的学习曲线IAsyncEnumerable应该非常低。
非常感谢您的反馈。请随时在此留言或在推特上私信我。
参考
- 协程- 维基百科
- 协程- Unity
- IResult 和协程- Caliburn.Micro
- Async/await 可以替代协程吗? - StackOverflow
- 教程:使用 C# 8.0 和 .NET Core 3.0 生成和使用异步流
- 在 C# 8 中使用异步枚举进行迭代
- 使用 ReactiveX 或 Channels 将 C# 事件作为异步流
PS
在撰写本文的过程中,我还学到了一个很有用的技巧:如何使用新的IValueTaskSource接口来实现轻量级对象的源ValueTask。这有助于在等待ValueTask热异步循环时大幅减少内存分配。例如,可以查看 `<a>`、`<b>`和SimpleValueTaskSource` <b>` 的源代码。InputIdlerTimerSource




