ESP32 到 AWS:基于 Golang 的 IoT Core、DynamoDB 和 Lambda 函数的完整物联网解决方案
介绍
介绍
在本篇博文中,我们将把 ESP32 微控制器连接到 AWS IoT Core。我们将使用 DynamoDB 进行数据存储,并触发 AWS Lambda 函数,所有代码均使用 Golang 编写。本指南非常适合任何对使用 ESP32 和 AWS 构建物联网项目感兴趣的人。我们将逐步讲解,即使您是这些技术的新手,也能轻松上手。让我们开始吧!
所采用的技术和方法
在这个项目中,我们运用了一系列技术和方法,以实现ESP32微控制器与AWS服务之间的无缝集成。以下是我们使用的技术概览:
硬件方面:
- PlatformIO:一个用于物联网开发的开源生态系统。
- Arduino:我们ESP32微控制器的编程平台。
- C++(CPP):用于 ESP32 开发的编程语言。
- knolleary/pubsubclient:MQTT 消息传递客户端库。
- LittleFS:一种用于在 ESP32 上处理文件的文件系统。
云端和后端:
- AWS IoT Core:管理和连接物联网设备。
- DynamoDB:AWS 的 NoSQL 数据库服务,用于处理设备数据。
- Lambda 函数:AWS 服务,用于响应事件运行后端代码。
- SAM CLI:用于管理 AWS 资源的 AWS CLI 工具。
- Golang:用于编写 AWS Lambda 函数的编程语言。
创建 PlatformIO 项目:第一步
首先,我们创建一个项目文件夹。我们将使用 PlatformIO 在该文件夹内创建一个新项目。我们将项目命名为esp32_to_aws_hardware_example。然后,我们将在 VSCode 中打开该项目。
mkdir esp32_to_aws_hardware_example
cd esp32_to_aws_hardware_example
code .
按下 Ctrl F1+C 打开命令面板,然后输入 Ctrl+C PlatformIO: New Terminal。这将在 VSCode 中打开一个新的终端。我们将使用此终端运行本指南中的所有命令。
因此,我们可以使用以下命令创建一个新项目:
首先,我们需要删除所有缓存文件。然后我们就可以创建一个新项目了。此命令是可选的,您可以跳过它。
pio system prune -f
项目初始化:
platformio init
向 PlatformIO 添加库和板信息
在 platformio.ini 文件中,我们可以添加库和开发板信息。我们需要将以下库添加到我们的项目中:
[env:esp32doit-devkit-v1]
platform = espressif32
board = esp32doit-devkit-v1
framework = arduino
monitor_speed=115200
upload_port=/dev/cu.usbserial-0001
lib_extra_dirs = lib
board_build.filesystem = littlefs
lib_deps =
bblanchon/ArduinoJson@^6.18.5
knolleary/pubsubclient@^2.8
接下来,我想谈谈 `lib`lib_extra_dirs和 ` board_build.filesystemlittleFS` 参数。我们需要lib_extra_dirs在 platformio.ini 文件中添加 `lib` 参数才能使用 lib 文件夹中的库。我们需要board_build.filesystem在 platformio.ini 文件中添加 `littleFS` 参数才能使用 LittleFS 文件系统。
安装软件包
要安装这些库,我们需要运行以下命令:
pio pkg install
退出项目并重启 VSCode。然后我们就可以在项目中看到 platformIO 任务了:
初始化完成。现在我们可以开始编写代码了。
笔记:
当我尝试使用 PlatformIO 的界面创建新项目时,遇到了问题。所以我改用命令行创建项目。我不明白为什么会失败。如果您知道原因,请在评论区告诉我。
使用 SSL 证书在 AWS IoT Core 中设置您的第一个“事物”
在本部分中,我们将在 AWS IoT Core 中设置一个“设备”。这个“设备”代表我们在 AWS 云中的 ESP32 微控制器。我们还将下载安全通信所需的 SSL 证书。具体操作如下:
1.登录 AWS IoT Core:
- 首先,登录到您的 AWS 管理控制台并导航到 IoT Core。
2.制定策略:
- 在 AWS IoT Core 控制台中,导航至“安全”,然后选择“策略”。
- 点击“创建保单”。
- 给你的政策起个名字,例如
iot_policy: - 设置策略文档的JSON结构如下:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["iot:*"],
"Resource": "*"
}
]
}
iot:*此策略允许您的“事物”对所有资源执行所有操作*。
3.创建“事物”:
- 转到“管理”部分,然后单击“事物”。
- 选择“创建”开始创建新“事物”。
- 选择“创建单个项目”。
- 点击“下一步”。
- 按照以下步骤为您的“事物”命名并完成创建。在本指南中,我们将为我们的“事物”命名
esp32_test1。 - 点击“下一步”。
- 选择“自动生成新证书(推荐)”。
- 选择您在上一步中创建的策略(iot_policy)。
- 点击“创建事物”。
4.下载 SSL 证书:
- 创建好设备后,AWS IoT Core 会提示您创建证书。点击“创建证书”。
- 请下载以下三个必要的文件:
- 证书文件(
blablabla-certificate.pem.crt)。 - 私钥文件(
blablabla-private.pem.key)。 - Amazon Root CA 1 (
AmazonRootCA1.pem) 文件。
- 证书文件(
- 请务必妥善保存这些文件,因为它们对于 ESP32 与 AWS IoT Core 的安全通信至关重要。
文件名太长了,你可以修改一下。我也这么做了。我把文件名改成了这样:
blalblablabla-certificate.pem.crt到certificate.pem.crtblalblablabla-private.pem.key到private.pem.keyAmazonRootCA1.pem到aws_cert_ca.pem
更改文件名后,我们的项目中可以使用以下 3 个文件:
certificate.pem.crtprivate.pem.keyaws_cert_ca.pem
配置 ESP32 文件系统以连接 AWS IoT Core
在本部分,我们将配置 ESP32 文件系统以连接到 AWS IoT Core。我们将使用 LittleFS 文件系统来存储 SSL 证书和 AWS IoT Core 端点。以下是具体步骤:
1.创建一个名为“data”的文件夹:
在项目的根目录中,创建一个名为 . 的文件夹data。该文件夹将包含我们需要存储在 ESP32 文件系统中的所有文件。
注意:该data文件夹必须位于项目的根目录下。否则,ESP32 将无法找到这些文件。
mqtt_config.json 文件将包含 AWS IoT Core 端点、端口、客户端 ID 和要发布数据的主题。
{
"port": 8883,
"host": "YOUR_AWS_IOT_CORE_ENDPOINT",
"clientId": "esp_test1",
"publishTopic": "esp32/sensor/test"
}
要获取主机信息,我们需要前往 AWS IoT Core 控制台并点击相应的Settings选项卡。然后我们就可以看到终端节点信息了。
连接到 AWS IoT Core 时,使用端口 8883 进行安全连接。
clientId 是我们在 AWS IoT Core 中创建的事物的名称。
publishTopic 是我们将要发布数据的主题名称。
wifi_config.json 文件将包含 ssid 和密码。
{
"ssid": "your_wifi_name",
"password": "your_wifi_password"
}
编写用于 AWS IoT Core 连接的 ESP32 代码
在本部分,我们将编写用于连接到 AWS IoT Core 的 ESP32 代码。我们将使用 PubSubClient 库,通过 MQTT 协议连接到 AWS IoT Core。
首先,我们需要创建 WiFi、MQTT 和证书配置的模型。我们将在 lib 文件夹下的 model 文件夹中创建以下模型。
WifiCredentialModel.h
// lib/model/WifiCredentialModel.h
// WifiCredentialModel class definition.
#ifndef WIFICREDENTIALMODEL_H
#define WIFICREDENTIALMODEL_H
#include <Arduino.h>
class WifiCredentialModel
{
public:
String ssid;
String password;
WifiCredentialModel() : ssid(""), password(""){};
WifiCredentialModel(String ssid, String password) : ssid(ssid), password(password){};
bool isEmpty()
{
return ssid == "" || password == "";
}
};
#endif
MqttCredentialModel.h
// lib/model/MqttCredentialModel.h
// MqttCredentialModel class definition.
#ifndef MQTTCREDENTIALMODEL_H
#define MQTTCREDENTIALMODEL_H
#include <Arduino.h>
class MqttCredentialModel
{
public:
int port;
String host;
String clientId;
String publishTopic;
MqttCredentialModel() : port(0), host(""), clientId(""), publishTopic(""){};
MqttCredentialModel(int port, String host, String clientId, String publishTopic) : port(port), host(host), clientId(clientId), publishTopic(publishTopic){};
bool isEmpty()
{
return port == 0 || host == "" || clientId == "" || publishTopic == "";
}
};
#endif
CertificateCredentialModel.h
// lib/model/CertificateCredentialModel.h
// CertificateCredentialModel definition.
#ifndef CERTIFICATECREDENTIALMODEL_H
#define CERTIFICATECREDENTIALMODEL_H
#include <Arduino.h>
class CertificateCredentialModel
{
public:
String ca;
String certificate;
String privateKey;
CertificateCredentialModel() : ca(""), certificate(""), privateKey(""){};
CertificateCredentialModel(String ca, String certificate, String privateKey) : ca(ca), certificate(certificate), privateKey(privateKey){};
bool isEmpty()
{
return ca == "" || certificate == "" || privateKey == "";
}
};
#endif
现在我们可以创建一个服务,该服务将从文件中读取 WiFi、MQTT 和证书配置,并返回相应的模型。我们将在 lib 文件夹下的 service 文件夹中创建以下服务。
ConfigService.h
// lib/service/ConfigService.h
// ConfigService class definition.
#ifndef CONFIGSERVICE_H
#define CONFIGSERVICE_H
#include <Arduino.h>
#include "../model/CertificateCredentialModel.h"
#include "../model/MqttCredentialModel.h"
#include "../model/WifiCredentialModel.h"
#include <FS.h>
class ConfigService
{
private:
fs::FS &fileSystem;
public:
ConfigService(fs::FS &fileSystem) : fileSystem(fileSystem){};
WifiCredentialModel getWifiCredential();
MqttCredentialModel getMqttCredential();
CertificateCredentialModel getCertificateCredential();
};
#endif
ConfigService.cpp
// lib/service/ConfigService.cpp
// ConfigService class implementation
#include "ConfigService.h"
#include <ArduinoJson.h>
#include <FS.h>
String readFile(fs::FS &fs, const char *path)
{
Serial.printf("Reading file: %s\r\n", path);
File file = fs.open(path, FILE_READ);
if (!file)
{
Serial.println("Failed to open file for reading");
return "";
}
return file.readString();
}
CertificateCredentialModel ConfigService::getCertificateCredential()
{
String ca = readFile(fileSystem, "/certs/aws_cert_ca.pem");
String certificate = readFile(fileSystem, "/certs/certificate.pem.crt");
String privateKey = readFile(fileSystem, "/certs/private.pem.key");
return CertificateCredentialModel(ca, certificate, privateKey);
}
WifiCredentialModel ConfigService::getWifiCredential()
{
File wifiConfigFile = fileSystem.open("/wifi_config.json", "r");
if (!wifiConfigFile)
{
Serial.println("Failed to open wifi_config.json file");
return WifiCredentialModel();
}
DynamicJsonDocument doc(1024);
DeserializationError error = deserializeJson(doc, wifiConfigFile);
if (error)
{
Serial.println("Failed to read file, using default configuration");
wifiConfigFile.close();
return WifiCredentialModel();
}
String ssid = doc["ssid"];
String password = doc["password"];
wifiConfigFile.close();
return WifiCredentialModel(ssid, password);
}
MqttCredentialModel ConfigService::getMqttCredential()
{
File mqttConfigFile = fileSystem.open("/mqtt_config.json", "r");
if (!mqttConfigFile)
{
Serial.println("Failed to open mqtt_config.json file");
return MqttCredentialModel();
}
DynamicJsonDocument doc(1024);
DeserializationError error = deserializeJson(doc, mqttConfigFile);
if (error)
{
Serial.println("Failed to read file, using default configuration");
mqttConfigFile.close();
return MqttCredentialModel();
}
String host = doc["host"];
int port = doc["port"];
String clientId = doc["clientId"];
String publishTopic = doc["publishTopic"];
mqttConfigFile.close();
return MqttCredentialModel(port, host, clientId, publishTopic);
}
我们读取数据文件夹中的文件。然后,我们使用 ArduinoJson 库反序列化 JSON 数据。最后,我们返回使用读取的数据创建的模型。
在 main.cpp 中整合所有内容:完成 ESP32-AWS 连接
在本部分,我们将编写 main.cpp 文件。我们将使用 PubSubClient 库通过 MQTT 连接到 AWS IoT Core。我们将使用之前创建的 ConfigService 从文件中读取 WiFi、MQTT 和证书配置。我们将使用之前创建的模型来存储这些配置。
main.cpp
#define LED_PIN 2
#include <Arduino.h>
#include <LittleFS.h>
#include <PubSubClient.h>
#include <WiFiClientSecure.h>
#include "../service/ConfigService.h"
WiFiClientSecure espClient;
PubSubClient client(espClient);
MqttCredentialModel mqttCredential;
WifiCredentialModel wifiCredential;
CertificateCredentialModel certificateCredential;
void setup()
{
Serial.begin(115200);
Serial.println("Starting...");
if (!LittleFS.begin())
{
Serial.println("An Error has occurred while mounting LittleFS");
return;
}
ConfigService configService(LittleFS);
wifiCredential = configService.getWifiCredential();
if (wifiCredential.isEmpty())
{
Serial.println("Wifi credential is empty");
return;
}
mqttCredential = configService.getMqttCredential();
if (mqttCredential.isEmpty())
{
Serial.println("Mqtt credential is empty");
return;
}
certificateCredential = configService.getCertificateCredential();
if (certificateCredential.isEmpty())
{
Serial.println("Certificate credential is empty");
return;
}
WiFi.begin(wifiCredential.ssid.c_str(), wifiCredential.password.c_str());
while (WiFi.status() != WL_CONNECTED)
{
digitalWrite(LED_PIN, HIGH);
delay(50);
digitalWrite(LED_PIN, LOW);
delay(50);
digitalWrite(LED_PIN, HIGH);
delay(50);
digitalWrite(LED_PIN, HIGH);
delay(1000);
Serial.print(".");
}
Serial.println("WiFi connected");
// Set the certificates to the client
espClient.setCACert(certificateCredential.ca.c_str());
espClient.setCertificate(certificateCredential.certificate.c_str());
espClient.setPrivateKey(certificateCredential.privateKey.c_str());
client.setServer(mqttCredential.host.c_str(), mqttCredential.port);
while (!client.connected())
{
Serial.println("Connecting to AWS IoT...");
if (client.connect(mqttCredential.clientId.c_str()))
{
Serial.println("Connected to AWS IoT");
}
else
{
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
void loop() {}
测试 ESP32-AWS 连接
要测试 esp32-aws 连接,我们需要构建代码并上传文件夹到 esp32。我们可以使用 platformIO 任务来完成此操作。我们需要运行以下任务。
- 构建文件系统映像
- 上传文件系统映像
- 构建代码
- 上传代码
- 监视器
要构建文件系统镜像,我们需要运行“构建文件系统镜像”任务:
点击“构建文件系统映像”任务后,我们可以在终端中看到以下输出:
现在您可以将文件系统镜像上传到 esp32。为此,我们需要运行“上传文件系统镜像”任务:
上传文件系统镜像任务的输出如下:
现在我们可以构建代码了。为此,我们需要运行构建任务:
ESP32 已准备好上传代码。为此,我们需要运行“上传代码”任务,但我选择运行“上传并监控”任务,以便在代码上传后查看输出。
点击“上传和监控”任务后,我们可以在终端中看到以下输出:
如您所见,ESP32 已连接到 AWS IoT Core。现在我们可以向 AWS IoT Core 发送数据了。
将数据发送到 AWS IoT Core
在本部分,我们将向 AWS IoT Core 发送数据。我有一个 dht11 传感器,我想将温度和湿度数据发送到 AWS IoT Core。我将使用以下库从传感器读取数据:
Adafruit Unified SensorDHT sensor library
我们需要在项目中添加以下库:
lib_deps =
bblanchon/ArduinoJson@^6.18.5
knolleary/pubsubclient@^2.8
adafruit/DHT sensor library@^1.4.6
adafruit/Adafruit Unified Sensor@^1.1.14
添加这些库之后,PlatformIO 会自动下载这些库并将其添加到 lib 文件夹中。如果未自动添加,则需要打开 PlatformIO 终端并运行以下命令:
pio pkg install
首先,我们需要为传感器数据创建一个模型。我们将在 lib 文件夹下的 model 文件夹中创建有效载荷模型。
PayloadModel.h
// lib/model/PayloadModel.h
// PayloadModel class definition.
#ifndef PAYLOADMODEL_H
#define PAYLOADMODEL_H
#include <Arduino.h>
#include <ArduinoJson.h>
class PayloadModel
{
private:
String clientId;
bool isClientIdValid;
float humidity;
bool isHumidityValid;
float temperature;
bool isTemperatureValid;
public:
PayloadModel()
{
clientId = "";
isClientIdValid = false;
humidity = 0;
isHumidityValid = false;
temperature = 0;
isTemperatureValid = false;
};
void setClientId(String clientId, bool isClientIdValid)
{
this->clientId = clientId;
this->isClientIdValid = isClientIdValid;
};
void setHumidity(float humidity, bool isHumidityValid)
{
this->humidity = humidity;
this->isHumidityValid = isHumidityValid;
};
void setTemperature(float temperature, bool isTemperatureValid)
{
this->temperature = temperature;
this->isTemperatureValid = isTemperatureValid;
};
char *toJson()
{
static char buffer[512];
DynamicJsonDocument doc(256);
if (this->isClientIdValid)
{
doc["clientId"] = this->clientId;
}
else
{
doc["clientId"] = nullptr;
}
if (this->isHumidityValid)
{
doc["humidity"] = this->humidity;
}
else
{
doc["humidity"] = nullptr;
}
if (this->isTemperatureValid)
{
doc["temperature"] = this->temperature;
}
else
{
doc["temperature"] = nullptr;
}
serializeJson(doc, buffer);
return buffer;
};
};
#endif
对于 payloadModel,我们需要创建一个 JSON 字符串。我们将使用 ArduinoJson 库来创建该 JSON 字符串。
现在我们可以在 main.cpp 文件中实现 payloadModel 和 dht11 传感器。
main.cpp
#define LED_PIN 2
#define DHT_PIN 15
#define DHT_TYPE DHT11
unsigned long previousSensorMillis = 0;
const long sensorInterval = 10000;
#include <Arduino.h>
#include <LittleFS.h>
#include <PubSubClient.h>
#include <WiFiClientSecure.h>
#include "../service/ConfigService.h"
#include "../model/PayloadModel.h"
#include <DHT.h>
WiFiClientSecure espClient;
PubSubClient client(espClient);
MqttCredentialModel mqttCredential;
WifiCredentialModel wifiCredential;
CertificateCredentialModel certificateCredential;
PayloadModel payloadModel;
char *payload;
DHT dht(DHT_PIN, DHT_TYPE);
void setup()
{
Serial.begin(115200);
Serial.println("Starting...");
if (!LittleFS.begin())
{
Serial.println("An Error has occurred while mounting LittleFS");
return;
}
ConfigService configService(LittleFS);
wifiCredential = configService.getWifiCredential();
if (wifiCredential.isEmpty())
{
Serial.println("Wifi credential is empty");
return;
}
mqttCredential = configService.getMqttCredential();
if (mqttCredential.isEmpty())
{
Serial.println("Mqtt credential is empty");
return;
}
certificateCredential = configService.getCertificateCredential();
if (certificateCredential.isEmpty())
{
Serial.println("Certificate credential is empty");
return;
}
WiFi.begin(wifiCredential.ssid.c_str(), wifiCredential.password.c_str());
while (WiFi.status() != WL_CONNECTED)
{
digitalWrite(LED_PIN, HIGH);
delay(50);
digitalWrite(LED_PIN, LOW);
delay(50);
digitalWrite(LED_PIN, HIGH);
delay(50);
digitalWrite(LED_PIN, HIGH);
delay(1000);
Serial.print(".");
}
Serial.println("WiFi connected");
// Set the certificates to the client
espClient.setCACert(certificateCredential.ca.c_str());
espClient.setCertificate(certificateCredential.certificate.c_str());
espClient.setPrivateKey(certificateCredential.privateKey.c_str());
client.setServer(mqttCredential.host.c_str(), mqttCredential.port);
while (!client.connected())
{
Serial.println("Connecting to AWS IoT...");
if (client.connect(mqttCredential.clientId.c_str()))
{
Serial.println("Connected to AWS IoT");
}
else
{
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
// set the payload model
payloadModel = PayloadModel();
payloadModel.setClientId(mqttCredential.clientId, true);
}
void loop()
{
unsigned long currentMillis = millis();
if (currentMillis - previousSensorMillis >= sensorInterval)
{
digitalWrite(LED_PIN, HIGH);
previousSensorMillis = currentMillis;
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
payloadModel.setHumidity(humidity, !isnan(humidity));
payloadModel.setTemperature(temperature, !isnan(temperature));
payload = payloadModel.toJson();
Serial.println("Publish message: ");
Serial.println(payload);
client.publish(mqttCredential.publishTopic.c_str(), payload);
}
digitalWrite(LED_PIN, LOW);
}
这段代码从传感器读取温度和湿度数据,并每 10 秒将其作为 json 数据发送到 AWS IoT Core。
要测试代码,请运行“上传并监控”任务。代码上传后,我们可以在终端中看到以下输出:
我们可以看到,esp32 正在向 AWS IoT Core 发送数据。
我们可以检查 ESP32 是否向 AWS IoT Core 发送数据。为此,我们需要打开 AWS IoT Core 控制台,然后点击相应的选项Test卡。之后,我们可以订阅我们在 mqtt_config.json 文件中创建的主题。
订阅该主题后,我们可以看到 esp32 发送的数据。
我们可以看到的数据如下:
{
"clientId": "esp_test1",
"humidity": 55,
"temperature": 27.60000038
}
一切运转正常。那么接下来呢?
构建 SAM CloudFormation 模板:集成 DynamoDB、IoT Core Rule 和 Lambda
在本部分中,我们将构建一个 SAM CloudFormation 模板。我们将使用该模板创建 DynamoDB 表、IoT Core 规则和 Lambda 函数。以下是具体步骤:
1.为 Lambda 函数创建角色:
LambdaFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: "lambda-function-policy"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
- PolicyName: DynamoDBCRUDPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource: "*"
我们为该角色添加了两条策略。第一条策略用于日志记录,第二条策略用于 DynamoDB。Lambda 函数会将数据写入 DynamoDB,因此我们需要将 DynamoDB 策略添加到该角色。目前,`dynamodb:PutItem` 策略足以满足我们的需求,但我们也可以向该策略添加其他 DynamoDB 操作。
2.创建 Lambda 函数:
MQTTSubscribeHandler:
Type: AWS::Serverless::Function
Properties:
Role: !GetAtt LambdaFunctionRole.Arn
FunctionName: "esp32_to_aws_mqtt-subscribe-handler"
CodeUri: ./functions/mqtt-subscribe-handler
Handler: app.lambda_handler
Runtime: go1.x
Architectures:
- x86_64
此函数将由 IoT Core 规则触发。该函数会将数据写入 DynamoDB。
3.创建 DynamoDB 表:
ThingData:
Type: AWS::DynamoDB::Table
Properties:
TableName: "ThingDataTable"
AttributeDefinitions:
- AttributeName: clientId
AttributeType: S
- AttributeName: createdAt
AttributeType: S
KeySchema:
- AttributeName: clientId
KeyType: HASH
- AttributeName: createdAt
KeyType: RANGE
BillingMode: PAY_PER_REQUEST
我们将使用此表来存储 esp32 发送的数据。
clientId是分区键,createdAt是排序键。
clientId+createdAt是复合主键。我们可以使用此键保存具有相同 clientId 但 createdAt 不同的数据。
4.创建物联网核心规则:
IoTTopicRule:
Type: AWS::IoT::TopicRule
Properties:
TopicRulePayload:
RuleDisabled: false
Sql: "SELECT * FROM 'esp32/sensor/test'"
Actions:
- Lambda:
FunctionArn: !GetAtt MQTTSubscribeHandler.Arn
当数据发送到该esp32/sensor/test主题时,此规则将触发 lambda 函数。
5.创建调用 Lambda 函数的权限:
MQTTSubscribeHandlerPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt MQTTSubscribeHandler.Arn
Principal: iot.amazonaws.com
SourceArn: !GetAtt IoTTopicRule.Arn
此权限将允许 IoT Core 规则调用 lambda 函数。
6.最终确定 SAM 模板:
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
esp32-to-aws-cloud-example
Sample SAM Template for esp32-to-aws-cloud-example
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 5
MemorySize: 128
Environment:
Variables:
ThingDataTable: "ThingDataTable"
Resources:
LambdaFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: "lambda-function-policy"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
- PolicyName: DynamoDBCRUDPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource: "*"
MQTTSubscribeHandler:
Type: AWS::Serverless::Function
Properties:
Role: !GetAtt LambdaFunctionRole.Arn
FunctionName: "esp32_to_aws_mqtt-subscribe-handler"
CodeUri: ./functions/mqtt-subscribe-handler
Handler: app.lambda_handler
Runtime: go1.x
Architectures:
- x86_64
ThingData:
Type: AWS::DynamoDB::Table
Properties:
TableName: "ThingDataTable"
AttributeDefinitions:
- AttributeName: clientId
AttributeType: S
- AttributeName: createdAt
AttributeType: S
KeySchema:
- AttributeName: clientId
KeyType: HASH
- AttributeName: createdAt
KeyType: RANGE
BillingMode: PAY_PER_REQUEST
IoTTopicRule:
Type: AWS::IoT::TopicRule
Properties:
TopicRulePayload:
RuleDisabled: false
Sql: "SELECT * FROM 'esp32/sensor/test'"
Actions:
- Lambda:
FunctionArn: !GetAtt MQTTSubscribeHandler.Arn
MQTTSubscribeHandlerPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt MQTTSubscribeHandler.Arn
Principal: iot.amazonaws.com
SourceArn: !GetAtt IoTTopicRule.Arn
使用 Golang 构建 AWS IoT Lambda 函数
在本部分,我们将编写 Lambda 函数代码。我们将使用适用于 Go 的 AWS 开发工具包来编写 Lambda 函数。
首先,我们需要为 lambda 函数创建一个文件夹。我们将文件夹命名为 `<folder_name>` mqtt-subscribe-handler。然后,我们将main.go在该文件夹中创建一个文件。
项目文件夹结构如下:
mqtt-subscribe-handlerlambda 函数位于该functions文件夹下。
lib文件夹包含我们创建的模型和服务。
注意:我们将使用 `<module>` 文件夹go.work,以便在 Lambda 函数中使用模型、服务和库。lib该文件夹有自己的 `go.mod` 模块,因此,如果我们想在 Lambda 函数中使用它,它必须与 Lambda 函数位于同一根级别,但实际情况并非如此。为了解决这个问题,我们需要将 `<module>`lib和 Lambda 函数go.work`<module>` 文件中。请记住这些信息,如果您想使用 Go 语言重构 AWS Lambda 函数的服务和模型,则需要注意这一点。
go 1.21.4
use (
./functions/mqtt-subscribe-handler
./lib
)
让我们开始编写代码吧。
- lib/constants/dbNames.go
package constants
const THING_TABLE_NAME_KEY = "ThingDataTable"
在使用 DynamoDB 时,我们需要用到表名。因此,我们需要创建一个表名常量。它与模板文件中的全局变量相关。
Globals:
Function:
Timeout: 5
MemorySize: 128
Environment:
Variables:
ThingDataTable: "ThingDataTable"
- lib/helper/logger.go
package helper
import (
"encoding/json"
"log"
)
func EventLogger(event interface{}) {
eventJSON, err := json.Marshal(event)
if err != nil {
log.Printf("Error marshalling event: %v", err)
}
log.Printf("Event:\n%s", string(eventJSON))
}
此辅助函数将用于以 json 格式记录事件。
- lib/model/thingModel.go
package model
type ThingPayload struct {
ClientId string `json:"clientId"`
Temperature float64 `json:"temperature"`
Humidity float64 `json:"humidity"`
}
type ThingData struct {
ThingPayload
CreatedAt string `json:"createdAt"`
}
ThingPayload 是有效负载模型,我们将在 Lambda 函数中将其用作事件。ThingData 用于 DynamoDB,我们将使用 ThingData 将数据保存到 DynamoDB。
ThingPayload 和 ThingData 具有共同的属性,因此我们可以将 ThingPayload 用作 ThingData 中的嵌入式结构。
- lib/service/thingDataDbService.go
package service
import (
"lib/constants"
"lib/model"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
type ThingDataDbService struct {
tableName string
dynamoDbProvider *dynamodb.DynamoDB
}
func (s ThingDataDbService) CreateThing(thing model.ThingData) error {
attributeValue, err := dynamodbattribute.MarshalMap(thing)
if err != nil {
return err
}
input := &dynamodb.PutItemInput{
Item: attributeValue,
TableName: aws.String(s.tableName),
}
_, err = s.dynamoDbProvider.PutItem(input)
return err
}
func NewThingDataDbService(session *session.Session) ThingDataDbService {
dynamoDBProvider := dynamodb.New(session)
return ThingDataDbService{
tableName: os.Getenv(constants.THING_TABLE_NAME_KEY),
dynamoDbProvider: dynamoDBProvider,
}
}
CreateThing 函数将用于把数据保存到 DynamoDB。
如您所见,我们使用了os.Getenv(constants.THING_TABLE_NAME_KEY)获取表名的方法。这与模板文件中的全局变量有关。
创建完模型和服务之后,我们就可以开始编写 lambda 函数了。
- functions/mqtt-subscribe-handler/main.go
package main
import (
"fmt"
"lib/helper"
"lib/model"
"lib/service"
"time"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws/session"
)
func handler(event model.ThingPayload) {
helper.EventLogger(event)
currentTime := time.Now().UTC().Format(time.RFC3339)
sess, err := session.NewSession()
if err != nil {
fmt.Println("Error creating session: ", err)
return
}
thingDataDbService := service.NewThingDataDbService(sess)
thingData := model.ThingData{
ThingPayload: event,
CreatedAt: currentTime,
}
err = thingDataDbService.CreateThing(thingData)
if err != nil {
fmt.Println("Error creating thing: ", err)
return
}
}
func main() {
lambda.Start(handler)
}
首先,我们记录事件。然后,我们获取当前时间。之后,我们创建一个会话。接着,我们创建一个 thingDataDbService。最后,我们创建一个 thingData 并将其保存到 DynamoDB 中。
部署 SAM 模板
在本部分,我们将部署 SAM 模板。我们将使用 AWS CLI 来部署 SAM 模板。我们将使用 Makefile 来运行 AWS CLI 命令。以下是具体步骤:
1.创建 Makefile:
.PHONY: build
build:
sam build
clean-deploy:
rm -rf .aws-sam/cache
sam build
sam deploy --no-confirm-changeset
我创建了 clean-deploy 目标来清除缓存并重新部署模板,因为有时 lib 文件夹中的更改不会反映在 Lambda 函数中。所以我们需要清除缓存并重新部署模板。
2.部署 SAM 模板:
make clean-deploy
运行此命令后,我们可以在终端中看到以下输出:
如图所示,堆栈已成功创建。
测试我们的 ESP32-AWS 架构
为了进行测试,我认为只需查看 DynamoDB 表即可。因此,我们需要打开 DynamoDB 控制台并点击相应的选项Tables卡。然后我们就可以看到该ThingDataTable表了。
我们可以看到,ESP32发送的数据已保存到DynamoDB中。
一切看起来都运行正常。
结论
本文介绍了如何将 ESP32 连接到 AWS IoT Core,如何向 AWS IoT Core 发送数据,如何创建 SAM 模板以创建 DynamoDB 表、IoT Core 规则和 Lambda 函数,如何编写 Lambda 函数将数据保存到 DynamoDB,如何部署 SAM 模板以及如何测试我们的架构。
希望您喜欢这篇文章。如果您有任何问题或建议,请随时在评论区留言。
存储库
本文提供的 CloudFormation 模板和 Lambda 函数代码示例(GitHub)。














