发布于 2026-01-06 11 阅读
0

AWS Serverless 入门 - 使用 Aurora 进行 SQL

AWS Serverless 入门 - 使用 Aurora 进行 SQL

太长不看

在本系列文章中,我将尝试讲解 AWS 上无服务器架构的基础知识,帮助您构建自己的无服务器应用程序。在上一篇文章中,我们学习了如何使用 SQS 来缓冲触发 Lambda 函数的事件。今天,我们将探讨如何使用 Aurora Serverless 进行 SQL 存储!

我们今天要做什么?

  • 创建 Aurora Serverless SQL 数据库
  • 创建与数据库交互的 Lambda 函数
  • 额外福利:构建一个超级简单的迁移系统

⬇️ 我会定期发布无服务器内容,如果您想了解更多 ⬇️

关注我的推特🚀

快速公告:我还在开发一个名为🛡 sls-mentor 🛡的库。它汇集了 30 条无服务器最佳实践,可以自动检查您的 AWS 无服务器项目(无论使用什么框架)。它是免费开源的,欢迎体验!

在 GitHub 上找到 sls-mentor ⭐️

SQL Serverless 存储:这怎么可能???

我知道,SQL引擎并非以无服务器特性著称,因为它们本质上需要服务器才能运行。AWS也不例外:其SQL服务AWS RDS是托管服务,但需要预置服务器,并且会产生固定费用。

不过别走,我还有好消息要告诉你!AWS 发布了 Aurora Serverless,它是一个兼容 MySQL 和 Postgres 的 SQL 引擎,可以自动扩展并且完全托管。它甚至可以在不使用时自动暂停:有些人(并非所有人)会说它符合无服务器的定义!

使用 AWS CDK 配置 Aurora Serverless 集群

本文将创建一个包含用户表的 SQL 数据库users。用户表包含 id、firstName 和 lastName 三个字段。然后,我们将创建两个 Lambda 函数:AddUser 函数用于在数据库中创建新用户,GetUsers 函数用于从数据库中检索所有用户。这两个函数都将通过 API 网关对外开放。

建筑外观将如下所示:

基本模式

和往常一样,本文将使用 AWS CDK。如果您还不熟悉 CDK,建议您先阅读本系列之前的文章以入门。创建新的 CDK 项目后,我们将配置一个新的 Aurora 集群。

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import path from 'path';

export class LearnServerlessStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const dbSecret = new cdk.aws_rds.DatabaseSecret(this, 'AuroraSecret', {
      username: 'admin',
    });

    const cluster = new cdk.aws_rds.ServerlessCluster(this, 'AuroraCluster', {
      engine: cdk.aws_rds.DatabaseClusterEngine.AURORA_MYSQL,
      credentials: cdk.aws_rds.Credentials.fromSecret(dbSecret),
      clusterIdentifier: 'my-aurora-cluster',
      defaultDatabaseName: 'my_database',
      enableDataApi: true,
      scaling: {
        autoPause: cdk.Duration.minutes(10),
        minCapacity: 2,
        maxCapacity: 16,
      },
    });
  }
}
Enter fullscreen mode Exit fullscreen mode

在这段代码片段中,我们主要做了两件事:

  • 创建一个密钥来存储数据库凭证,以便稍后从 Lambda 函数连接到数据库。请注意,我没有指定密码,它将由 AWS 自动生成(您绝对不想将其存储在代码中!!!)。

  • 创建一个 ServerlessCluster,它是 Aurora Serverless 的一种构造。

    • 我们指定数据库引擎(MySQL,PostgreSQL 也可用)。
    • 我们使用之前创建的密钥来指定凭据。
    • 我们指定一个默认数据库名称:创建完成后,它将在集群中为我们创建一个数据库。
    • 我们启用了 Aurora Serverless 的一个非常重要的功能——数据 API,它允许我们使用 HTTP 请求与数据库进行交互,这将大大简化我们 Lambda 函数的开发。
    • 最后,我们设置了自动扩缩容:最小和最大容量以 ACU 为单位(了解更多),它会根据负载自动向上或向下扩容。自动暂停是指集群暂停前不活动的时间,这将帮助我们节省成本(暂停期间仅对存储收费)。

