我如何基于实时 COVID-19 数据流构建事件驱动型 NodeJS 应用
研究:背景及要求
决定
下一步
AWS AI 直播!
在我们身处的这个前所未有的时期,我们齐心协力,共同努力,将资源用于新冠肺炎疫情救助工作,造福公众。每个人都贡献了自己的专长。有些人利用3D打印机制作个人防护装备,有些人开发软件,还有些人为亲友或社区提供技术支持。在这篇文章中,我想分享一下我是如何利用实时新冠肺炎疫情数据流,构建一个基于NodeJS的事件驱动型应用程序,并使用一个简单的消息传递协议,该协议也可用于构建面向公众的应用程序。
在这个应用中,我将使用Solace向公众开放的各种信息流,任何人都可以订阅。关于如何使用这些信息流的文档可以在这个GitHub仓库中找到。
SolaceLabs / covid19-stream-processors
通过 Solace 平台,您可以获取 JHU 和 CovidTracking.com 提供的 COVID-19 数据流信息及处理示例应用。
研究:背景及要求
在开发这个应用程序之前,我列出了三个想要实现的基本需求:
-
实时数据更新
-
轻量级应用程序(我不想持续轮询或检查新数据)
-
对任何数据变化做出反应
选项
从更高层面来看,我可以选择两种不同的架构:
-
一种同步的 REST 驱动方法或
-
事件驱动架构(EDA)方法
选项 1:REST
第一种方案可以利用很多在线资源,包括API或 CSV 数据集,例如约翰·霍普金斯大学在其GitHub 代码库中发布的那些。虽然这是一个可行的方案,提供了大量的在线示例和资源,但我想要更实时的解决方案,因为……
a) 数据在首次发布时最有价值(见下图1);
b) 我想要一个能够响应数据更新而不是持续轮询更新的应用程序。例如,约翰·霍普金斯大学 GitHub 代码库中的 CSV 文件每天更新一到两次。如果我使用这个数据库,我就必须持续轮询并检查更新。
此外,由于我要处理的是大型数据集,我希望仅在数据发生更改时才做出响应。因此,REST 方案并非轻量级实现。这就否定了要求 1 和 2。
方案二:EDA
借助事件驱动架构,我可以使用发布/订阅模式来构建我的应用程序。你可能会问,什么是发布/订阅?简而言之,它指的是拥有一个数据“发布者”(例如 COVID-19 数据源)和一个数据“订阅者”(例如我的 Node.js 应用程序),后者仅在新数据发布时才做出响应。
Solace向公众开放的PubSub+ COVID-19 数据代理,会将 COVID-19 数据更新发布到不同的数据流中。因此,希望开发事件驱动型应用(物联网、移动/Web 应用)的应用程序开发人员可以通过订阅任何可用主题来获取数据流。由于此数据的使用与框架、平台和语言无关,我可以使用任何支持这些协议的消息传递协议(MQTT、AMQP、JMS)或开放 API(Python、JS、NodeJS 等)。我甚至可以使用 REST!
决定
因此,在评估了上述两种方案后,我决定采用探索性数据分析(EDA)方法来构建我的应用程序。此外,由于我希望使用轻量级的消息传递 API,获取实时 COVID-19 更新并对这些更新做出快速响应,EDA 显然是最佳选择。
让我们进入正题;嗯,我是说开始编码。
基于 Solace PubSub+ 代理支持的语言和协议,我决定使用 MQTT,因为它有原生的 NodeJS API。
1. 初始设置
接下来,我们来创建一个 NodeJS 项目。打开一个新的终端窗口,执行以下命令,该命令会创建一个新的项目目录,初始化项目并安装 mqtt 包。
mkdir covidproject && cd "$_" && npm init -y && npm i mqtt
2. 连接到经纪商
创建一个新文件
touch index.js
用你喜欢的文本编辑器打开它。插入以下内容。
var mqtt = require('mqtt')
var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883"
var config = {
username: "covid-public-client",
password: "covid19",
}
var client = mqtt.connect(host, config)
client.on('connect', () => {
console.log("Connected to COVID PubSub+ Broker!")
})
您上面的操作是初始化了一个mqtt客户端,并使用主机和配置变量连接到了代理 URL。MQTT 对象随后会返回一些信号,您的客户端应用程序可以利用这些信号来实现回调行为。在本例中,我们监听的是“connect”信号client.on(‘connect’)。我们稍后会介绍更多信号。
注意:代理 URL 和凭据来自此处:https://github.com/SolaceLabs/covid19-stream-processors#1-connection-information
现在通过在终端执行以下命令来测试您的连接。
node index.js
你应该能看到Connected to COVID PubSub+ Broker!输出结果。瞧!
3. 订阅该主题
现在你已经连接到经纪商,你只需要订阅主题即可。
var topics = [
"jhu/csse/covid19/raw",
]
client.on('connect', () => {
console.log("Connected to COVID PubSub+ Broker!")
topics.forEach( (topic) => {
console.log("Subscribing to topic: ", topic)
client.subscribe(topic)
})
})
4. 收听收到的消息
我们要监听的第二个信号如下message:
client.on('message', (topic, message) => {
console.log("Received message on Topic: ", topic,"\nMessage:\n", JSON.parse(message.toString()))
})
请注意,从代理服务器接收到的消息是二进制格式的。为了将其转换为人类可读的格式message.toString(),我们使用 JSON 解析器进行解析。请注意,消息是根据仓库中定义的模式以 JSON 格式发送的。
最终申请表如下所示:
var mqtt = require('mqtt')
var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883"
var config = {
username: "covid-public-client",
password: "covid19",
}
var topics = [
"jhu/csse/covid19/raw",
]
var client = mqtt.connect(host, config)
client.on('connect', () => {
console.log("Connected to COVID PubSub+ Broker!")
topics.forEach( (topic) => {
console.log("Subscribing to topic: ", topic)
client.subscribe(topic)
})
})
client.on('message', (topic, message) => {
console.log("Received message on Topic: ", topic,"\nMessage:\n", JSON.parse(message.toString()))
})
大功告成!您的应用程序现已连接到代理,并订阅数组中定义的一个或多个主题,且仅在发布新消息时才会做出反应。
示例运行
我已修改我的应用程序,使其能够订阅加拿大安大略省的所有病例更新(死亡、现存、确诊和康复病例)、约旦的康复病例以及英国所有省份的确诊病例,订阅方式为使用以下主题流test。
var topics = [
"jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
"jhu/csse/covid19/test/cases/recovered/update/Jordan/#",
"jhu/csse/covid19/test/cases/confirmed/update/United Kingdom/#"
]
请注意,MQTT 通配符(“+”和“#”)分别用于主题级别匹配和多级匹配。
您可以访问https://www.marcd.dev/COVIDStreamViewer/mqtt/mqttListener.html并订阅jhu/csse/covid19/raw主题以获取示例流查看器。
下一步
查看主题层次结构时,您可以订阅不同的主题,并使用MQTT 通配符进一步自定义客户端应用程序如何使用事件流。
我非常乐意看到您的想法,欢迎随时分享并向SolaceLabs 的 GitHub 代码库提交 pull request!如果您有任何疑问,请在下方评论区留言。也欢迎您阅读这篇博文,了解我如何尝试用相同的方法构建一个 Python 应用程序!
文章来源:https://dev.to/solacedevs/how-i-built-an-event-driven-nodejs-app-on-realtime-covid-19-data-streams-3i98





