发布于 2026-01-06 7 阅读
0

我如何构建一个基于实时 COVID-19 数据流的事件驱动型 NodeJS 应用 研究:背景和需求 决策 下一步 AWS AI LIVE!

我如何基于实时 COVID-19 数据流构建事件驱动型 NodeJS 应用

研究:背景及要求

决定

下一步

AWS AI 直播!

在我们身处的这个前所未有的时期,我们齐心协力,共同努力,将资源用于新冠肺炎疫情救助工作,造福公众。每个人都贡献了自己的专长。有些人利用3D打印机制作个人防护装备,有些人开发软件,还有些人为亲友或社区提供技术支持。在这篇文章中,我想分享一下我是如何利用实时新冠肺炎疫情数据流,构建一个基于NodeJS的事件驱动型应用程序,并使用一个简单的消息传递协议,该协议也可用于构建面向公众的应用程序。

在这个应用中,我将使用Solace向公众开放的各种信息流,任何人都可以订阅。关于如何使用这些信息流的文档可以在这个GitHub仓库中找到。

GitHub 标志 SolaceLabs / covid19-stream-processors

通过 Solace 平台,您可以获取 JHU 和 CovidTracking.com 提供的 COVID-19 数据流信息及处理示例应用。

研究:背景及要求

在开发这个应用程序之前,我列出了三个想要实现的基本需求:

  1. 实时数据更新

  2. 轻量级应用程序(我不想持续轮询或检查新数据)

  3. 对任何数据变化做出反应

选项

从更高层面来看,我可以选择两种不同的架构:

  1. 一种同步的 REST 驱动方法或

  2. 事件驱动架构(EDA)方法

选项 1:REST

第一种方案可以利用很多在线资源,包括API或 CSV 数据集,例如约翰·霍普金斯大学在其GitHub 代码库中发布的那些。虽然这是一个可行的方案,提供了大量的在线示例和资源,但我想要更实时的解决方案,因为……

a) 数据在首次发布时最有价值(见下图1);

b) 我想要一个能够响应数据更新而不是持续轮询更新的应用程序。例如,约翰·霍普金斯大学 GitHub 代码库中的 CSV 文件每天更新​​一到两次。如果我使用这个数据库,我就必须持续轮询并检查更新。

Gartner 流处理 图1:数据价值随时间推移而降低

此外,由于我要处理的是大型数据集,我希望仅在数据发生更改时才做出响应。因此,REST 方案并非轻量级实现。这就否定了要求 1 和 2。

方案二:EDA

借助事件驱动架构,我可以使用发布/订阅模式来构建我的应用程序。你可能会问,什么是发布/订阅?简而言之,它指的是拥有一个数据“发布者”(例如 COVID-19 数据源)和一个数据“订阅者”(例如我的 Node.js 应用程序),后者仅在新数据发布时才做出响应

Solace向公众开放的PubSub+ COVID-19 数据代理,会将 COVID-19 数据更新发布到不同的数据流中。因此,希望开发事件驱动型应用(物联网、移动/Web 应用)的应用程序开发人员可以通过订阅任何可用主题来获取数据流。由于此数据的使用与框架、平台和语言无关,我可以使用任何支持这些协议的消息传递协议(MQTT、AMQP、JMS)或开放 API(Python、JS、NodeJS 等)。我甚至可以使用 REST!

Solace COVID架构图 图 2:端到端架构概览

决定

因此,在评估了上述两种方案后,我决定采用探索性数据分析(EDA)方法来构建我的应用程序。此外,由于我希望使用轻量级的消息传递 API,获取实时 COVID-19 更新并对这些更新做出快速响应,EDA 显然是最佳选择。

让我们进入正题;嗯,我是说开始编码。

编码 gif

基于 Solace PubSub+ 代理支持的语言和协议,我决定使用 MQTT,因为它有原生的 NodeJS API。

Solace 语言和协议支持 图 3:Solace 支持的语言和协议

1. 初始设置

接下来,我们来创建一个 NodeJS 项目。打开一个新的终端窗口,执行以下命令,该命令会创建一个新的项目目录,初始化项目并安装 mqtt 包。


mkdir covidproject && cd "$_" && npm init -y && npm i mqtt 

Enter fullscreen mode Exit fullscreen mode

2. 连接到经纪商

创建一个新文件


