发布于 2026-01-06 1 阅读
0

理解 Observables 我们想要实现什么? 构建一个 RxJS 玩具示例 总结

理解可观测变量

我们试图实现什么目标

构建一个玩具版的 RxJS

总结

响应式编程最近发展迅猛。像 RxJS 和 Most.js 这样的库以及像 Cycle.js 这样的框架,让构建复杂的异步行为变得轻而易举。但是,理解这些可观察对象或流(以下我将交替使用这两个术语)的工作原理往往很困难。以我的经验来看,如果你能自己动手实现,那就说明你真正理解了它。因此,在本文中,我们将构建一个 RxJS 的玩具示例!

我们试图实现什么目标

由于很多人不熟悉流(Stream),这里做一个简单的概括:流是随时间变化的数组。我的意思是:

const myArray = [1, 2, 3, 4];

const myValue = myArray
    .map(i => i * 2)
    .reduce((acc, curr) => acc + curr, 0);
console.log(myValue);

在这段代码片段中,我们获取一个数组并计算其中所有元素的总和。但如果我们从外部来源(例如 API)获取值呢?那么我们可以使用 Promise:

const myValuePromise = getData() //uses a promise based API
    .then(data => data
        .map(i => i*2)
        .reduce((acc, curr) => acc + curr, 0)
    )
    .then(console.log);

这种方法也很好用。但如果我们从 WebSocket 获取数据呢?WebSocket 传递的不是像 Promise 那样的单个未来值,而是多个值!这时,流就派上用场了:

let websocket = new Websocket(/* ... */);
const websocketStream = Observable.create(observer => {
    websocket.onMessage = (msg) => observer.onNext(msg);
    websocket.onClose = () => observer.complete();

    return () => websocket.close();
});

const myValueStream = websocketStream
    .map(i => i * 2)
    .scan((acc, curr) => acc + curr, 0)
    .subscribe(console.log);

现在,每次通过 WebSocket 收到新值时,scan都会发出新的总和。如果您想等到 WebSocket 关闭后再打印最终的总和,可以使用reduce.

构建一个玩具版的 RxJS

既然我们已经了解了如何使用流,现在就该开始构建流库了。首先,让我们问问自己,我们希望在什么情况下发生什么。我们需要一个观察者来订阅一个观察对象。观察者随后会从上游接收值。所以,为了简单起见,我们首先定义我们的可观察对象。这里我将使用 TypeScript,因为它有助于理解代码的运行机制。

interface Observer<T> {
    next(t: T): void;
    complete(): void;
}

如您所见,观察者是一个包含属性nextcomplete函数的对象。现在我们需要可观察对象。为此,我们将从底层开始,这意味着目前,我们的可观察对象只需要一个subscribe方法。

interface Observable<T> {
    subscribe(observer: Observer<T>): void;
}

所以,简单来说,我们只需要创建一个只有一个方法的对象。让我们复现一下我们的 WebSocket 示例:

let websocket = new Websocket(/* ... */);
const websocketStream = {
    subscribe(observer) {
        websocket.onMessage = msg => observer.next(msg);
        websocket.onClose = () => observer.complete();
    }
}

好的,这看起来几乎和真正的 RxJS 示例一模一样了。唯一的区别是缺少清理工作,但为了简单起见,我就不赘述了。接下来,我们需要定义一个 map 函数,它接受一个函数和一个 observable 对象,并返回一个新的 observable 对象:

function map<T, U>(fn: (t: T) => U): (s: Observable<T>) => Observable<U> {
    return stream => ({
        subscribe(observer: Observer<U>) {
            stream.subscribe({
                next: (value: T) => observer.next(fn(value)),
                complete: observer.complete
            });
        }
    });
}

我们基本上是创建了一个工厂函数,它订阅前一个可观察对象,并带有一个内部观察者,该观察者应用该函数并将值返回给下一个观察者。TypeScript 再次帮助我们理解了它的工作原理。

现在我们可以这样做(扩展前面的例子):

const myValueStream = map(i => i * 2)(websocketStream);

虽然这种方法可行,但它并不是最优雅的 API。我们习惯于在可观察对象上调用函数。幸运的是,这个问题很容易解决:

class Stream<T> implements Observable<T> {
    constructor(public subscribe: (o: Observer<T>) => void) {}

    public compose<U>(operator: (s: Stream<T>) => Stream<U>): Stream<U> {
        return operator(this);
    }

    public map<U>(fn: (t: T) => U): Stream<U> {
        return this.compose(map(fn));
    }
}

现在我们有了一个 ES6 类class,它接受一个subscribe方法作为构造函数参数,并且map其原型上有一个 `__init__` 属性。这意味着我们的示例看起来像这样:

