发布于 2026-01-06 0 阅读
0

NSQ教程:使用NSQ构建一个简单的消息队列

NSQ教程:使用NSQ构建一个简单的消息队列

亲爱的程序员朋友们,欢迎来到我的NSQ技术文章系列。关于这项技术的教程并不多,所以我决定写一篇。希望你们喜欢!

为什么选择 NSQ?

NSQ是一个用 Go 语言编写的实时分布式消息平台,由知名服务提供商 bit.ly 创建。

与类似系统(例如 RabbitMQ)相比,NSQ 简洁明了,易于使用,并拥有直观的管理界面。如果您之前从未接触过任何消息队列系统,NSQ 是理解其原理的最佳选择。

消息队列的概念:

消息队列是发布/订阅架构模式的一种实现,用于系统不同部分(应用程序、服务等)之间的通信。

图片替代文字

简单来说,当某个事件发生时(例如,创建新用户),就会向消息队列发布一条消息。任何对该事件感兴趣的服务都会订阅该消息。

消息发布后,感兴趣的服务(消费者)会收到消息并执行一些操作(例如,向新用户发送电子邮件)。

1. 下载 NSQ

请访问https://nsq.io/deployment/installing.html并下载适用于您操作系统的 nsq 二进制文件。

打开解压后的文件夹,您可以看到不同的可执行文件:

  • nsqlookupd.exe
  • nsqd.exe
  • nsqadmin.exe
  • ……还有很多其他的,但它们对我们来说并不那么重要。

2. 运行 nsqlookupd

在您喜欢的 shell/命令行终端中打开解压后的目录并运行:



./nsqlookupd


Enter fullscreen mode Exit fullscreen mode

您应该看到以下输出:



$ ./nsqlookupd
[nsqlookupd] 2019/10/21 13:21:18.830625 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2019/10/21 13:21:18.832649 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2019/10/21 13:21:18.832649 INFO: HTTP: listening on [::]:4161


Enter fullscreen mode Exit fullscreen mode

这表明 nsqlookupd 正在运行,并且有两个接口:
一个使用 TCP,端口为 4160;另一个使用 HTTP,端口为 4161。

为了验证它是否有效,我们可以打开浏览器并访问http://localhost:4161/topics



{
    topics: [ ]
}


Enter fullscreen mode Exit fullscreen mode

这是你应该得到的答案,没问题。目前我们还没有任何已注册的主题。

您还可以获取特定主题的所有频道、生产者、nsqd 节点,创建主题、频道等。更多信息请参阅此处的文档。

基本上,nsqlookupd是一种发现服务,可以帮助消费者找到特定主题的 nsqd 生产者。

nsqlookupd是管理拓扑信息的守护进程。客户端查询 nsqlookupd 以发现特定主题的 nsqd 生产者,而 nsqd 节点则广播主题和通道信息。

3. 运行 nsqd

现在在 shell 中运行 nsqd:



./nsqd


Enter fullscreen mode Exit fullscreen mode

您应该看到以下输出:



[nsqd] 2019/10/21 13:39:56.997863 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/10/21 13:39:56.998861 INFO: ID: 791
[nsqd] 2019/10/21 13:39:57.000861 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/10/21 13:39:57.011825 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/10/21 13:39:57.011825 INFO: TCP: listening on [::]:4150


Enter fullscreen mode Exit fullscreen mode

4. 发布消息

现在是时候发布我们的第一条消息到队列了。打开Postman或任何其他可以发起 HTTP 请求的工具,并向http://localhost:4151/pub?topic=test发送POST请求,请求
体为 JSON 格式。



{
  "text": "some message"
}


Enter fullscreen mode Exit fullscreen mode

/pub 是一个用于创建消息的 NSQ 端点。它需要一个名为“topic”的查询参数。“topic”代表消息的名称,任何发布到同一主题的消息都会被该主题的所有监听器消费。📨

如果请求返回 200 OK,我们将自动创建新主题。您将在 nsqd 控制台中收到相关通知:



[nsqd] 2019/10/21 13:49:04.740353 INFO: TOPIC(test): created
[nsqd] 2019/10/21 13:49:04.740353 INFO: NSQ: persisting topic/channel metadata to nsqd.dat


Enter fullscreen mode Exit fullscreen mode

另一行文字表示,有关已创建主题的信息已保存到元数据 nsqd.dat 文件中。

用任何文本编辑器打开 bin 目录下的 nsqd.dat 文件,即可看到您的主题。但我们有更好的方法来查看和管理主题。是时候使用NSQ Admin 了

5. 启动 NSQ 管理

现在在 shell 中运行 nsqadmin:



./nsqadmin


Enter fullscreen mode Exit fullscreen mode