touch index.js 

Enter fullscreen mode Exit fullscreen mode

用你喜欢的文本编辑器打开它。插入以下内容。


var mqtt = require('mqtt') 

var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883" 

var config = { 

    username: "covid-public-client", 

    password: "covid19", 

} 

var client  = mqtt.connect(host, config) 

client.on('connect', () => { 

    console.log("Connected to COVID PubSub+ Broker!") 

}) 

Enter fullscreen mode Exit fullscreen mode

您上面的操作是初始化了一个mqtt客户端,并使用主机和配置变量连接到了代理 URL。MQTT 对象随后会返回一些信号,您的客户端应用程序可以利用这些信号来实现回调行为。在本例中,我们监听的是“connect”信号client.on(‘connect’)。我们稍后会介绍更多信号。

注意:代理 URL 和凭据来自此处:https://github.com/SolaceLabs/covid19-stream-processors#1-connection-information

现在通过在终端执行以下命令来测试您的连接。


node index.js 

Enter fullscreen mode Exit fullscreen mode

你应该能看到Connected to COVID PubSub+ Broker!输出结果。瞧!

3. 订阅该主题

现在你已经连接到经纪商,你只需要订阅主题即可。


var topics = [ 

    "jhu/csse/covid19/raw", 

] 

client.on('connect', () => { 

    console.log("Connected to COVID PubSub+ Broker!") 

    topics.forEach( (topic) => { 

        console.log("Subscribing to topic: ", topic) 

        client.subscribe(topic) 

    }) 

}) 



Enter fullscreen mode Exit fullscreen mode

4. 收听收到的消息

我们要监听的第二个信号如下message


client.on('message', (topic, message) => { 

    console.log("Received message on Topic: ", topic,"\nMessage:\n", JSON.parse(message.toString()))

}) 



Enter fullscreen mode Exit fullscreen mode

请注意,从代理服务器接收到的消息是二进制格式的。为了将其转换为人类可读的格式message.toString(),我们使用 JSON 解析器进行解析。请注意,消息是根据仓库中定义的模式以 JSON 格式发送的。

最终申请表如下所示:


var mqtt = require('mqtt') 



var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883" 

var config = { 

    username: "covid-public-client", 

    password: "covid19", 

} 



var topics = [ 

   "jhu/csse/covid19/raw", 

] 



var client  = mqtt.connect(host, config) 



client.on('connect', () => { 

    console.log("Connected to COVID PubSub+ Broker!") 

    topics.forEach( (topic) => { 

        console.log("Subscribing to topic: ", topic) 

        client.subscribe(topic) 

    }) 

}) 



client.on('message', (topic, message) => { 

    console.log("Received message on Topic: ", topic,"\nMessage:\n", JSON.parse(message.toString()))

}) 

Enter fullscreen mode Exit fullscreen mode

大功告成!您的应用程序现已连接到代理,并订阅数组中定义的一个或多个主题,且仅在发布新消息时才会做出反应。

示例运行

我已修改我的应用程序,使其能够订阅加拿大安大略省的所有病例更新(死亡、现存、确诊和康复病例)、约旦的康复病例以及英国所有省份的确诊病例,订阅方式为使用以下主题流test

var topics = [
    "jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
    "jhu/csse/covid19/test/cases/recovered/update/Jordan/#",
    "jhu/csse/covid19/test/cases/confirmed/update/United Kingdom/#"
]
Enter fullscreen mode Exit fullscreen mode

请注意,MQTT 通配符(“+”和“#”)分别用于主题级别匹配和多级匹配。

替代文字
替代文字

您可以访问https://www.marcd.dev/COVIDStreamViewer/mqtt/mqttListener.html并订阅jhu/csse/covid19/raw主题以获取示例流查看器。

下一步

查看主题层次结构时,您可以订阅不同的主题,并使用MQTT 通配符进一步自定义客户端应用程序如何使用事件流。

我非常乐意看到您的想法,欢迎随时分享并向SolaceLabs 的 GitHub 代码库提交 pull request!如果您有任何疑问,请在下方评论区留言。也欢迎您阅读这篇博文,了解我如何尝试用相同的方法构建一个 Python 应用程序

文章来源:https://dev.to/solacedevs/how-i-built-an-event-driven-nodejs-app-on-realtime-covid-19-data-streams-3i98