了解使用 RabbitMQ 的消息队列系统
作者:Alexander Nnakwue ✏️
介绍
如今,由于微服务架构的广泛应用,企业级应用被构建成具有特定功能的解耦模块/服务。队列系统为这些服务提供了一种通信机制,通过缓冲区的形式在不同点(源/输出)之间交换或传输数据。这种通信既可以在同一应用/进程内进行,也可以在不同的服务之间进行,具体取决于实际情况。
消息代理是一种允许应用程序通过队列机制进行通信的工具。它们为数据提供了一种临时或间歇性的存储方式,防止数据在传输过程中丢失。这些系统可以采用发布/订阅模式,其中一个应用程序或进程作为消息/数据的发布者或生产者,而另一个应用程序或进程则作为消息/数据的订阅者或消费者。
接下来,我们将探索使用RabbitMQ 的队列系统。RabbitMQ 是一款高性能的开源消息代理,支持多种消息传递协议。我们将使用高级消息队列协议 (AMQP),因为它原生内置于 RabbitMQ,并且是该代理支持的核心协议。此外,它也很容易集成到不同编程语言的客户端或实现中。
本文将涵盖以下内容:
- RabbitMQ 入门
- 使用 RabbitMQ 时需要了解的重要概念
- 我们使用云托管版本设置 RabbitMQ 实例
- 本示例应用程序演示如何使用流行的 Node.js 客户端库amqplib实现消息队列。
先决条件
为了方便学习本教程,建议您安装 Node.js 和 npm。相关安装说明请参阅 Node.js官方文档。请注意,虽然我们可以在本地计算机上搭建 RabbitMQ 服务器,但本教程将搭建云端 RabbitMQ 实例或服务器。
为什么?托管实例或服务通常无需维护,因为它们已经配置完毕。它们还提供便捷的仪表盘监控,拥有优化良好的集群,而且通常还提供免费方案用于开发。
RabbitMQ 入门
根据其官网介绍,RabbitMQ 是最流行的开源消息代理之一。借助 RabbitMQ,我们可以定义队列,向这些队列推送消息,并随后从中消费消息。消息代理在此至关重要,因为它们为消息生产者和消费者应用程序或进程之间提供了一个连接点或接口。
在实际应用中,我们可以通过定义一个队列(通常是一个字符串)、通过交换机向预定义的队列发送消息,然后从中消费消息来利用 RabbitMQ 的强大功能。但在继续之前,我们需要了解一些在使用 RabbitMQ 和队列系统时可能会遇到的术语。
使用 RabbitMQ 时需要了解的重要概念
- 生产者:生产者根据队列名称向队列发送或推送消息。
- 队列:队列是一种用于传输和存储消息或缓冲区信息的媒介。
- 消费者:消费者从代理服务器订阅、接收或消费消息,然后在另一个进程或应用程序中处理或使用这些消息。
- 交换中心:交换中心是经纪商的入口点,它接收来自发布者的消息并将其路由到相应的队列。
- 消息代理:消息代理本质上为应用程序生成的数据提供存储机制。这些数据通常供另一个应用程序使用,该应用程序通过给定的参数或连接字符串连接到消息代理。
- 通道: 通道通过单一且共享的TCP连接,提供了一种与代理服务器建立轻量级连接的机制。这是因为创建多个与代理服务器的连接会消耗大量资源。
- 虚拟主机(Vhost): 虚拟主机允许单个代理托管多个隔离的环境。
注:关于绑定、接受和拒绝消息(确认)等其他重要概念的详细信息,请点击此处查看。接下来,我们还将学习如何将数据或消息推送到队列以及如何从队列中消费数据或消息。
要在本地运行我们的配置,我们可以按照提供的说明,在各种操作系统上下载RabbitMQ到我们的机器上。但是,正如前面提到的,我们将使用流行的云托管版本CloudAMPQ来设置一个托管的 RabbitMQ 实例。
首先,我们可以点击首页上的“注册”按钮并配置账户。我们将使用免费套餐来创建新实例。完成所有步骤后,我们就可以开始使用创建的实例了。
我们可以列出当前实例,以便直观地查看从应用程序连接到集群所需的参数。这些参数AMPQ url包括 `<container_name>` Host、User & Vhost`<container_name>` 和 `<container_name> Password`。稍后,我们将使用这些参数从应用程序连接到集群。请注意,我们可以直接从仪表板复制此 URL。URL 的格式如下所示:
amqp://user:pass@host:port/vhost
此外,我们还可以从界面上获得其他字段的视觉提示,包括打开的连接数、消息数等,如下所示:
对于云端和本地部署,RabbitMQ 提供了一个 Web 浏览器,方便用户管理队列、连接、通道、用户权限等。我们的管理界面截图如下所示:
RabbitMQ 的特性和用例
如前所述,消息队列本质上允许不同的应用程序(例如微服务)通过相互发送消息进行通信。RabbitMQ 的功能包括:
- 支持多种可配置的消息传递协议
- 有很多库可用于多种编程语言
- 支持完全分布式且高度可扩展的系统,并集成负载均衡功能。这意味着消息将以优化的方式路由到相应的队列。
- 提供多种交换类型,适用于发布/订阅系统和消息广播。
- 支持多种插件
- 提供通过仪表板进行管理和监控的功能。
- 易于部署(在不同区域实现高可用性),并采用高度可扩展的集群设计,可满足企业级应用需求。更多信息,请参阅文档的此部分。
使用 RabbitMQ 和 Node.js 设置我们的应用程序
为了更好地理解如何向队列发送消息以及如何从队列中消费消息,让我们来完善一下应用程序。在开始之前,我们可以先创建一个项目文件夹。然后,我们可以npm init在项目目录中运行命令来初始化一个package.json文件。接下来,我们可以安装项目所需的所有依赖项:
npm install amqplib restify dotenv concurrently --save
如前所述,我们使用了 Node.js 的 RabbitMQ 客户端库amqplib。我们还安装了restify,它将负责为我们的应用程序搭建一个基本的服务器。此外,我们还安装了dotenv来加载环境变量。最后, 包concurrently将帮助我们同时运行多个命令。package.json完成后,我们的文件应该如下所示:
{
"name": "logrocket-rabbit-tutorial",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"server": "node app/server.js",
"consumer": "node app/Consumer/consumer.js",
"dev": "concurrently \"npm run server\" \"npm run consumer\""
},
"author": "Alexander Nnakwue",
"license": "MIT",
"dependencies": {
"amqplib": "^0.5.5",
"concurrently": "^5.0.1",
"dotenv": "^8.2.0",
"restify": "^8.5.0"
}
}
如上所示,该concurrently软件包帮助我们启动服务器,服务器会调用生产者脚本,该脚本会将一个随机的 JSON 对象发送到指定的队列。然后,消费者订阅队列中的消息。在我们的用例中,我们使用的是默认交换机(直接交换机),这意味着消息将根据我们指定的队列名称进行路由。
注意:为了让应用程序能够向绑定到其队列的多个或不同的消费者应用程序或微服务发送消息(发布),我们可以指定一种不同的交换类型——通常是扇出交换。要了解有关扇出交换的更多信息,请访问文档的此部分。
要连接到我们之前设置的集群,我们可以复制控制面板中提供的连接参数,并创建一个env文件来存储它们。以下是一个示例(不包含实际凭据):
APP_PORT=3000
USER_= user
PASS= pass
HOST= host
VHOST= vhost
QUEUE_NAME= queue_name
然后,我们可以参考env上面的文件来配置 RabbitMQ 集群的连接字符串、端口和队列,如下所示:
const path = require('path');
require('dotenv').config({path:path.resolve(__dirname, '../.env')})
const config= {
port: process.env.APP_PORT,
rabbit: {
connectionString: `amqp://${process.env.USER_}:${process.env.PASS}@${process.env.HOST}/${process.env.VHOST}`,
queue: process.env.QUEUE_NAME
}
}
module.exports = config;
完成上述设置后,我们就可以根据提供的队列名称向队列发送消息了。相关代码producer.js如下所示,可以在文件中找到:
#!/usr/bin/env node
const amqp = require('amqplib');
const config = require('../config');
const publishToQueue = async (queue, message, durable = false) => {
try {
const cluster = await amqp.connect(config.rabbit.connectionString);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, durable= false);
await channel.sendToQueue(queue, Buffer.from(message));
console.info(' [x] Sending message to queue', queue, message);
} catch (error) {
// handle error response
console.error(error, 'Unable to connect to cluster!');
process.exit(1);
}
}
module.exports = publishToQueue;
这里,我们导出一个函数publishToQueue。顾名思义,它接受一个队列名称、要推送到队列的消息内容(在本例中,路由键就是队列名称)以及一个可选参数。durable当此参数设置为 true 时,可以确保在代理重启或故障时消息不会丢失。
更多信息,我们可以查看队列的属性。在上面的代码中,我们连接到集群,创建了一个通道,使用所需的属性断言/创建了队列(使用该assertQueue方法),最后向队列发送了消息。
我们导出了此方法并在server.js文件中调用它,以便在应用程序启动后,我们可以开始向指定的队列推送消息。这与实际应用场景中的工作方式非常相似,在实际场景中,我们会根据发生的事件将消息推送到队列,或者立即将应用程序生成的消息放入队列。文件server.js如下所示:
const restify = require('restify');
const server = restify.createServer({
name: 'LogRocket RabbitMQ Tutorial',
version: '1.0.0'
});
const config = require('./config');
const produce = require('./Producer/producer');
const rawdata = require('./sample.json');
const sampleData = JSON.stringify(rawdata);
produce(config.rabbit.queue, sampleData, durable = false);
server.listen(config.port, function () {
console.log('%s listening at %s', server.name, server.url);
});
如上所示,我们在服务器文件中搭建了一个简单的 Restify 服务器,并导入了生产者脚本和随机 JSON 数据。然后,我们调用了生产者函数,并传入了所有必需的参数,如上所示。最后,我们的服务器正在监听之前在.env文件中指定的端口。
我们可以继续编写消费者脚本,它会从队列中读取和消费消息。在实际应用中,当我们从队列中消费消息后,我们可以发送确认信息,让代理知道消费者已经完成了它的工作。
此外,我们还可以将数据写入数据库以供后续使用,甚至可以根据需要,在执行预期操作之前对数据进行即时重新处理。文件consumer.js如下所示:
#!/usr/bin/env node
const amqp = require('amqplib');
const config = require('../config');
const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const cluster = await amqp.connect(config.rabbit.connectionString);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, durable=false);
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', JSON.parse(message.content.toString()));
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck})
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}
consumeFromQueue(config.rabbit.queue);
在consumer.js上面的文件中,我们首先来了解一下传递给函数的参数。该prefetch参数主要控制在队列连接了多个消费者的情况下(例如扇出队列),有多少条消息会被路由到消费者。消息确认,顾名思义,用于确认消费者是否已成功传递或处理消息。
这在出现网络问题或应用程序崩溃的情况下确实非常重要,因为代理会知道消息尚未被订阅的消费者确认,因此会将其重新排队等待下一个消费者连接。更多详细信息,请查看此链接。
本教程的代码仓库位于此 GitHub 仓库中。仓库中还包含一个自述文件,解释了如何运行该应用程序。服务器启动后的输出如下所示:
结论
在本教程中,我们学习了如何向队列发送消息以及如何从队列中消费消息。虽然还有其他更高级、更复杂的用例,例如多个消费者通过定义的交换机订阅从队列中拉取消息,但我们当前的示例用例反映了理解队列系统实际工作原理所需的基本概念。
您还可以了解更多关于 RabbitMQ 的其他用例和更高级的队列系统概念。如有任何疑问或反馈,请通过Twitter联系我。谢谢!
仅需 200 个 ✅:监控生产环境中失败和缓慢的网络请求
部署基于 Node 的 Web 应用或网站并不难,难的是确保 Node 实例持续为应用提供资源。如果您希望确保对后端或第三方服务的请求都能成功,不妨试试 LogRocket。
LogRocket就像 Web 应用的 DVR,它会记录网站上发生的一切。您无需猜测问题原因,即可汇总并报告有问题的网络请求,从而快速了解根本原因。
LogRocket 会对您的应用进行检测,记录页面加载时间、首字节到达时间、慢网络请求等基准性能指标,并记录 Redux、NgRx 和 Vuex 的操作/状态。立即免费开始监控。
本文《使用 RabbitMQ 理解消息队列系统》最初发表于LogRocket 博客。
文章来源:https://dev.to/bnevilleoneill/understanding-message-queuing-systems-using-rabbitmq-4bh5

