发布于 2026-01-06 1 阅读
0

这就是你需要消息代理的原因。

这就是你需要消息代理的原因。

在我大学好友(也是我未来 memphis.dev 的联合创始人)和我共同开发的开源项目中,你会发现一个名为“Makhela”的项目,这是一个希伯来语单词,意思是合唱团。
为了方便起见,我们将使用“Choir”(合唱团)。

“Choir”是一个开源的开源情报(OSINT)项目,专注于利用LDA和主题建模等人工智能模型,收集社交媒体个人资料之间基于上下文的关联。该项目使用Python编写,旨在解释特定领域内全球用户以及该领域内高影响力人物的讨论内容,并关注边缘群体正在发生的事情。在概念验证或最小可行产品(MVP)阶段,我们使用了单一数据源——Twitter,该数据源易于集成。

下图是“合唱团”背后的“大脑”。该大脑会根据语料库的增量变化和新摄取的数据,自主地生长和分析新的顶点和边。

每个顶点代表一个人物档案,每条边强调(a)人物之间的联系。(b)相似的颜色代表相似的主题。

马赫拉图

紫色= 主题 1;
蓝色= 主题 2
;黄色= 边缘主题

经过相当数量的研究、开发时间以及大量的故障排除和调试,事情开始好转。

我们需要解决的问题包括:

  • 了解个人资料之间的联系
  • 构建一个排名算法,用于添加更多有影响力的人
  • 将传入数据的模式转换为分析端能够处理的格式。
  • 近乎实时至关重要——用外部数据丰富每条推文
  • 适应“Twitter”速率限制
  • 每次上游或架构更改都会导致分析功能崩溃。
  • 收集和分析之间的同步,这两者是两个不同的组成部分。
  • 基础设施
  • 规模

与任何初创公司或早期项目一样,我们以最小可行产品(MVP)的形式构建了“Choir”,完全基于“Twitter”平台,其界面如下所示 -

Makhela建筑事务所 1

马赫拉收藏家

“Collector”是一个单体式的、用Python编写的应用程序,它基本上每隔几个小时以固定的时间间隔批量收集和提炼数据,以便进行分析和可视化。

然而,随着收集的数据量及其复杂性的增长,问题开始出现。每次批处理周期分析都耗时数小时,而就收集的数据量而言(最多也就几百兆字节!),这完全没有道理。关于其他挑战,我们将在后续章节中详细讨论。

几个月后,用户开始使用“Choir”了!!!他们
不仅使用,还积极参与互动、付费并提出功能需求。
这简直是所有创作者的梦想!

但随后我们突然意识到这一点。

(a)“Twitter”不是宇宙的中心,我们需要将“合唱团”扩展到更多来源。

(b)代码中的任何微小改动都会破坏整个流程。

(c)单体架构对于数据驱动型应用程序的性能而言是致命的。

对于所有急于启动并开始获得良好发展势头的项目来说,推动增长和扩大用户群是你的首要、第二和第三要务。

此时此刻,你最不想做的就是回头重建你的框架。你想保持目前的势头。

抱着这样的想法,我们说“以后再添加更多数据源并进行重构”。这真是一个大错特错的决定。


扩展数据驱动型应用程序面临的挑战

  1. 每个新的数据源都需要不同的模式转换。

  2. 每次模式更改都会对下游管道中的其他阶段产生连锁反应。

  3. 增量/递增式数据采集。虽然您可以等待整个批次数据采集完成后再将其保存到数据库,但应用程序经常会崩溃。想象一下,您正在执行一个非常缓慢的数据采集过程,在采集到最后一条记录时,采集过程崩溃了。

  4. 在单体架构中,很难扩展那些需要更多功率的特定功能。

  5. 分析功能通常需要修改、升级和算法才能获得更好的结果,而这些修改、升级和算法是通过使用或需要来自收集器的不同密钥来实现的。

虽然没有快速的解决办法,但我们可以构建一个框架来支持这些需求。


解决方案

选项 1 – 将整个现有流程复制到另一个来源,例如“Facebook”。

另一种建筑方案

除了复制收集器之外,我们还需要——

  • 维护两种不同的模式(噩梦)
  • 分析功能完全不同。Facebook 和 Twitter 上的个人资料之间的联系不同,需要不同的客观关系。
  • 分析器应该能够以联合的方式分析数据,而不是单独分析数据;因此,源 X 的任何微小变化都会直接影响分析器,并经常导致其崩溃。
  • 双重维护

这样的例子不胜枚举……
因此,它无法扩展。