我们还可以添加很多网络功能、高级安全功能等等……但 Aurora Serverless 的优点在于它开箱即用,只需极少的配置!

使用 Lambda 函数与 Aurora Serverless 进行交互

现在,我们来创建两个 Lambda 函数来与新数据库交互。第一个函数将在数据库中创建一个新用户,第二个函数将从数据库中检索所有用户。首先,我们使用 AWS CDK 来配置 Lambda 函数。

// ...previous code, still in the constructor

const api = new cdk.aws_apigateway.RestApi(this, 'api', {});

const addUser = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'addUser', {
  entry: path.join(__dirname, 'addUser', 'handler.ts'),
  handler: 'handler',
  environment: {
    CLUSTER_ARN: cluster.clusterArn,
    SECRET_ARN: cluster.secret?.secretArn ?? '',
  },
  timeout: cdk.Duration.seconds(30),
});

cluster.grantDataApiAccess(addUser);

const getUsers = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'getUsers', {
  entry: path.join(__dirname, 'getUsers', 'handler.ts'),
  handler: 'handler',
  environment: {
    CLUSTER_ARN: cluster.clusterArn,
    SECRET_ARN: cluster.secret?.secretArn ?? '',
  },
  timeout: cdk.Duration.seconds(30),
});

cluster.grantDataApiAccess(getUsers);

api.root.addResource('add-user').addMethod('POST', new cdk.aws_apigateway.LambdaIntegration(addUser));
api.root.addResource('get-users').addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(getUsers));
Enter fullscreen mode Exit fullscreen mode

这里,我们创建了两个 Lambda 函数,并将它们接入 API。AddUsers 函数由 POST 请求触发,GetUsers 函数由 GET 请求触发。重要提示:我们还授予 Lambda 函数访问集群数据 API 的权限,以便它们可以与数据库交互。此外,我们将集群 ARN 和密钥 ARN 作为环境变量传递,以便在 Lambda 函数代码中使用。我设置了 30 秒的超时时间,因为当数据库暂停时,数据 API 的响应速度可能会较慢。

现在,到了有趣的部分:Lambda 函数的代码。我们先从 addUser 函数开始。

import { ExecuteStatementCommand, RDSDataClient } from '@aws-sdk/client-rds-data';
import { v4 as uuid } from 'uuid';

const rdsDataClient = new RDSDataClient({});

export const handler = async ({ body }: { body: string }): Promise<{ statusCode: number; body: string }> => {
  const secretArn = process.env.SECRET_ARN;
  const resourceArn = process.env.CLUSTER_ARN;

  if (secretArn === undefined || resourceArn === undefined) {
    throw new Error('Missing environment variables');
  }

  const { firstName, lastName } = JSON.parse(body) as { firstName?: string; lastName?: string };

  if (firstName === undefined || lastName === undefined) {
    return {
      statusCode: 400,
      body: 'Missing firstName or lastName',
    };
  }

  const userId = uuid();

  await rdsDataClient.send(
    new ExecuteStatementCommand({
      secretArn,
      resourceArn,
      database: 'my_database',
      sql: sql: 'CREATE TABLE IF NOT EXISTS users (id VARCHAR(255) NOT NULL PRIMARY KEY, firstName VARCHAR(255) NOT NULL, lastName VARCHAR(255) NOT NULL); INSERT INTO users (id, firstName, lastName) VALUES (:id, :firstName, :lastName);',
      parameters: [
        { name: 'id', value: { stringValue: userId } },
        { name: 'firstName', value: { stringValue: firstName } },
        { name: 'lastName', value: { stringValue: lastName } },
      ],
    }),
  );

  return {
    statusCode: 200,
    body: JSON.stringify({
      userId,
    }),
  };
};
Enter fullscreen mode Exit fullscreen mode

这段代码片段主要做了三件事:

  • 获取我们之前传递的环境变量,并检查它们是否已定义。
  • 解析请求体,并提取名字和姓氏。
  • 在数据库中执行两条 SQL 命令my_databaseusers如果表不存在则创建表,然后将新用户插入到表中。

⚠️ 请记住:我们可以执行简单的 rdsDataClient 命令,因为我们在集群上启用了数据 API,否则情况会复杂得多。