let websocket = new Websocket(/* ... */);
-const websocketStream = {
-    subscribe(observer) {
+const websocketStream = new Stream(observer => {
        websocket.onMessage = msg => observer.next(msg);
        websocket.onClose = () => observer.complete();
    }
}

const myValueStream = websocketStream
    .map(i => i * 2);

现在实现起来scan相当容易,所以我们将改为实现reduce一个等待最后一个值到达后,再发出一次结果的函数:

function fold<T, U>(fn: (acc: U, curr: T) => U, seed: U): (s: Stream<T>) => Stream<U> {
    return stream => new Stream(observer => {
        let accumulator = seed;
        stream.subscribe({
            next: value => {
                accumulator = fn(accumulator, value);
            },
            complete: () => {
                observer.next(accumulator);
                observer.complete();
            }
        });
    });
}

可以看出,我们有一个内部状态,它会随着前一个数据流的每个事件而更新。前一个数据流完成后,我们会发出该值并结束该数据流。我们可以用scan同样的方法实现,只不过每次有新值时都会发出值,而不是在数据流结束后才发出值。

这样我们就可以复现我们的 WebSocket 示例了(假设我们已经scan像这样添加到 Stream 类中map):

let websocket = new Websocket(/* ... */);
const websocketStream = new Stream(observer => {
    websocket.onMessage = (msg) => observer.onNext(msg);
    websocket.onClose = () => observer.complete();
});

const myValueStream = websocketStream
    .map(i => i * 2)
    .scan((acc, curr) => acc + curr, 0)
    .subscribe({
        next: console.log,
        complete: () => {}
    });

让我们更进一步。我们希望初始请求使用 HTTP 协议,后续更新通过 WebSocket 进行。如果没有流(Stream),这很难实现。为此,我们首先需要某种方法将 Promise 转换为流:

function fromPromise<T>(p: Promise<T>): Stream<T> {
    return new Stream<T>(observer => {
        p.then(data => observer.next(data));
    });
}

接下来,我们需要一种方法将数组流转换为单个元素流(假设我们的 API 返回数据数组,而 WebSocket 只返回单个元素)。我们可以将其拆分为两个函数:一个函数将数组转换为流,另一个函数“扁平化”流:

function fromArray<T>(array: T[]): Stream<T> {
    return new Stream(observer => {
        array.forEach(e => {
            observer.next(e);
        });
        observer.complete();
    });
}

function flatMap<T, U>(fn: (t: T) => Stream<U>): (s: Stream<T>) => Stream<U> {
    return stream => new Stream<U>(observer => {
        stream.subscribe({
            next(s: Stream<U>) {
                s.subscribe({
                    next: observer.next,
                    complete: () => {}
                });
            },
            complete: () => observer.complete()
        });
    });
}

正如你所见,fromArray我们只是将每个元素都推送到流中。flatMap这里的情况更有意思。我们首先订阅外部流,然后对于接收到的每个新的内部流,我们也订阅它,并将所有值输出给下一个观察者。

让我们使用新方法(假设我们已经将 flatMap 添加到 Stream 类中):

let websocket = new Websocket(/* ... */);
const websocketStream = new Stream(observer => {
    websocket.onMessage = (msg) => observer.onNext(msg);
    websocket.onClose = () => observer.complete();
});

let httpStream = fromPromise(getData())
    .flatMap(data => fromArray(data));

const myValueStream = websocketStream
    .map(i => i * 2)
    .scan((acc, curr) => acc + curr, 0)
    .subscribe({
        next: console.log,
        complete: () => {}
    });

最后还缺少将这两个数据流合并起来的方法:

function merge<T>(...streams: Stream<T>[]): Stream<T> {
    return new Stream(observer => {
        let numCompleted = 0;
        streams.forEach(s => {
            s.subscribe({
                next: value => observer.next(value),
                complete: () => {
                    numCompleted++;
                    if(numCompleted === streams.length) {
                        observer.complete();
                    }
                }
            });
        });
    });
}

如您所见,我们只是订阅了所有流,并在其中任何一个流发出值时发出相应的值。如果所有流都已完成,则完成该流。至此,我们的示例就完成了:

let websocket = new Websocket(/* ... */);
const websocketStream = new Stream(observer => {
    websocket.onMessage = (msg) => observer.onNext(msg);
    websocket.onClose = () => observer.complete();
});

let httpStream = fromPromise(getData())
    .flatMap(data => fromArray(data));

const myValueStream = merge(httpStream, websocketStream)
    .map(i => i * 2)
    .scan((acc, curr) => acc + curr, 0)
    .subscribe({
        next: console.log,
        complete: () => {}
    });

总结

如果你有复杂的异步行为,Observables 会非常有用。而且自己编写也不难!我在这里展示的 RxJS 示例并不是主流流库的实现方式,因为在 JavaScript 中使用闭包会消耗大量性能。但核心思想是相同的。

希望您喜欢这篇文章并从中有所收获。如果您对响应式编程感兴趣,不妨了解一下Cycle.js,这是一个完全响应式的框架,我也是其核心团队成员之一。

文章来源:https://dev.to/jvanbruegge/understanding-observables