流媒体如何简化你的生活
问题
什么是流?
代码示例
流媒体如何解决我们的问题
我没说出口的事
更多资源
本文最初发表于 LogRocket,经许可转载于此。
在 Web 开发领域,数据流(及其构建模块——可观察对象)正日益成为热门话题。像BaconJS和RxJS这样的库已经存在多年,RxJS 甚至被用作 Angular 2+ 的基础。事实上,TC39 甚至提出了一项提案,建议为 Angular 语言添加原生可观察对象。
所以直播现在很火。但是……为什么?为什么这么多人关心直播?
简而言之,基于流的方法极大地简化了困扰人们数十年的几个问题。我们稍后会讨论这些问题以及流如何帮助解决它们,但在那之前,我想先抛砖引玉,提出一个贯穿始终的主题,希望大家在接下来的讨论中牢记于心。
流式传输解决的问题都与数据的发送、接收和处理有关。因此,我们的主题核心是:在我看来,流式传输提供了一种视角上的转变,从请求数据转变为接收数据。
问题
这几乎是显而易见的(但我还是要说)——现代网络应用程序极其复杂。它们通常包含大量或多或少独立的组件,这些组件同时存在于同一个页面上,从各种来源请求数据,转换数据,以各种巧妙的方式组合来自不同来源的数据,最终,如果一切顺利,会将部分数据呈现在屏幕上供我们用户查看。
顺便说一下,“数据源”并不一定指“REST API”。数据可以来自各种各样的地方:
- 用于从服务器获取实时更新的 WebSocket 连接
- 用户输入事件,例如文本字段中的鼠标移动或键盘事件。
- 来自传感器的输入,例如手机的陀螺仪或加速度计
postMessage()来自 Web Worker、iframe 或相关窗口的通信;来自IndexedDBlocalStorage的存储更改事件
还有很多其他例子;你可能还能想到我遗漏的一些东西。
所有这些复杂因素都难以应对。以下是一些经常出现的问题情况:
- 多个独立组件同时使用同一个数据源。
- 组件需要监听来自数据源的更新并实时做出反应。
- 应用程序的多个独立部分需要保持同步;来自单个数据源的更新应尽可能立即反映在所有地方。
- 一个组件中的用户操作应该更新其他几个独立的组件,以便该组件可以作为其他组件的数据源。
- 每个组件都使用不同的数据源,并以独特的方式组合它们的输出,通常会对这些数据源中的数据进行转换和合并,以满足该组件的需求。每次从任何数据源更新数据后,都需要再次进行这种转换。
- 流媒体可以轻松处理所有这些问题,而且处理方式简单易懂。
什么是流?
在深入代码示例之前,让我们先花一分钟时间简单谈谈理论。
这里提到的软件设计模式叫做观察者模式。在这个模式中,有两个重要的角色:“观察者”和“主体”(也称为“可观察对象”)。顾名思义,观察者“观察”主体,每当主体发出任何数据时,观察者都会获知。在代码中,这是通过主体维护一个列表来实现的,该列表记录了所有正在观察它的观察者。每当主体有数据要传递时,它就会遍历这个列表,并调用每个观察者的特殊方法,将数据作为参数传递给该方法。
观察者模式在软件开发中应用广泛。它是所有发布/订阅交互背后的基本架构。你甚至可以把日常事件处理程序看作是观察者。我认为这种模式如此流行的原因显而易见:它能够轻松地在异步事件发生时发现它们,并在数据源可用时立即获取数据而无需轮询,这非常强大。
流(Streams)是比观察者和主体(Subjects)高一层的抽象概念。流使用可以同时充当观察者的主体,观察其他主体以接收数据。每个主体观察其他主体以等待数据,对接收到的数据进行某种处理,然后将部分数据发送给正在观察它的主体。这些观察者主体使得构建长链数据处理器变得非常容易,这些处理器可以对数据进行各种有趣的操作,并将数据传递给应用程序中需要它的组件。
值得一提的是,正如一个对象可以被多个观察者观察一样,一个观察者也可以观察多个对象。这使得我们可以用各种有趣的方式将来自不同来源的数据合并在一起。
花点时间想象一下,把许多这样的独立观察者连接起来,然后退后一步,看看全局。想想数据是如何在这个系统中从源头流向目的地,与其他源头的数据融合,分成支流,再与其他数据汇合,从而形成各种有趣的路径,高效地将数据输送到系统中各个需要的地方。我们把这种全局性的视角称为“数据流”。
代码示例
既然我们已经了解了理论,那就让我们把它付诸实践吧。
对于每个数据源,无论其类型如何,都创建一个主题,并将其提供给任何需要从该数据源获取数据的组件。不同的 UI 框架实现此功能的方式各不相同,但就我们的目的而言,我们将每个主题放在一个 JavaScript 模块中。然后,任何需要从该数据源获取数据的组件都可以导入该主题。
注:本文代码示例将使用 JavaScript 作为编程语言,RxJS 作为流库,但这只是我个人的选择。RxJS 是我最熟悉的,但还有其他流库可以实现相同的功能,无论是在 JavaScript 还是其他语言中。实际上,RxJS 只是ReactiveX这种抽象规范的 JavaScript 实现,而 ReactiveX 本身在各种编程语言中都有实现。
假设我们需要定期轮询一个 API。我们可以使用 RxJS 的便捷ajax辅助interval函数 `setTimeout` 创建一个主题来处理此操作。该函数会创建一个主题,并按指定的时间间隔发出数据。(` pipesetTimeout` 操作符本质上是将你提供的操作符链接起来,并switchMap从接收到的每一位数据创建一个新的可观察对象,然后在创建下一个可观察对象之前发出该可观察对象的数据。不过,不必过于纠结于此;这些是 RxJS 特有的,与本文主题关系不大。)
import {interval} from 'rxjs'
import {ajax} from 'rxjs/ajax'
Import {switchMap} from 'rxjs/operators'
// every 10 seconds, poll /api/updates
const apiSubject = interval(1000).pipe(
switchMap(_ => ajax.getJSON('https://mysite.com/api/updates'))
)
export apiSubject
我们可以继续沿用这种方式,为每个返回主题的数据源创建一个模块。当需要在组件中使用来自这些数据源的数据时,就像导入其他任何数据一样简单:
import {webSocket} from 'rxjs/webSocket'
const wsSubject = webSocket('ws://mysite.com:8081')
// if needed, we could do some pre-processing of websocket messages here
export wsSubject
所有数据源都通过通用接口生成数据,这本身就很有用。但流的真正强大之处在于,我们可以极其轻松地将这些观察者对象链接起来,从而处理和操作数据。像 RxJS 这样的流库通过在其对象数据类型上提供“操作符”方法,使这一切变得非常简单。这些操作符方法在内部观察对象,并返回一个新的待观察对象。
为了演示这一点,我们来看一个非常简单的例子:一个聊天室应用程序。在这种情况下,上述 WebSocket 可用于实时聊天通知,而 API 可用于接收来自服务器的、无需实时更新的信息。(我知道,两者都可以通过 WebSocket 实现,但为了演示方便,我们暂且这样理解。)
假设我们的服务器更新 API 返回两种类型的数据:
- 服务器用户列表在列表更改时会更新。
- 服务器偶尔会发布通知,这些通知应该会出现在聊天室中,供所有用户查看。
假设从服务器接收到的数据包格式如下:
{
"messages": [
{"type": "who", "val": ["joe", "coolguy", "jane", "karla"]},
{"type": "notice", "val": "Welcome to the server!"},
{"type": "notice", "val": "Server maintenance scheduled for 5:00pm EST"}
]
}
我们需要通过更新用户列表来处理“谁”消息,并通过在聊天室中显示来处理“通知”消息。完成第二个任务的一种方法可能是将通知消息视为用户消息,并为其分配一个特殊的用户名,例如“服务器”。
现在假设从 WebSocket 接收到的消息格式如下:
{
"user": "joe",
"message": "Hey, everyone!"
}
我们需要将通知转换为符合此格式,并将通知消息与 WebSocket 消息合并,以便发送到聊天室。幸运的是,使用流式传输非常简单:
import apiSubject from 'api-subject'
import wsSubject from 'ws-subject'
import {merge, from} from 'rxjs'
import {filter, pluck, switchMap} from 'rxjs/operators'
const serverMessages = apiSubject.pipe(
pluck('messages'), // grab the “messages” array from the server response
switchMap(from) // create an observable from the array that emits one message at a time
)
// update the user list when the server sends a new one
serverMessages.pipe(
filter(m => m.type === 'who'), // get just the 'who' messages
pluck('val') // get the array of usernames from each 'who' message
).subscribe(function(userList) {
// update the user list here
})
// transform the 'notice' messages to the same format as a websocket message
const notices = serverMessages.pipe(
filter(m => m.type === 'notice'),
pluck('val'),
map(notice => ({ user: 'SERVER', message: val }))
)
// create a subject that observes both the server notices and the user messages from the websocket
merge(notices, wsSubject).subscribe(function(message) {
// post the message to the chat room here
})
相当不错!这段代码中有些细节不太明显,因为它被各种复杂的辅助函数和运算符抽象了起来,那就是每个辅助函数和运算符(webSocket`, ajax, from, pluck, switchMap, , filter` merge)都会创建一个新的 Subject,该 Subject 会观察流中的前一个(或多个)Subject,对接收到的每个数据位进行处理,并将新的数据发送到流的下游。而特殊subscribe方法则创建了一个简单的观察者,它会消费流末端输出的所有内容,但自身无法被观察。
流媒体如何解决我们的问题
既然我们已经对流的功能有了一些了解,让我们回到之前讨论过的问题列表,确保我们能找到每个问题的答案。我们逐一来看:
- 多个独立组件同时使用同一个数据源。—— 将你的主题封装在一个模块中,可以让任何组件访问并订阅它。
- 组件需要监听数据源的更新并实时做出反应。—— 这就是观察者模式的核心思想:一旦主体发出数据,其观察者就能获知并实时做出反应。
- 应用程序的多个独立部分需要保持同步;来自单个数据源的更新应尽可能即时地反映到所有地方。—— 多个观察者可以观察同一对象,因此保持不同组件同步很容易。
- 一个组件中的用户操作应该更新其他几个独立组件,以便该组件能够作为其他组件的数据源。——这个问题有几种可能的解决方案,具体取决于使用场景。我过去实现这一目标的一种方法是创建一个中心模块,其中包含一个代表数据源的 Subject,并允许组件订阅该 Subject 或向其推送数据。在 RxJS 中,Subject 类型有一个名为“next”的方法,可以调用该方法将一些数据传递给 Subject:
const mySubject = new Subject
mySubject.subscribe(console.log)
mySubject.next('Testing!') // logs 'Testing!' to the console
- 每个组件都使用不同的数据源,并以独特的方式组合它们的输出,通常会转换和合并来自这些数据源的数据以满足该组件的需求。每次从任何数据源更新数据后,都需要再次进行这种转换。—— 我们看到了一个简单的例子,将 WebSocket 消息和服务器通知合并到一个数据流中。每当有来自任一数据源的消息到达时,它都会立即推送到聊天室。这是一个非常简单的例子,但希望您能从中了解如何扩展它以处理更复杂的情况。除了上述方法之外,RxJS 还提供了几个用于
merge合并来自多个数据流的数据的函数,例如`addData()`、`addData()combineLatest`zip或 ` addData(concat)`。
我没说出口的事
这次对数据流的介绍比较浅显,但我希望能够让大家初步了解数据流的强大功能。数据流可以显著简化系统中的数据流,尤其是在处理多个需要交互并同时更新应用程序中不同部分的数据源时。
但因为我想尽量浅尝辄止,所以有很多内容我没有提及。比如,如何处理数据流中的错误?如何清理可观察对象以防止内存泄漏?“热”可观察对象和“冷”可观察对象到底是什么?所有这些都非常重要,如果你决定深入研究数据流(嘿嘿),它们应该是你最先学习的内容之一。但我的重点在于:说服你深入探索数据流。希望我已经做到了!
更多资源
如果您想了解更多关于流媒体能为您带来什么的信息(我希望您能这样做),以下是一些可供进一步阅读/观看的链接:
- ReactiveX 提供的操作符概述——我发现通读这些操作符能让你更好地了解流的功能。如果你想要更具交互性的体验,RxJS 文档中有一个交互式的“选择你的操作符”指南。
- 如果要推荐一位学习流式(他称之为“响应式”)编程的人,我会推荐Ben Lesh。他做过很多关于 RxJS 和“响应式编程”的精彩演讲,他的博客文章也写得非常好,信息量丰富。以下是一些亮点:
- 响应式思考 ——一场精彩的演讲,讲解如何理解基于观察者的架构,以及如何以“响应式”的视角思考你正在解决的问题。
- 通过构建可观察对象来学习可观察对象 ——Ben 在这篇精彩的博文中提出了一个他经常重复的理念:“可观察对象本质上就是函数”。绝对值得一读。
- 热观测值与冷观测值 ——这是你决定深入研究时首先要回答的问题之一。
- 你一直错过的响应式编程入门指南——André Staltz 撰写的这篇关于 Observables 和 Streams 的精彩入门文章,比我这里介绍的要更技术性一些。