发布于 2026-01-06 2 阅读
0

NodeJS 与 Kafka:轻松构建异步程序

NodeJS 与 Kafka:轻松构建异步程序

📍 同步和异步编程

😶‍🌫️ 在当今软件开发如此复杂的时代,我们需要巧妙的架构来应对这些挑战。例如,如果我们开发了一个应用程序,之后决定让它处理成千上万甚至数百万的用户,那么我们就必须做出明智的决策。微服务架构就是一个很好的选择。

🙃 另一方面,有些服务会使我们的系统运行缓慢。

编程梗

👉 以在线购买机票✈️为例。只有完成付款后,预订才能成功,这称为同步编程。另一方面,预订不应该依赖于电子邮件,例如预订邮件是否送达。

🧐 因此,电子邮件和预订并非直接相关,预订完成也不应该取决于电子邮件是否已发送。所以,我们可以让电子邮件在后台或异步运行。

😊 请参考下图以获得更清晰的说明:

  • 同步编程同步编程
  • 使用消息队列进行异步编程消息队列

这里我们可以看到使用了消息队列,生产者提交事件,不同的消费者消费事件。注意:这只是一个简单的示例。

📍消息队列

消息队列用于服务间的通信,并提供异步行为。它主要用于无服务器架构和微服务架构。

消息队列

🤩 市面上有各种各样的MQ产品:

  • Apache Kafka
  • Azure 调度程序
  • 纳斯特尔
  • Apache Qpid
  • RabbitMQ

🚀今天我们将学习如何使用 NodeJS 实现 Kafka,以便您可以轻松地将其应用到您的项目中。

**您可以参考这个Github仓库**

📍实施

🙃 供您参考:

Kafka主题

Kafka 将在Docker 容器内运行,生产者会将事件添加到 Kafka 主题(队列)中,消费者则会消费这些事件。

  • Docker Compose 文件
version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: 'kafka'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
Enter fullscreen mode Exit fullscreen mode

☺️ 这里我们在 Docker 容器内运行 Kafka 镜像,并将 9092 端口暴露给外部访问。Zookeeper 是 Kafka 运行所需的依赖项。

  • eventType.js 文件
import avro from 'avsc';

export default avro.Type.forSchema({
  type: 'record',
  fields: [
    {
      name: 'category',
      type: { type: 'enum', symbols: ['DOG', 'CAT'] }
    },
    {
      name: 'noise',
      type: 'string',
    }
  ]
});
Enter fullscreen mode Exit fullscreen mode

这是我们为提交到 Kafka 主题中的事件定义的模式。

  • 生产者 index.js
import Kafka from 'node-rdkafka';
import eventType from '../eventType.js';

const stream = Kafka.Producer.createWriteStream({
  'metadata.broker.list': 'localhost:9092'
}, {}, {
  topic: 'test'
});

stream.on('error', (err) => {
  console.error('Error in our kafka stream');
  console.error(err);
});

function queueRandomMessage() {
  const category = getRandomAnimal();
  const noise = getRandomNoise(category);
  const event = { category, noise };
  const success = stream.write(eventType.toBuffer(event));     
  if (success) {
    console.log(`message queued (${JSON.stringify(event)})`);
  } else {
    console.log('Too many messages in the queue already..');
  }
}

function getRandomAnimal() {
  const categories = ['CAT', 'DOG'];
  return categories[Math.floor(Math.random() * categories.length)];
}

function getRandomNoise(animal) {
  if (animal === 'CAT') {
    const noises = ['meow', 'purr'];
    return noises[Math.floor(Math.random() * noises.length)];
  } else if (animal === 'DOG') {
    const noises = ['bark', 'woof'];
    return noises[Math.floor(Math.random() * noises.length)];
  } else {
    return 'silence..';
  }
}

setInterval(() => {
  queueRandomMessage();
}, 3000);
Enter fullscreen mode Exit fullscreen mode

在这里,我们每 3000 毫秒生成一条随机消息,它是异步的,因为 setInterval() 是 Node API(用 C++ 编写)提供的方法,本质上是异步的。

  • 消费者索引.js
import Kafka from 'node-rdkafka';
import eventType from '../eventType.js';

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
}, {});

consumer.connect();

consumer.on('ready', () => {
  console.log('consumer ready..')
  consumer.subscribe(['test']);
  consumer.consume();
}).on('data', function(data) {
  console.log(`received message: ${eventType.fromBuffer(data.value)}`);
});
Enter fullscreen mode Exit fullscreen mode

消费者将消费测试主题的消息,并相应地打印出来。

注意:- 要运行生产者和消费者

  • 运行npm run start:producer
  • 运行npm run start:consumer

🥳 今天就到这里啦。如果觉得有用,请保存并留言。你可以参考这个简单的例子来构建自己的项目。

文章来源:https://dev.to/singhdevhub/nodejs-with-kafka-build-async-programs-with-ease-11n3