发布于 2026-01-06 1 阅读
0

使用 .NET Core 和 C# 学习持久函数,有状态的无服务器 DEV 全球展示挑战赛,由 Mux 呈现:展示你的项目!

学习使用 .NET Core 和 C# 实现持久函数,以及有状态无服务器架构

由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!

欢迎在推特上关注我,我很乐意接受您对话题或改进方面的建议。/克里斯

Durable Functions 是 Azure Functions 的扩展,它允许您在无服务器环境中编写有状态函数。Durable Functions 会自动管理状态、检查点和重启。

你可能会问,那到底是什么意思?

这意味着你可以拥有像长时间运行函数那样的长时间运行函数。它还具有状态管理功能,能够记住自身运行状态,就像工作流一样。

这样如何?想象一下你遇到的情况:

你需要将某项任务分解成不同的检查点来进行管理。每个检查点都代表着任务完成的第一步。更具体地说,想象一下一个游戏,你需要加载大量不同的资源,只有当所有资源都加载完毕后,你才能开始游戏。

哦,好的,所以它就像一个工作流程框架。

没错,它允许你指定如何通过流程执行某些操作。甚至针对不同的流程,还有不同的架构模式可供选择。

听起来好像很贵,是吗?

不,并非如此。它的付费模式与 Azure Functions 使用的模式类似,只有在函数/工作流执行时才需要付费。

听起来不错,跟我说说更多细节吧。

本文将涵盖以下内容:

  • 什么是持久功能?我们来探讨一下它的定义和核心概念。
  • 工作原理,我们将简要解释其工作原理;资源,我们将提供一些资源,以便您深入了解。 
  • 在实验课上,我们将通过一个示例进行编程,以便您了解主要概念的实际应用以及它们的作用。

 概念和高层次解释

在处理持久性功能时,我们需要了解一些概念。所有这些概念都发挥作用,共同使我们能够运行持久性功能。

  • 协调器函数,用于定义工作流,设置工作流中应该发生什么,要执行哪些活动以及完成后会发生什么。
  • 活动函数是持久功能编排中的基本工作单元。活动函数是指流程中需要编排的功能和任务。您可以根据需要创建任意数量的活动函数。请确保为它们指定描述性名称,以代表流程中的步骤。
  • 客户端函数是触发式函数,用于创建新的业务流程实例。客户端函数是创建 Durable Functions 业务流程实例的入口点。

好的,我大概明白了,但是您能再解释一下吗?

当然,解释起来最好的方法是通过一个实际的例子和一幅图。那么,我们来谈谈 订单处理。在订单处理中,我们假设需要执行以下任务:

  1. 清点库存,
  2. 向顾客收费
  3. 创建发货单
  4. 既然我们已经了解了订单的处理流程,那么让我们向您展示相关图片,以便您对整个工作流程有所了解:

订单处理

好的,上面我们看到客户端函数是如何被调用的。以创建订单为例,这通常是我们从应用程序访问的一个 HTTP 端点。接下来,客户端函数会启动一个编排实例。这意味着我们会获得一个 `<flow_name>`  instance id,它是我们指向该特定流程的唯一引用。接下来,我们会尝试在编排实例内部执行所有操作,例如 `<flow_name>`、`<flow_name>` checking the inventory和 charging the customer `<flow_name>  creating a shipment`。

工作原理
让我们更详细地探讨一下它的技术原理。编排的关键在于它通常协调的是异步操作,这意味着我们无法确切地知道某个操作何时完成。为了避免这种情况,持久化函数会在关闭时保存状态,从而产生运行开销。

当编排函数需要执行更多工作时(例如,收到响应消息或持久计时器到期),编排器会唤醒并从头开始重新执行整个函数,以重建本地状态。

等等,要重新运行所有程序吗?

不用担心,在重放过程中,如果代码尝试调用函数(或执行任何其他异步操作),持久任务框架会查询当前编排的执行历史记录。如果发现该活动函数已经执行并产生了结果,它会重放该函数的结果,然后编排器代码继续运行。

哦,好的,听起来不错。

重放会持续进行,直到函数代码执行完毕或安排了新的异步工作为止。

资源

 实验室 - 简单的活动流程