您将在控制台中看到错误❌



[nsqadmin] 2019/10/21 14:18:04.255018 FATAL: failed to instantiate nsqadmin - --nsqd-http-address or --lookupd-http-address required


Enter fullscreen mode Exit fullscreen mode

错误提示您需要向 nsqd 或 nsqdlookup 提供地址。让我们来操作一下!



 ./nsqadmin --nsqd-http-address localhost:4151


Enter fullscreen mode Exit fullscreen mode

现在您将看到一条消息,提示 nsqadmin 正在运行:



 [nsqadmin] 2019/10/21 14:21:41.223806 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
 [nsqadmin] 2019/10/21 14:21:41.224804 INFO: HTTP: listening on [::]:4171


Enter fullscreen mode Exit fullscreen mode

在浏览器中打开此地址:http://localhost:4171

你应该能看到一个名为“test”的主题。另外,如果你切换到“节点”选项卡,可以看到我们的nsqd实例正在运行并已连接。👍

如果您点击“查找”选项卡,将会看到一条警告。这是因为我们现在直接连接到了nsqd,而没有使用nsqdlookup,NSQ的创建者并不推荐使用nsqdlookup。

现在使用特定的 lookupd 地址运行此命令:



$ ./nsqadmin --lookupd-http-address localhost:4161


Enter fullscreen mode Exit fullscreen mode

打开 NSQ 管理界面,点击“查找”选项卡……看起来没问题。但再检查一下“节点”选项卡。等等……节点数为零?为什么?

目前我们已将nsqadmin连接到nsqlookupd,但nsqd实例未连接到任何设备。因此,我们的连接链断开了💥!

正确的依赖关系应该是nsqadmin -> nsqlookupd <- nsqd。我们来修复一下。

只需关闭nsqd实例,然后再次运行并指定nsqlookupd地址即可:



./nsqd -lookupd-tcp-address localhost:4160


Enter fullscreen mode Exit fullscreen mode

这次我们应该使用 lookupd 的 TCP 地址,端口是 4160。

刷新管理界面,一切应该就恢复正常了。两个标签页都运行完美!✨

6. 创建消费者应用程序

我们需要一个基础应用程序来处理这些消息。让我们为此创建一个简单的Node.js应用程序。

创建一个名为“新文件夹”的任意名称,然后运行以下命令:



npm init -y
npm i express nsqjs


Enter fullscreen mode Exit fullscreen mode

需要使用 Express 库来创建 HTTP 服务器,而 nsqjs 是 NSQ 团队提供的官方客户端库。链接在此。

创建 server.js 文件



const express = require('express')
const nsq = require('nsqjs')
const app = express()
const port = 3000

const reader = new nsq.Reader('test', 'test', {
  lookupdHTTPAddresses: 'localhost:4161'
})

reader.connect()

reader.on('message', msg => {
  console.log('Received message [%s]: %s', msg.id, msg.body.toString())
  msg.finish()
})

app.listen(port, () => console.log(`NSQ Consumer is listening on port ${port}!`))


Enter fullscreen mode Exit fullscreen mode

在我们的项目目录中运行:



node server.js


Enter fullscreen mode Exit fullscreen mode

您现在将收到所有已排队的消息。消费者应用控制台应显示以下内容:



NSQ Consumer is listening on port 3000!
Received message [0c6020dfa34cf000]: {
  "text": "some message"
}


Enter fullscreen mode Exit fullscreen mode

这是因为我们的消息在队列中等待,直到被处理为止。

在 NSQ 管理界面中,如果您选择“节点”,您会看到新的 ClientHost 已连接几秒钟了。

7. 测试接收消息

保持 server.js 运行,现在使用 POSTMAN 向主题“test”发送请求以发布新消息。

POST http://localhost:4151/pub?topic=test
并附带请求体



{
    "text": "CONNNCTED!!! YEAH!!"
}


Enter fullscreen mode Exit fullscreen mode

你应该能立即在控制台中看到它。恭喜!🎉 你的消息队列系统已经可以正常工作了。🖅🖅🖅

⚠️ 注意:如果您在 NSQ 管理界面中点击“计数器”,您会发现它现在不再是零了。

如果您向其他主题发送消息,您将看不到它,因为我们的消费者应用程序只订阅了一个主题,即“测试”。


🚀 如果你觉得这篇文章对你有帮助,请点赞并关注我,以便获取更多内容。谢谢亲爱的程序员!😏

照片由[Anastasia Dulgier](https://unsplash.com/@dulgier?utm_source=medium&utm_medium=referral)拍摄,来自Unsplash。
文章来源:https://dev.to/vguleaev/nsq-tutorial-build-a-simple-message-queue-using-nsq-43eh