理解可观测变量
我们试图实现什么目标
构建一个玩具版的 RxJS
总结
响应式编程最近发展迅猛。像 RxJS 和 Most.js 这样的库以及像 Cycle.js 这样的框架,让构建复杂的异步行为变得轻而易举。但是,理解这些可观察对象或流(以下我将交替使用这两个术语)的工作原理往往很困难。以我的经验来看,如果你能自己动手实现,那就说明你真正理解了它。因此,在本文中,我们将构建一个 RxJS 的玩具示例!
我们试图实现什么目标
由于很多人不熟悉流(Stream),这里做一个简单的概括:流是随时间变化的数组。我的意思是:
const myArray = [1, 2, 3, 4];
const myValue = myArray
.map(i => i * 2)
.reduce((acc, curr) => acc + curr, 0);
console.log(myValue);
在这段代码片段中,我们获取一个数组并计算其中所有元素的总和。但如果我们从外部来源(例如 API)获取值呢?那么我们可以使用 Promise:
const myValuePromise = getData() //uses a promise based API
.then(data => data
.map(i => i*2)
.reduce((acc, curr) => acc + curr, 0)
)
.then(console.log);
这种方法也很好用。但如果我们从 WebSocket 获取数据呢?WebSocket 传递的不是像 Promise 那样的单个未来值,而是多个值!这时,流就派上用场了:
let websocket = new Websocket(/* ... */);
const websocketStream = Observable.create(observer => {
websocket.onMessage = (msg) => observer.onNext(msg);
websocket.onClose = () => observer.complete();
return () => websocket.close();
});
const myValueStream = websocketStream
.map(i => i * 2)
.scan((acc, curr) => acc + curr, 0)
.subscribe(console.log);
现在,每次通过 WebSocket 收到新值时,scan都会发出新的总和。如果您想等到 WebSocket 关闭后再打印最终的总和,可以使用reduce.
构建一个玩具版的 RxJS
既然我们已经了解了如何使用流,现在就该开始构建流库了。首先,让我们问问自己,我们希望在什么情况下发生什么。我们需要一个观察者来订阅一个可观察对象。观察者随后会从上游接收值。所以,为了简单起见,我们首先定义我们的可观察对象。这里我将使用 TypeScript,因为它有助于理解代码的运行机制。
interface Observer<T> {
next(t: T): void;
complete(): void;
}
如您所见,观察者是一个包含属性next和complete函数的对象。现在我们需要可观察对象。为此,我们将从底层开始,这意味着目前,我们的可观察对象只需要一个subscribe方法。
interface Observable<T> {
subscribe(observer: Observer<T>): void;
}
所以,简单来说,我们只需要创建一个只有一个方法的对象。让我们复现一下我们的 WebSocket 示例:
let websocket = new Websocket(/* ... */);
const websocketStream = {
subscribe(observer) {
websocket.onMessage = msg => observer.next(msg);
websocket.onClose = () => observer.complete();
}
}
好的,这看起来几乎和真正的 RxJS 示例一模一样了。唯一的区别是缺少清理工作,但为了简单起见,我就不赘述了。接下来,我们需要定义一个 map 函数,它接受一个函数和一个 observable 对象,并返回一个新的 observable 对象:
function map<T, U>(fn: (t: T) => U): (s: Observable<T>) => Observable<U> {
return stream => ({
subscribe(observer: Observer<U>) {
stream.subscribe({
next: (value: T) => observer.next(fn(value)),
complete: observer.complete
});
}
});
}
我们基本上是创建了一个工厂函数,它订阅前一个可观察对象,并带有一个内部观察者,该观察者应用该函数并将值返回给下一个观察者。TypeScript 再次帮助我们理解了它的工作原理。
现在我们可以这样做(扩展前面的例子):
const myValueStream = map(i => i * 2)(websocketStream);
虽然这种方法可行,但它并不是最优雅的 API。我们习惯于在可观察对象上调用函数。幸运的是,这个问题很容易解决:
class Stream<T> implements Observable<T> {
constructor(public subscribe: (o: Observer<T>) => void) {}
public compose<U>(operator: (s: Stream<T>) => Stream<U>): Stream<U> {
return operator(this);
}
public map<U>(fn: (t: T) => U): Stream<U> {
return this.compose(map(fn));
}
}
现在我们有了一个 ES6 类class,它接受一个subscribe方法作为构造函数参数,并且map其原型上有一个 `__init__` 属性。这意味着我们的示例看起来像这样:
let websocket = new Websocket(/* ... */);
-const websocketStream = {
- subscribe(observer) {
+const websocketStream = new Stream(observer => {
websocket.onMessage = msg => observer.next(msg);
websocket.onClose = () => observer.complete();
}
}
const myValueStream = websocketStream
.map(i => i * 2);
现在实现起来scan相当容易,所以我们将改为实现reduce一个等待最后一个值到达后,再发出一次结果的函数:
function fold<T, U>(fn: (acc: U, curr: T) => U, seed: U): (s: Stream<T>) => Stream<U> {
return stream => new Stream(observer => {
let accumulator = seed;
stream.subscribe({
next: value => {
accumulator = fn(accumulator, value);
},
complete: () => {
observer.next(accumulator);
observer.complete();
}
});
});
}
可以看出,我们有一个内部状态,它会随着前一个数据流的每个事件而更新。前一个数据流完成后,我们会发出该值并结束该数据流。我们可以用scan同样的方法实现,只不过每次有新值时都会发出值,而不是在数据流结束后才发出值。
这样我们就可以复现我们的 WebSocket 示例了(假设我们已经scan像这样添加到 Stream 类中map):
let websocket = new Websocket(/* ... */);
const websocketStream = new Stream(observer => {
websocket.onMessage = (msg) => observer.onNext(msg);
websocket.onClose = () => observer.complete();
});
const myValueStream = websocketStream
.map(i => i * 2)
.scan((acc, curr) => acc + curr, 0)
.subscribe({
next: console.log,
complete: () => {}
});
让我们更进一步。我们希望初始请求使用 HTTP 协议,后续更新通过 WebSocket 进行。如果没有流(Stream),这很难实现。为此,我们首先需要某种方法将 Promise 转换为流:
function fromPromise<T>(p: Promise<T>): Stream<T> {
return new Stream<T>(observer => {
p.then(data => observer.next(data));
});
}
接下来,我们需要一种方法将数组流转换为单个元素流(假设我们的 API 返回数据数组,而 WebSocket 只返回单个元素)。我们可以将其拆分为两个函数:一个函数将数组转换为流,另一个函数“扁平化”流:
function fromArray<T>(array: T[]): Stream<T> {
return new Stream(observer => {
array.forEach(e => {
observer.next(e);
});
observer.complete();
});
}
function flatMap<T, U>(fn: (t: T) => Stream<U>): (s: Stream<T>) => Stream<U> {
return stream => new Stream<U>(observer => {
stream.subscribe({
next(s: Stream<U>) {
s.subscribe({
next: observer.next,
complete: () => {}
});
},
complete: () => observer.complete()
});
});
}
正如你所见,fromArray我们只是将每个元素都推送到流中。flatMap这里的情况更有意思。我们首先订阅外部流,然后对于接收到的每个新的内部流,我们也订阅它,并将所有值输出给下一个观察者。
让我们使用新方法(假设我们已经将 flatMap 添加到 Stream 类中):
let websocket = new Websocket(/* ... */);
const websocketStream = new Stream(observer => {
websocket.onMessage = (msg) => observer.onNext(msg);
websocket.onClose = () => observer.complete();
});
let httpStream = fromPromise(getData())
.flatMap(data => fromArray(data));
const myValueStream = websocketStream
.map(i => i * 2)
.scan((acc, curr) => acc + curr, 0)
.subscribe({
next: console.log,
complete: () => {}
});
最后还缺少将这两个数据流合并起来的方法:
function merge<T>(...streams: Stream<T>[]): Stream<T> {
return new Stream(observer => {
let numCompleted = 0;
streams.forEach(s => {
s.subscribe({
next: value => observer.next(value),
complete: () => {
numCompleted++;
if(numCompleted === streams.length) {
observer.complete();
}
}
});
});
});
}
如您所见,我们只是订阅了所有流,并在其中任何一个流发出值时发出相应的值。如果所有流都已完成,则完成该流。至此,我们的示例就完成了:
let websocket = new Websocket(/* ... */);
const websocketStream = new Stream(observer => {
websocket.onMessage = (msg) => observer.onNext(msg);
websocket.onClose = () => observer.complete();
});
let httpStream = fromPromise(getData())
.flatMap(data => fromArray(data));
const myValueStream = merge(httpStream, websocketStream)
.map(i => i * 2)
.scan((acc, curr) => acc + curr, 0)
.subscribe({
next: console.log,
complete: () => {}
});
总结
如果你有复杂的异步行为,Observables 会非常有用。而且自己编写也不难!我在这里展示的 RxJS 示例并不是主流流库的实现方式,因为在 JavaScript 中使用闭包会消耗大量性能。但核心思想是相同的。
希望您喜欢这篇文章并从中有所收获。如果您对响应式编程感兴趣,不妨了解一下Cycle.js,这是一个完全响应式的框架,我也是其核心团队成员之一。
文章来源:https://dev.to/jvanbruegge/understanding-observables