我们相信学习的最佳方式就是动手实践。那么该如何操作呢?其实很简单。使用 VS Code,我们可以安装一个插件,让这个过程变得非常轻松。

创建我们的项目

打开命令面板或键入COMMAND + SHIFT + P

然后我们选择以下内容,以创建一个新项目

接下来我们需要选择一种语言,比如我们选择英语C#。然后我们会看到以下选项列表:

选择 Durable Functions Orchestration,然后为您的函数命名。

接下来,系统会要求您选择一个存储帐户,您需要选择“<username>” Subscription和 Storage account “<username>  Resource group”。这是因为当您保存函数状态时,需要将其保存到某个位置以便稍后恢复。

您的项目现在应该如下所示:

-| obj/
-| bin/
-| .gitignore
-| <name of directory>.csproj
-| local.settings.json
-| Orchestration.cs

让我们仔细看看Orchestration.cs

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace Company.Function
{
    public static class Orchestration
    {
        [FunctionName("Orchestration")]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var outputs = new List<string>();

            // Replace "hello" with the name of your Durable Activity Function.
            outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "London"));

            // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
            return outputs;
        }

        [FunctionName("Orchestration_Hello")]
        public static string SayHello([ActivityTrigger] string name, ILogger log)
        {
            log.LogInformation($"Saying hello to {name}.");
            return $"Hello {name}!";
        }

        [FunctionName("Orchestration_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            // Function input comes from the request content.
            string instanceId = await starter.StartNewAsync("Orchestration", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

上方图片展示了创建项目时系统自动生成的三个函数。每个函数都使用了FunctionName("<name of function>")装饰器进行修饰。运行时环境正是通过这种方式识别这些函数,并据此生成端点等功能。 

 解释文物

好的,我们创建了三个不同的函数或工件,分别是 `a`  orchestrator function、`b` HTTP start/client function和 `c`  activity function。这一切是如何运作的呢?

一切都始于一个 HttpStart 启动所有程序的函数。该函数随后启动 Orchestrator,Orchestrator 又会启动 Orchestrator 中指定的 Activity 函数。听起来有点理论化,但让我们深入了解一下这些组件,看看代码中发生了什么。

 HttpStart

如上所述,这是整个流程的启动函数。让我们来看看它的源代码,并讨论一下它的作用:

[FunctionName("Orchestration_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
  [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
  [DurableClient] IDurableOrchestrationClient starter,
   ILogger log)
{
  // Function input comes from the request content.
  string instanceId = await starter.StartNewAsync("Orchestration", null);

  log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
  return starter.CreateCheckStatusResponse(req, instanceId);
}

从上面的内容可以看出,我们有三个输入参数:

  • `req`参数的类型为IHttpRequestMessage`<string>`,表示传入的 Web 请求。请注意,此参数带有 `HttpTrigger` 装饰器,并有两个输入参数:`AuthorizationLevel` 以及字符串 `"get"` 和 `"post"`。这指示此方法监听使用 HTTP 动词 GET 和 POST 的 Web 请求。  
  • starter,类型为IDurableOrchestrationClient,这是允许我们控制总体流程的客户端。
  • 该实例允许我们记录ILogger不同的消息,Azure 可以订阅这些消息,并且在我们开发项目时,这些消息也会在终端中显示。

接下来,我们的初始实例调用 StartNewAsync() 会生成一个 instanceId。这 instanceId 是一个指向此特定函数调用的引用或处理程序。对于本演示来说,这并不重要,但在第二个演示中我们将使用该信息。
最后,我们通过调用来创建 HTTP 响应 CreateCheckStatusResponse()

编曲家

接下来我们来看看 Orchestrator。所有有趣的事情都发生在这里,我们在这里设置流程,决定何时调用哪个函数以及为什么调用。让我们来看一下代码:

[FunctionName("Orchestration")]
public static async Task<List<string>> RunOrchestrator(
  [OrchestrationTrigger] IDurableOrchestrationContext context)
{
  var outputs = new List<string>();

  // Replace "hello" with the name of your Durable Activity Function.
  outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Tokyo"));
  outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Seattle"));
  outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "London"));

  // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
  return outputs;
}

首先我们看到的是一个方法 FunctionName("Orchestration")。它有一个类型为 context 的输入参数IDurableOrchestrationContext。这个参数不仅可以控制流程,还可以调用辅助函数和活动函数,而活动函数负责执行繁重的工作。当 context 被调用时 CallActivitySync(),它会调用一个指定名称的活动函数 Orchestration_Hello,并传递一个字符串。

编曲_Hello

接下来是活动函数。所有繁重的工作都在这里完成。

 

[FunctionName("Orchestration_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
  log.LogInformation($"Saying hello to {name}.");
  return $"Hello {name}!";
}

我们看到它立即返回,但这可能是一个耗时较长的活动。关键在于,无论它是在一毫秒内完成还是需要一些时间,都无关紧要,编排函数仍然必须等待它完成。想象一下,这个函数可以执行更多操作,例如与数据库交互或发出 HTTP 请求。

 调试

你可能觉得到目前为止一切都理解了,但只有当你看到调试流程实际运行时,才会真正明白。所以接下来我们要做的就是,从 VS Code 启动我们的持久函数,这样你就能看到断点是如何触发的,以及何时触发的。

现在我们准备进行调试,让我们 通过VS Code 菜单中的“调试”选项 来启动调试。Run/ Start

终端应该会打印出类似这样的信息:

Http Functions:
  Orchestration_HttpStart: [GET,POST] http://localhost:7071/api/Orchestration_HttpStart

接下来,我们需要按照上述步骤,通过调用客户端函数路由来启动整个流程 http://localhost:7071/api/Orchestration_HttpStart。我们需要通过调用提到的 URL 来开始整个过程​​。

1)首先发生的是我们的 HttpStart 函数被调用:

HTTP 启动断点

我们让调试器继续运行。

2)接下来,它 Orchestration function 正在被击中。

管弦乐

接下来我们进入下一个断点,可以看到我们的活动。

3)接下来,该函数 Orchestration_Hello 将被执行。

活动功能断点

4) 我们推进了断点,发现自己又回到了编排函数中。

