发布于 2026-01-06 1 阅读
0

使用 AWS Serverless 大规模管理 Webhook

使用 AWS Serverless 大规模管理 Webhook

Webhook 仍然是许多组织与第三方服务通信的首选方式。由于 Webhook 调用是事件驱动的,因此使用 Serverless 构建 Webhook 管理系统是一个不错的选择。在本文中,我将讨论如何使用 AWS Serverless 构建这样的 Webhook 管理系统。

在应用跟踪系统 (ATS) 中,每当发生与候选人相关的事件(例如:创建候选人、申请职位、候选人状态更改等)时,可能需要调用已注册的 webhook。

该实现方案分为 3 个部分。

  1. Webhook注册。
  2. 针对给定候选事件的 Webhook 调用。
  3. 查看 webhook 调用历史记录。

建筑学

架构 - Webhook 管理系统

状态机
状态机 - Webhook 管理系统

工作原理

Webhook 注册

  1. API 网关中公开了一个 API,该 API 代理到一个 lambda 函数(RegisterWebhookFunction)。
  2. 此 API 接受 companyId、URL 和 eventType(例如:candidate.created)。
  3. 此 Lambda 函数会生成一个哈希令牌,用于验证传入的 Webhook 调用。这些数据将保存到 DynamoDB 表中,生成的哈希令牌将作为响应返回。

调用 webhook

  1. 为了模拟“candidate.create”事件,我使用了一个 API 端点,该端点接受 companyId 和其他候选人的个人资料数据。
  2. CreateCandidate lambda 函数将接收此数据,并将数据保存到同一个 DyanamoDB 表中。
  3. 如您所见,创建候选人并不需要使用 API 端点,但这里唯一重要的是将候选人数据添加到 DynamoDB 表中。
  4. 候选数据保存到 DynamoDB 表后,将使用 DynamoDB Streams 执行TriggerResourceCreated函数。
  5. TriggerResourceCreated函数对 DynamoDB 流进行了筛选,检查事件名称和资源类型。在本例中,它检查事件名称是否为“INSERT”且资源类型是否为“candidate”。如果满足条件,则执行 lambda 函数,并将事件发布到 Event Bridge 事件总线。
  6. 已配置一个基于 EventBridge 规则执行的步骤函数。EventBridge 事件的详细信息对象将作为步骤函数执行的输入数据。
在阶跃函数中
  1. 步骤函数的输入中,仅包含 companyId、eventType 和 resourceType(例如:candidate)以及 resourceId(例如:candidateId)。
  2. 首先,Step Function 使用 DynamoDB 集成,通过 companyId 和 eventType 获取 webhook。
  3. 如果 webhook 存在,则数据进入转换状态;否则,将跳过执行。
  4. Webhook 数据转换完成后,下一步是根据 resourceId 和 resourceType 获取资源数据。为此,需要使用FetchResourceData lambda 函数。
  5. 如果资源数据不存在,则执行失败。
  6. 如果资源数据存在,则会触发CreateWebhookCall lambda 函数来准备 webhook 调用。这将在 DynamoDB 表中创建一条记录,其中包含 webhook 有效负载和验证令牌,状态为“待处理”。
  7. 然后将这些资源数据以及 webhook 数据发送到 SQS 队列。
  8. CallWebhook 函数会使用此 SQS 队列调用 Webhook,并将资源数据包含在有效负载中。此外,还会包含一个验证令牌,用于从外部验证 Webhook 调用的真实性。该验证令牌将使用注册 Webhook 时生成的哈希令牌。
  9. 在状态机中,此步骤已启用重试机制,因此如果发生错误,它将重试 3 次。此外,每次重试之间将有 1 分钟的等待时间。
  10. 如果经过所有重试后 webhook 调用仍然失败,则状态执行将使用sendTaskFailure回调通知,并附带一些有关失败原因的额外数据。
  11. 如果 webhook 调用成功,状态执行将通过sendTaskSuccess回调通知。
  12. 无论哪种情况,下一步,Step 函数都会集成 DynamoDB,以使用其状态(成功/失败)和来自 webhook 调用的附加信息来更新 webhook 调用数据。

获取 webhook 调用历史记录

  1. 提供了一个 API 端点,用于代理 lambda 函数,以从 DynamoDB 表中获取特定 companyId 的数据。
  2. 将返回所有已发出的 webhook 调用列表。

如何设置

您可以使用 AWS SAM 在您自己的 AWS 账户中进行设置。此外,为了测试/演示该应用程序,我还包含了一个使用 VueJS 构建的单页应用程序(位于前端目录中)。

  1. 该应用程序的源代码可在https://github.com/pubudusj/webhook_management获取。
  2. 要设置后端,请sam build && sam deploy -g在后端目录中运行。
  3. VUE_APP_API_BASE_URL后端设置完成后,复制 ApiBaseUrl 参数并将其复制到前端目录的 .env 文件中,作为环境变量的值。
  4. 运行npm run serve此程序可在本地计算机上进行测试或run run build --production输出构建结果。

演示

您可以在https://webhooks.pubudu.dev查看此功能的实际应用。

您可以创建一个 Webhook,提供 URL 和公司 ID。然后,为同一个公司 ID 创建一个用户。之后,如果您使用同一个公司 ID 搜索历史记录,您会看到一条状态为“待处理”的记录。如果您再次搜索,可以看到状态已更改。

请注意:由于失败的 webhook 调用会进行重试,因此大约需要 2-3 分钟才能将 webhook 调用标记为失败。成功的调用几乎会立即更新。

请注意:token可以通过将有效负载中的值与输出结果进行匹配来验证 Webhook 调用hmacSHA256(candidateId + createdAt, signingToken)。这里的 singingToken 是创建 Webhook 时生成的哈希值。

要点/经验教训

  1. 这里我使用了 DynamoDB 单表设计。因此,DynamoDB 流中会生成大量事件。幸运的是,随着最近 DynamoDB 流数据过滤功能的推出,Lambda 函数现在可以根据特定模式仅使用过滤后的事件。(以前,这需要在 Lambda 函数内部处理。)
  2. 在 Step Functions 的 DynamoDB 集成中,GetItem 集成会返回包含所有数据类型键的数据。除非您确切了解数据和对象结构,否则无法在 Step Functions 中对其进行筛选。因此,我不得不使用 Lambda 函数从 DynamoDB 获取数据,而不是直接集成。
  3. 由于该系统依赖于外部 webhook URL 的可用性,因此重试机制在 Step Functions 内部使用。设置重试次数和重试间隔时间非常简单。
  4. 这里我使用 Step Functions Workflow Studio 创建并导出状态机。
  5. 对于 SF-DynamoDB 集成,我使用了原生优化集成,而不是新的 SDK 集成。两者性能应该相同。
  6. 为了演示目的,我在同一堆栈中创建了事件总线,但在实际场景中,这可以是其他服务共享的公共事件总线。

欢迎提出反馈意见

请尝试一下,并告诉我你的想法。

继续建设!继续分享!

文章来源:https://dev.to/aws-builders/manage-webhooks-at-scale-with-aws-serverless-fof