现在,让我们来看一下 getUsers 函数的代码。

import { ExecuteStatementCommand, RDSDataClient } from '@aws-sdk/client-rds-data';

const rdsDataClient = new RDSDataClient({});

export const handler = async (): Promise<{ statusCode: number; body: string }> => {
  const secretArn = process.env.SECRET_ARN;
  const resourceArn = process.env.CLUSTER_ARN;

  if (secretArn === undefined || resourceArn === undefined) {
    throw new Error('Missing environment variables');
  }

  const { records } = await rdsDataClient.send(
    new ExecuteStatementCommand({
      secretArn,
      resourceArn,
      database: 'my_database',
      sql: 'SELECT * FROM users;',
    }),
  );

  const users =
    records?.map(([{ stringValue: id }, { stringValue: firstName }, { stringValue: lastName }]) => ({
      id,
      firstName,
      lastName,
    })) ?? [];

  return {
    statusCode: 200,
    body: JSON.stringify(users, null, 2),
  };
};
Enter fullscreen mode Exit fullscreen mode

在这个处理程序中,我们主要做三件事:

  • 获取我们之前传递的环境变量,并检查它们是否已定义。
  • 使用 RDS 数据 API,向users数据库名称的表中执行 SELECT 语句。my_database
  • 将查询结果格式化,并以 JSON 响应的形式返回。

完成了!现在可以部署和测试我们的API了。

npm run cdk deploy
Enter fullscreen mode Exit fullscreen mode

我使用简单的 POST 请求来添加用户:添加用户

以及一个用于检索所有用户的 GET 请求:获取用户

有哪些可以改进的地方?

这段代码可以运行,但并不完美。以下是一些可以改进的地方:

  • 查询和类型管理:我们在这里使用纯 SQL,但我们可以使用像TypeORM这样的 ORM来获得更好的类型,并为我们的表提供精确的模式。
  • 迁移:请注意,每次添加新用户时,如果表不存在,我们都会尝试创建该表users。这并非最佳方案,如果我们想向表中添加新列该怎么办?我们应该使用迁移系统来管理数据库模式的演变。

今天不打算介绍 TypeORM 等 ORM 框架,但我们可以了解一下迁移。

使用 DynamoDB 创建一个超级简单的迁移系统

什么是数据库迁移?简单来说,它们是描述数据库模式演变的指令。迁移必须按顺序执行,并且每个迁移操作只能执行一次。迁移可以包括创建或删除表、创建或删除列、插入数据等等。

为了简化迁移操作,我们可以将已运行的迁移记录存储在 DynamoDB 表中。然后,我们可以检查哪些迁移已经运行过,并运行那些尚未运行的迁移。让我们创建一个新的 CDK 堆栈来实现这一点。

完成后,最终的架构图将如下所示:

最终架构

首先,让我们更新 CDK 代码,创建一个 DynamoDB 表和一个 runMigrations Lambda 函数。

const migrationsTable = new cdk.aws_dynamodb.Table(this, 'migrationsTable', {
  partitionKey: {
    name: 'PK',
    type: cdk.aws_dynamodb.AttributeType.STRING,
  },
  sortKey: {
    name: 'SK',
    type: cdk.aws_dynamodb.AttributeType.STRING,
  },
  billingMode: cdk.aws_dynamodb.BillingMode.PAY_PER_REQUEST,
});

const runMigrations = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'runMigrations', {
  entry: path.join(__dirname, 'runMigrations', 'handler.ts'),
  handler: 'handler',
  environment: {
    DYNAMODB_TABLE_NAME: migrationsTable.tableName,
    CLUSTER_ARN: cluster.clusterArn,
    SECRET_ARN: cluster.secret?.secretArn ?? '',
  },
  timeout: cdk.Duration.seconds(180),
});

migrationsTable.grantReadWriteData(runMigrations);
cluster.grantDataApiAccess(runMigrations);
Enter fullscreen mode Exit fullscreen mode

我们授予 runMigrations Lambda 对 DynamoDB 表的完全访问权限以及对 RDS 数据 API 的访问权限。

现在到了有趣的部分,即 runMigrations Lambda 函数的代码。