回到编曲家

如您所见,活动函数和协调器之间会一直这样循环往复,直到协调器完成为止。

HttpStart 响应

在我们的浏览器中,最终会跳转到类似这样的页面,这是通过名为 的方法返回的 HTTP 响应 HttpStart

// 20200721125052
// http://localhost:7071/api/Orchestration_HttpStart

{
  "id": "151c37c2bbc14f659ffb0a4d3ed9e54e",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==",
  "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==",
  "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==",
  "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/151c37c2bbc14f659ffb0a4d3ed9e54e?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA=="
}


 

此时此刻,我们最想知道的是,我们最终产出了什么?答案就在名为 的网址中 statusQueryGetUri。让我们点击该链接:

// 20200721122529
// http://localhost:7071/runtime/webhooks/durabletask/instances/e1d7e237acb74834a02854050805712e?taskHub=TestHubName&connection=Storage&code=/XsKEv/uaBc41kmo1ayzMMhI7mh5fNazn4azktrpMbJtuEnSnpm1lA==

{
  "name": "Orchestration",
  "instanceId": "e1d7e237acb74834a02854050805712e",
  "runtimeStatus": "Completed",
  "input": null,
  "customStatus": null,
  "output": [
    "Hello Tokyo!",
    "Hello Seattle!",
    "Hello London!"
  ],
  "createdTime": "2020-07-21T11:18:28Z",
  "lastUpdatedTime": "2020-07-21T11:24:25Z"
}

如上所示,我们 Orchestration 函数返回的响应是一个数组,其中包含了所有活动函数的响应,如下所示:

"output": [
  "Hello Tokyo!",
  "Hello Seattle!",
  "Hello London"
]

最终导致这种情况是因为我们编写代码的方式,我们是这样写的:

outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Orchestration_Hello", "London"));

 概括

关于持久函数还有很多东西需要学习,但我已经能听到你们中的一些人打鼾了,所以我们将把应用程序模式和扇出/扇入模式的实现等主题留到下一部分来讲。

所以,我希望你对此感到兴奋。

文章来源:https://dev.to/azure/learn-durable-functions-with-net-core-and-c-stateful-serverless-41f7