NodeJS 与 Kafka:轻松构建异步程序
📍 同步和异步编程
😶🌫️ 在当今软件开发如此复杂的时代,我们需要巧妙的架构来应对这些挑战。例如,如果我们开发了一个应用程序,之后决定让它处理成千上万甚至数百万的用户,那么我们就必须做出明智的决策。微服务架构就是一个很好的选择。
🙃 另一方面,有些服务会使我们的系统运行缓慢。
👉 以在线购买机票✈️为例。只有完成付款后,预订才能成功,这称为同步编程。另一方面,预订不应该依赖于电子邮件,例如预订邮件是否送达。
🧐 因此,电子邮件和预订并非直接相关,预订完成也不应该取决于电子邮件是否已发送。所以,我们可以让电子邮件在后台或异步运行。
😊 请参考下图以获得更清晰的说明:
- 同步编程

- 使用消息队列进行异步编程

这里我们可以看到使用了消息队列,生产者提交事件,不同的消费者消费事件。注意:这只是一个简单的示例。
📍消息队列
消息队列用于服务间的通信,并提供异步行为。它主要用于无服务器架构和微服务架构。
🤩 市面上有各种各样的MQ产品:
- Apache Kafka
- Azure 调度程序
- 纳斯特尔
- Apache Qpid
- RabbitMQ
🚀今天我们将学习如何使用 NodeJS 实现 Kafka,以便您可以轻松地将其应用到您的项目中。
**您可以参考这个Github仓库**
📍实施
🙃 供您参考:
Kafka 将在Docker 容器内运行,生产者会将事件添加到 Kafka 主题(队列)中,消费者则会消费这些事件。
- Docker Compose 文件
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
container_name: 'kafka'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
☺️ 这里我们在 Docker 容器内运行 Kafka 镜像,并将 9092 端口暴露给外部访问。Zookeeper 是 Kafka 运行所需的依赖项。
- eventType.js 文件
import avro from 'avsc';
export default avro.Type.forSchema({
type: 'record',
fields: [
{
name: 'category',
type: { type: 'enum', symbols: ['DOG', 'CAT'] }
},
{
name: 'noise',
type: 'string',
}
]
});
这是我们为提交到 Kafka 主题中的事件定义的模式。
- 生产者 index.js
import Kafka from 'node-rdkafka';
import eventType from '../eventType.js';
const stream = Kafka.Producer.createWriteStream({
'metadata.broker.list': 'localhost:9092'
}, {}, {
topic: 'test'
});
stream.on('error', (err) => {
console.error('Error in our kafka stream');
console.error(err);
});
function queueRandomMessage() {
const category = getRandomAnimal();
const noise = getRandomNoise(category);
const event = { category, noise };
const success = stream.write(eventType.toBuffer(event));
if (success) {
console.log(`message queued (${JSON.stringify(event)})`);
} else {
console.log('Too many messages in the queue already..');
}
}
function getRandomAnimal() {
const categories = ['CAT', 'DOG'];
return categories[Math.floor(Math.random() * categories.length)];
}
function getRandomNoise(animal) {
if (animal === 'CAT') {
const noises = ['meow', 'purr'];
return noises[Math.floor(Math.random() * noises.length)];
} else if (animal === 'DOG') {
const noises = ['bark', 'woof'];
return noises[Math.floor(Math.random() * noises.length)];
} else {
return 'silence..';
}
}
setInterval(() => {
queueRandomMessage();
}, 3000);
在这里,我们每 3000 毫秒生成一条随机消息,它是异步的,因为 setInterval() 是 Node API(用 C++ 编写)提供的方法,本质上是异步的。
- 消费者索引.js
import Kafka from 'node-rdkafka';
import eventType from '../eventType.js';
var consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
}, {});
consumer.connect();
consumer.on('ready', () => {
console.log('consumer ready..')
consumer.subscribe(['test']);
consumer.consume();
}).on('data', function(data) {
console.log(`received message: ${eventType.fromBuffer(data.value)}`);
});
消费者将消费测试主题的消息,并相应地打印出来。
注意:- 要运行生产者和消费者
- 运行npm run start:producer
- 运行npm run start:consumer
🥳 今天就到这里啦。如果觉得有用,请保存并留言。你可以参考这个简单的例子来构建自己的项目。
文章来源:https://dev.to/singhdevhub/nodejs-with-kafka-build-async-programs-with-ease-11n3