import { DynamoDBClient, GetItemCommand, PutItemCommand } from '@aws-sdk/client-dynamodb';
import { ExecuteStatementCommand, RDSDataClient } from '@aws-sdk/client-rds-data';

// Array describing the migrations we want to run
const migrations: { id: string; statement: string }[] = [
  {
    id: 'migration-1',
    statement:
      'CREATE TABLE IF NOT EXISTS users (id VARCHAR(255) NOT NULL PRIMARY KEY, firstName VARCHAR(255) NOT NULL, lastName VARCHAR(255) NOT NULL);',
  },
  {
    id: 'migration-2',
    statement: `INSERT INTO users (id, firstName, lastName) VALUES ('1', 'John', 'Doe');`,
  },
];

const rdsDataClient = new RDSDataClient({});
const dynamoDBClient = new DynamoDBClient({});

export const handler = async (): Promise<void> => {
  const secretArn = process.env.SECRET_ARN;
  const resourceArn = process.env.CLUSTER_ARN;
  const dynamoDBTableName = process.env.DYNAMODB_TABLE_NAME;

  if (secretArn === undefined || resourceArn === undefined || dynamoDBTableName === undefined) {
    throw new Error('Missing environment variables');
  }

  // Run migrations in order
  for (const { id, statement } of migrations) {
    // Check if migration has already been executed
    const { Item: migration } = await dynamoDBClient.send(
      new GetItemCommand({
        TableName: dynamoDBTableName,
        Key: {
          PK: { S: 'MIGRATION' },
          SK: { S: id },
        },
      }),
    );

    if (migration !== undefined) {
      continue;
    }

    // Execute migration
    await rdsDataClient.send(
      new ExecuteStatementCommand({
        secretArn,
        resourceArn,
        database: 'my_database',
        sql: statement,
      }),
    );

    // Mark migration as executed
    await dynamoDBClient.send(
      new PutItemCommand({
        TableName: dynamoDBTableName,
        Item: {
          PK: { S: 'MIGRATION' },
          SK: { S: id },
        },
      }),
    );

    console.log(`Migration ${id} executed successfully`);
  }
};
Enter fullscreen mode Exit fullscreen mode

代码看起来很复杂,但实际上很简单。

  • 首先,我们在代码中定义一个要运行的迁移列表。每个迁移都由一个唯一的 ID 和一个 SQL 语句描述。
  • 然后,在处理程序内部:
    • 我们解析所需的环境变量。
    • 我们按顺序遍历要运行的迁移。
    • 对于每个迁移,我们都会检查该迁移是否存在于 DynamoDB 表中,以此来判断它是否已经运行过。
    • 如果迁移尚未运行,我们将使用 RDS 数据 API 执行迁移。
    • 最后,我们将迁移结果保存到 DynamoDB 表中,以此标记迁移已执行完毕。

就这些!如果你愿意,可以将此 Lambda 函数连接到 API,或者保持手动触发。你还可以向列表中添加更多迁移,它们将按顺序执行。

结论

本文是对 Aurora Serverless 的入门介绍。我们了解了如何创建集群、如何连接到集群以及如何运行非常简单的迁移。掌握了这些基本知识,您已经可以完成很多工作,例如创建多个数据库、执行复杂的请求等等。

如果您想更进一步,我强烈建议您了解一下TypeORMPrisma等 ORM 框架。它们能极大地帮助您与数据库交互、创建迁移,并且与 Aurora Serverless 兼容。

我计划继续以双月刊的形式更新这个系列文章。我已经介绍了如何创建简单的 Lambda 函数和 REST API,以及如何与 DynamoDB 数据库和 S3 存储桶进行交互。您可以在我的代码仓库中关注更新进度!接下来,我将介绍前端部署、类型安全、更高级的模式等新主题……如果您有任何建议,欢迎随时联系我!

如果您能点赞并分享这篇文章给您的朋友和同事,我将不胜感激。这对我扩大读者群非常有帮助。另外,别忘了订阅,以便在下一篇文章发布时收到通知!

如果你想和我保持联系,这是我的推特账号。我经常发布或转发一些关于AWS和无服务器架构的有趣内容,欢迎关注我!

关注我的推特🚀

文章来源:https://dev.to/slsbytheodo/learn-serverless-on-aws-step-by-step-sql-with-aurora-5hn1