学习使用 .NET Core 和 C# 实现持久函数,以及有状态无服务器架构
由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!
欢迎在推特上关注我,我很乐意接受您对话题或改进方面的建议。/克里斯
Durable Functions 是 Azure Functions 的扩展,它允许您在无服务器环境中编写有状态函数。Durable Functions 会自动管理状态、检查点和重启。
你可能会问,那到底是什么意思?
这意味着你可以拥有像长时间运行函数那样的长时间运行函数。它还具有状态管理功能,能够记住自身运行状态,就像工作流一样。
这样如何?想象一下你遇到的情况:
你需要将某项任务分解成不同的检查点来进行管理。每个检查点都代表着任务完成的第一步。更具体地说,想象一下一个游戏,你需要加载大量不同的资源,只有当所有资源都加载完毕后,你才能开始游戏。
哦,好的,所以它就像一个工作流程框架。
没错,它允许你指定如何通过流程执行某些操作。甚至针对不同的流程,还有不同的架构模式可供选择。
听起来好像很贵,是吗?
不,并非如此。它的付费模式与 Azure Functions 使用的模式类似,只有在函数/工作流执行时才需要付费。
听起来不错,跟我说说更多细节吧。
本文将涵盖以下内容:
- 什么是持久功能?我们来探讨一下它的定义和核心概念。
- 工作原理,我们将简要解释其工作原理;资源,我们将提供一些资源,以便您深入了解。
- 在实验课上,我们将通过一个示例进行编程,以便您了解主要概念的实际应用以及它们的作用。
概念和高层次解释
在处理持久性功能时,我们需要了解一些概念。所有这些概念都发挥作用,共同使我们能够运行持久性功能。
- 协调器函数,用于定义工作流,设置工作流中应该发生什么,要执行哪些活动以及完成后会发生什么。
- 活动函数是持久功能编排中的基本工作单元。活动函数是指流程中需要编排的功能和任务。您可以根据需要创建任意数量的活动函数。请确保为它们指定描述性名称,以代表流程中的步骤。
- 客户端函数是触发式函数,用于创建新的业务流程实例。客户端函数是创建 Durable Functions 业务流程实例的入口点。
好的,我大概明白了,但是您能再解释一下吗?
当然,解释起来最好的方法是通过一个实际的例子和一幅图。那么,我们来谈谈 订单处理。在订单处理中,我们假设需要执行以下任务:
- 清点库存,
- 向顾客收费
- 创建发货单
- 既然我们已经了解了订单的处理流程,那么让我们向您展示相关图片,以便您对整个工作流程有所了解:
好的,上面我们看到客户端函数是如何被调用的。以创建订单为例,这通常是我们从应用程序访问的一个 HTTP 端点。接下来,客户端函数会启动一个编排实例。这意味着我们会获得一个 `<flow_name>` instance id,它是我们指向该特定流程的唯一引用。接下来,我们会尝试在编排实例内部执行所有操作,例如 `<flow_name>`、`<flow_name>` checking the inventory和 charging the customer `<flow_name> creating a shipment`。
工作原理
让我们更详细地探讨一下它的技术原理。编排的关键在于它通常协调的是异步操作,这意味着我们无法确切地知道某个操作何时完成。为了避免这种情况,持久化函数会在关闭时保存状态,从而产生运行开销。
当编排函数需要执行更多工作时(例如,收到响应消息或持久计时器到期),编排器会唤醒并从头开始重新执行整个函数,以重建本地状态。
等等,要重新运行所有程序吗?
不用担心,在重放过程中,如果代码尝试调用函数(或执行任何其他异步操作),持久任务框架会查询当前编排的执行历史记录。如果发现该活动函数已经执行并产生了结果,它会重放该函数的结果,然后编排器代码继续运行。
哦,好的,听起来不错。
重放会持续进行,直到函数代码执行完毕或安排了新的异步工作为止。
资源
- 免费 Azure 帐户您需要注册 Azure 帐户才能使用 Durable Functions
- 通过 JavaScript快速入门创建你的第一个持久函数,该快速入门指南将引导你完成持久函数的创建过程。
- 持久函数概念点击此处阅读更多关于概念和模式以及如何实现这些模式的信息。
- 编排器功能约束您需要了解的约束。
实验室 - 简单的活动流程
我们相信学习的最佳方式就是动手实践。那么该如何操作呢?其实很简单。使用 VS Code,我们可以安装一个插件,让这个过程变得非常轻松。
创建我们的项目
打开命令面板或键入COMMAND + SHIFT + P。
然后我们选择以下内容,以创建一个新项目
接下来我们需要选择一种语言,比如我们选择英语C#。然后我们会看到以下选项列表:
选择 Durable Functions Orchestration,然后为您的函数命名。
接下来,系统会要求您选择一个存储帐户,您需要选择“<username>” Subscription和 Storage account “<username> Resource group”。这是因为当您保存函数状态时,需要将其保存到某个位置以便稍后恢复。
您的项目现在应该如下所示:
-| obj/
-| bin/
-| .gitignore
-| <name of directory>.csproj
-| local.settings.json
-| Orchestration.cs
让我们仔细看看Orchestration.cs:
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
namespace Company.Function
{
public static class Orchestration
{
[FunctionName("Orchestration")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
[FunctionName("Orchestration_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
return $"Hello {name}!";
}
[FunctionName("Orchestration_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync("Orchestration", null);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}
}
}
上方图片展示了创建项目时系统自动生成的三个函数。每个函数都使用了FunctionName("<name of function>")装饰器进行修饰。运行时环境正是通过这种方式识别这些函数,并据此生成端点等功能。
解释文物
好的,我们创建了三个不同的函数或工件,分别是 `a` orchestrator function、`b` HTTP start/client function和 `c` activity function。这一切是如何运作的呢?
一切都始于一个 HttpStart 启动所有程序的函数。该函数随后启动 Orchestrator,Orchestrator 又会启动 Orchestrator 中指定的 Activity 函数。听起来有点理论化,但让我们深入了解一下这些组件,看看代码中发生了什么。
HttpStart
如上所述,这是整个流程的启动函数。让我们来看看它的源代码,并讨论一下它的作用:
[FunctionName("Orchestration_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync("Orchestration", null);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}
从上面的内容可以看出,我们有三个输入参数:
- `req`参数的类型为
IHttpRequestMessage`<string>`,表示传入的 Web 请求。请注意,此参数带有 `HttpTrigger` 装饰器,并有两个输入参数:`AuthorizationLevel` 以及字符串 `"get"` 和 `"post"`。这指示此方法监听使用 HTTP 动词 GET 和 POST 的 Web 请求。 - starter,类型为
IDurableOrchestrationClient,这是允许我们控制总体流程的客户端。 - 该实例允许我们记录
ILogger不同的消息,Azure 可以订阅这些消息,并且在我们开发项目时,这些消息也会在终端中显示。
接下来,我们的初始实例调用 StartNewAsync() 会生成一个 instanceId。这 instanceId 是一个指向此特定函数调用的引用或处理程序。对于本演示来说,这并不重要,但在第二个演示中我们将使用该信息。
最后,我们通过调用来创建 HTTP 响应 CreateCheckStatusResponse()。
编曲家
接下来我们来看看 Orchestrator。所有有趣的事情都发生在这里,我们在这里设置流程,决定何时调用哪个函数以及为什么调用。让我们来看一下代码:
[FunctionName("Orchestration")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
首先我们看到的是一个方法 FunctionName("Orchestration")。它有一个类型为 context 的输入参数IDurableOrchestrationContext。这个参数不仅可以控制流程,还可以调用辅助函数和活动函数,而活动函数负责执行繁重的工作。当 context 被调用时 CallActivitySync(),它会调用一个指定名称的活动函数 Orchestration_Hello,并传递一个字符串。
编曲_Hello
接下来是活动函数。所有繁重的工作都在这里完成。
[FunctionName("Orchestration_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
return $"Hello {name}!";
}
我们看到它立即返回,但这可能是一个耗时较长的活动。关键在于,无论它是在一毫秒内完成还是需要一些时间,都无关紧要,编排函数仍然必须等待它完成。想象一下,这个函数可以执行更多操作,例如与数据库交互或发出 HTTP 请求。
调试
你可能觉得到目前为止一切都理解了,但只有当你看到调试流程实际运行时,才会真正明白。所以接下来我们要做的就是,从 VS Code 启动我们的持久函数,这样你就能看到断点是如何触发的,以及何时触发的。
现在我们准备进行调试,让我们 通过VS Code 菜单中的“调试”选项 来启动调试。Run/ Start
终端应该会打印出类似这样的信息:
Http Functions:
Orchestration_HttpStart: [GET,POST] http://localhost:7071/api/Orchestration_HttpStart
接下来,我们需要按照上述步骤,通过调用客户端函数路由来启动整个流程 http://localhost:7071/api/Orchestration_HttpStart。我们需要通过调用提到的 URL 来开始整个过程。
1)首先发生的是我们的 HttpStart 函数被调用:
我们让调试器继续运行。
2)接下来,它 Orchestration function 正在被击中。
接下来我们进入下一个断点,可以看到我们的活动。
3)接下来,该函数 Orchestration_Hello 将被执行。
4) 我们推进了断点,发现自己又回到了编排函数中。
如您所见,活动函数和协调器之间会一直这样循环往复,直到协调器完成为止。
HttpStart 响应
在我们的浏览器中,最终会跳转到类似这样的页面,这是通过名为 的方法返回的 HTTP 响应 HttpStart。
// 20200721125052
// http://localhost:7071/api/Orchestration_HttpStart
{
"id": "151c37c2bbc14f659ffb0a4d3ed9e54e",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==",
"purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA=="
}
此时此刻,我们最想知道的是,我们最终产出了什么?答案就在名为 的网址中 statusQueryGetUri。让我们点击该链接:
// 20200721122529
// http://localhost:7071/runtime/webhooks/durabletask/instances/e1d7e237acb74834a02854050805712e?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==
{
"name": "Orchestration",
"instanceId": "e1d7e237acb74834a02854050805712e",
"runtimeStatus": "Completed",
"input": null,
"customStatus": null,
"output": [
"Hello Tokyo!",
"Hello Seattle!",
"Hello London!"
],
"createdTime": "2020-07-21T11:18:28Z",
"lastUpdatedTime": "2020-07-21T11:24:25Z"
}
如上所示,我们 Orchestration 函数返回的响应是一个数组,其中包含了所有活动函数的响应,如下所示:
"output": [
"Hello Tokyo!",
"Hello Seattle!",
"Hello London"
]
最终导致这种情况是因为我们编写代码的方式,我们是这样写的:
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "London"));
概括
关于持久函数还有很多东西需要学习,但我已经能听到你们中的一些人打鼾了,所以我们将把应用程序模式和扇出/扇入模式的实现等主题留到下一部分来讲。
所以,我希望你对此感到兴奋。
文章来源:https://dev.to/azure/learn-durable-functions-with-net-core-and-c-stateful-serverless-41f7