方案二——来了!使用消息代理!

我想先划定一个基准线。消息代理本身并非解决方案,而是一个支持框架或工具,用于实现分支式、可扩展的数据驱动架构。

什么是消息代理?

“消息代理是一种用于消息验证、转换和路由的架构模式。它协调应用程序之间的通信[模糊],最大限度地减少应用程序为了交换消息而需要相互了解的程度,从而有效地实现解耦。[4]”。维基百科。

首先,让我们把它翻译成我们更容易理解的形式。

消息代理是一个临时数据存储区。为什么是临时的呢?因为其中的每条数据都会在用户设定的时间后被删除。因此,消息代理中的数据被称为“消息”。每条消息的大小通常为几字节到几兆字节。

在消息代理周围,我们可以找到生产者和消费者。

生产者 = 向消息代理发送消息的“对象”。
消费者 = 从消息代理消费消息的“对象”。

“事物”指的是与消息代理连接并交换数据的系统/服务/应用程序/物联网/某种目标。

需要注意的是,同一个服务/系统/应用程序可以同时扮演生产者和消费者的角色。

消息队列都源自同一家族,但代理和队列之间存在着关键区别。

  1. MQ 使用发布和订阅机制。MQ 本身会将数据推送给消费者,而不是反过来(消费者从​​代理服务器拉取数据)。

  2. 排序功能已承诺启用。消息将按接收顺序推送。某些系统要求必须如此。

  3. 发布者(制作者)与订阅者的比例为 1:1。也就是说,现代版本可以通过一些功能(例如交换等)来实现这一目标。

知名的消息代理/队列包括 Apache Kafka、RabbitMQ、Apache Pulsar 以及我们自己的 Memphis.dev。Kafka的应用场景涵盖事件流处理到实时数据处理。考虑到 Memphis.dev 易于部署且对开发者友好,您可以考虑使用它来替代 Kafka。

你还在吗?太棒了!

因此,让我们了解一下使用消息代理是如何帮助“Choir”实现规模化的。


而不是这样做——

Makhela建筑事务所 1

通过将应用程序解耦为更小的微服务,并使用消息代理来协调流程,它最终变成了这样——

消息代理架构

从左上角开始,插入到系统中的每一条数据(推文/帖子)都会自动触发整个流程,并在不同的阶段之间流动。

  1. 数据采集​​。三个采集器并行搜索社区中新增的每个用户资料。如果需要更多数据源/社交网络,我们已经在后台开发,一旦准备就绪,便会开始监听传入的事件。该系统支持无限扩展的数据源,允许在不影响其他数据源的情况下处理特定数据源,支持微扩展以单独提升每个数据源的性能,等等。

  2. 转换。数据采集完成后,结果将进入下一阶段——“模式转换”。在此阶段,模式转换服务会将事件的模式转换为分析函数可以解读的格式。这实现了模式管理的“单一数据源”,因此,如果上游发生变更,只需联系此服务即可调试问题。在更完善的设计中,它还可以与外部模式注册表集成,从而进一步简化维护工作。

架构变更

  1. 分析。每个事件片段都会经过转换,并以分析函数可以解读的形式发送给分析函数。在“合唱团”项目中,我们使用了不同的AI模型。由于无法扩展规模,因此转向逐个事件进行分析无疑大有裨益。

  2. 保存。在“合唱团”和数据库类型之间创建抽象,并能够将多个插入操作批量到一个批次中,而不是每个事件都发出请求。


我撰写本文的主要目的是强调尽早实现消息代理模式和技术的重要性,以避免将来进行痛苦的重构。消息代理默认能够帮助您构建可扩展的架构,因为它消除了紧耦合的约束。

是的,您的路线图和新增功能固然重要;是的,它需要一个学习过程;是的,对于您目前的阶段来说,这可能看起来像是过度设计的解决方案。但是,对于数据驱动型用例而言,规模化的需求会很快体现在性能、敏捷性、功能添加、修改等方面。糟糕的设计决策或缺乏合适的框架会耗尽您的资源。最好在用户和功能需求激增之前,就构建敏捷的基础架构,而不必追求企业级规模。

总之,消息代理的入门门槛绝对值得你花时间去了解。


特别感谢Yaniv Ben-Hemo的写作。


加入 4500 多名订阅者的行列,订阅我们的数据工程新闻简报。


关注我们,获取最新动态!
GitHub文档Discord

文章来源:https://dev.to/memphis_dev/here-is-why-you-need-a-message-broker-31g0