发布于 2026-01-06 0 阅读
0

使用 Node.js 实现 Redis 发布/订阅:让我们来编写代码

在 Node.js 中使用 Redis 发布/订阅

让我们开始编程吧

发布/订阅模式中,发布者并非被编程为向特定接收者发送消息(有效载荷)。这些消息由发布者发送到特定的频道,接收者可以订阅一个或多个频道来接收这些消息。

假设你有一个单体后端,现在你想在这个后端添加一个新功能,例如发送电子邮件。与其让这个后端直接负责发送电子邮件,不如让它成为一个发布者,将邮件发送到一个渠道,供另一个后端(接收者)接收,由接收者负责发送电子邮件(例如新闻简报)。

这个过程的实现非常简单,因此在今天的示例中,我决定创建一个简单的 API,以便接收我们请求的主体,并将其发送到特定的通道,供接收器使用console.log()

我今天决定使用的框架是tinyhttp,它的 API 类似于 Express,我并没有使用这个框架的特殊原因,而且示例代码很容易复制到其他框架中。

tinnyhttp 框架没有集成 body-parser,所以我将安装milliparsec,它不仅比著名的 body-parser 更轻量级,而且是异步的,速度更快。

我今天将使用的 Redis 客户端是ioredis,因为它的 API 直观、非常强大且性能良好。

让我们开始编程吧

正如您可能已经了解到的,我们将有两个后端。其中一个后端我们将称为发布端(pub),它将是我们的 API。另一个后端将称为订阅端(sub),它将是我们的接收器。

我们先来安装pub依赖项:

npm i @tinyhttp/app milliparsec ioredis
Enter fullscreen mode Exit fullscreen mode

现在让我们创建一个简单的 API:

// @/pub/index.js

import { App } from "@tinyhttp/app";
import { json } from "milliparsec";

const app = new App();

app.use(json());

app.get("/", (req, res) => {
  return res.json({ msg: "I hope this runs 😅" });
});

app.listen(3000);
Enter fullscreen mode Exit fullscreen mode

现在我们可以将ioredis导入到我们的项目中,然后创建我们的客户端。

// @/pub/index.js

import { App } from "@tinyhttp/app";
import { json } from "milliparsec";
import Redis from "ioredis";

const app = new App();
const redis = new Redis();

app.use(json());

app.get("/", (req, res) => {
  return res.json({ msg: "I hope this runs 😅" });
});

app.listen(3000);
Enter fullscreen mode Exit fullscreen mode

现在,我们将在端点上创建一个发布者,为此我们将使用该方法。此方法接受两个参数,第一个参数是我们要发送消息的频道redis.pubish()名称,第二个参数是同一条消息

另外,我想补充一点,在这种情况下,我们的消息有效负载将是一个 JSON 对象,但消息本身必须是字符串类型。因此,我们需要将 JSON 转换为字符串。

除此之外,我们还将把端点的 http 动词从 GET 改为 POST。

// @/pub/index.js

import { App } from "@tinyhttp/app";
import { json } from "milliparsec";
import Redis from "ioredis";

const app = new App();
const redis = new Redis();

app.use(json());

app.post("/", (req, res) => {
  redis.publish("send-user-data", JSON.stringify({ ...req.body }));
  return res.sendStatus(200);
});

app.listen(3000);
Enter fullscreen mode Exit fullscreen mode

这样一来,我们的酒吧就完工了,现在我们可以开始建造潜艇了。

首先,我们来安装以下依赖项:

npm i ioredis
Enter fullscreen mode Exit fullscreen mode

首先,我们来创建一个函数来保持后端运行。

// @/sub/index.js

const main = () => {
  console.log("I hope it runs 😅")
};

main();
Enter fullscreen mode Exit fullscreen mode

现在我们可以将ioredis导入到我们的项目中,然后创建我们的客户端。

// @/sub/index.js

import Redis from "ioredis";

const redis = new Redis();

const main = () => {
  console.log("I hope it runs 😅")
};

main();
Enter fullscreen mode Exit fullscreen mode

我们现在正在开发订阅,也就是接收器。因此,我们需要使用redis.subscrive()方法创建一个订阅者。在这个例子中,我们只有两个参数,第一个参数是我们要订阅的频道,第二个参数是回调函数

回调函数包含两个参数,第一个是错误信息,第二个是频道数量。频道数量用于统计订阅者订阅了多少个频道,在本例中为 1。

// @/sub/index.js

import Redis from "ioredis";

const redis = new Redis();

const main = () => {
  redis.subscribe("send-user-data", (err, count) => {
    // ...
  });

  // ...
};

main();
Enter fullscreen mode Exit fullscreen mode

如果发生错误,您需要记录该错误;但如果没有发生错误,您需要记录我们的订阅者订阅的频道数量。

// @/sub/index.js

import Redis from "ioredis";

const redis = new Redis();

const main = () => {
  redis.subscribe("send-user-data", (err, count) => {
    if (err) console.error(err.message);
    console.log(`Subscribed to ${count} channels.`);
  });

  // ...
};

main();
Enter fullscreen mode Exit fullscreen mode

现在我们只需要创建一个监听器来感知即将进入通道队列的消息。为此,我们将使用一个redis.on()接收两个参数的方法,第一个参数是事件名称,在本例中即为消息,第二个参数是回调函数

回调函数接受两个参数,第一个参数是消息来自的通道,第二个参数是消息本身。

// @/sub/index.js

import Redis from "ioredis";

const redis = new Redis();

const main = () => {
  redis.subscribe("send-user-data", (err, count) => {
    if (err) console.error(err.message);
    console.log(`Subscribed to ${count} channels.`);
  });

  redis.on("message", (channel, message) => {
    // ...
  });
};

main();
Enter fullscreen mode Exit fullscreen mode

之后,我们需要记录消息来源的通道,最后记录收到的消息内容。但需要注意的是,收到的消息是一个字符串,因此我们需要获取对应的 JSON 对象来进行转换。如下所示:

// @/sub/index.js

import Redis from "ioredis";

const redis = new Redis();

const main = () => {
  redis.subscribe("send-user-data", (err, count) => {
    if (err) console.error(err.message);
    console.log(`Subscribed to ${count} channels.`);
  });

  redis.on("message", (channel, message) => {
    console.log(`Received message from ${channel} channel.`);
    console.log(JSON.parse(message));
  });
};

main();
Enter fullscreen mode Exit fullscreen mode

现在,当使用类似 Postman 的工具测试我们的 API 时,您可以在请求正文中发送一个包含所需属性的 json 对象。

使用 Postman 测试 API

然后你的终端上应该会显示类似这样的内容:

终端提示符日志

结论

和往常一样,希望您觉得这篇文章有趣。如果您发现文章中有任何错误,请在评论区指出。🧑🏻‍💻

祝你今天过得愉快!💪🤩

文章来源:https://dev.to/franciscomendes10866/using-redis-pub-sub-with-node-js-13k3