将 Promise 转换为 Observable
惰性可观测值
Rxjs fetch 操作符
在使用 RxJS 时,你可能会遇到需要在响应式代码库中集成 Promise 的情况。为了充分发挥响应式特性,最好将 Promise 转换为 Observable,这样我们就可以轻松地将其与其他操作符连接,甚至与其他流结合使用。
之前,rxjs 有一个专门为此类用例设计的运算符:`[] fromPromise`。当前版本的 rxjs 已弃用fromPromise该运算符from,转而使用 `[]`,但两者在使用上并无本质区别。from除了数组和字符串之外,`[]` 运算符还接受一个 Promise 对象,并将其转换为 Observable 对象。
如果您想了解它如何处理 Promise,或者如何判断传入的是否是 Promise,请查看https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/from.ts#L114和https://github.com/ReactiveX/rxjs/blob/master/src/internal/util/subscribeTo.ts#L20
const url = 'https://jsonplaceholder.typicode.com/todos/1';
function getTodo() {
return fetch(url)
.then(response => response.json());
}
getTodo().then(console.log);
上面的代码是我们要转换成使用可观察对象的代码片段的 Promise 表示,以便我们可以将其与其他现有的可观察对象集成在一起。
本文介绍的是一个返回 Promise 的函数,我们将把这个 Promise 转换为 Observable,而不仅仅是一个独立的 Promise。
实现 `from` 操作符的关键在于用 `from` 操作符包裹 Promise,并替换.then(...)为 Rxjs 的 ` map(...): `
const url = 'https://jsonplaceholder.typicode.com/todos/1';
function getTodo() {
return from(fetch(url))
.pipe(map(response => response.json()));
}
getTodo().subscribe(console.log);
这样应该就行了吧?我们已经成功地将返回 Promise 的函数转换成了返回 Observable 的函数。现在我们可以开始将它与其他 Observable/Operator 结合使用,从而创建更高级的数据流。
但如果我告诉你,这可能不是你(现在)想要的呢?
惰性可观测值
使用 Observable 时,如果没有活动的订阅,通常不会发生任何事情。但是,即使从上面的代码中移除订阅,仍然会触发 HTTP 请求。您可以在这里查看实际效果:https://stackblitz.com/edit/rxjs-bb626s
如果您检查 DevTools 的网络选项卡,您会发现即使我们没有任何订阅,HTTP 调用也确实被触发了。
我们可以通过两种方式解决这个问题:一是使用现有的 rxjs 操作符,二是结合from我们已经在使用的操作符;二是您可以决定从头开始构建可观察对象。
使用延迟运算符
Rxjs 的 defer 操作符可以用来等待观察者订阅后再创建实际的可观察对象。
function getTodo() {
return defer(() => from(fetch(url)));
}
const getTodo$ = getTodo();
setTimeout(() => {
getTodo$.subscribe();
}, 5000);
这将确保 HTTP 调用仅在 5000 毫秒后触发,也就是我们向 Observable 添加订阅的时刻。您可以在https://stackblitz.com/edit/rxjs-fgwokv
查看实际效果。
注意:大多数情况下,您可能会通过 mergeMap/switchMap/exhaustMap/concatMap 等操作引入异步数据,这些操作可能会返回一个源自 Promise 的 Observable。如果是这种情况,理论上我们不需要使用 defer,因为 Observable 只有在源 Observable 发出数据后才会创建。但是,无论 Promise 如何使用,使用 defer 来确保它是惰性的仍然是一个好主意。
从零开始构建可观测对象
虽然我建议尽可能使用现有的 rxjs 操作符,但我认为,对于将 Promise 转换为 Observable 而言,值得我们自己控制 Observable 的创建,这样我们就可以更好地控制取消订阅 Observable 时发生的情况(我们将在 Promise 取消部分介绍)。
function getTodo() {
return new Observable(observer => {
return from(fetch(url)).subscribe(observer);
});
}
const getTodo$ = getTodo();
setTimeout(() => {
getTodo$.subscribe();
}, 5000);
上面的代码会基于 Promise 创建一个 Observable,并在 5000 毫秒后才订阅它。如果你查看这个 StackBlitz 示例https://stackblitz.com/edit/rxjs-4zj1bx,你会发现 HTTP 调用是在 5 秒后才触发的。因此,我们的 Observable 现在是惰性的,只有在添加订阅时才会解析 Promise(并触发 HTTP 调用)。
请注意,我们添加了一个显式订阅,并将其作为清理逻辑从 Observable 的构造函数回调中返回。这样做可以确保在取消订阅 Observable 时,该订阅也会被清理
getTodo()。
承诺取消
我们的 Promise 到 Observable 转换过程中仍然缺少一个关键步骤。在我们的例子中,Promise 代表一个 HTTP 请求。如果我们在 HTTP 请求完成之前取消订阅 Observable,我们可能希望中止该 HTTP 请求。
function getTodo() {
return new Observable(observer => {
const abortController = new AbortController();
const subscription = from(fetch(url, {
signal: abortController.signal
})).subscribe(observer);
return () => {
abortController.abort();
subscription.unsubscribe();
}
});
}
const getTodo$ = getTodo();
setTimeout(() => {
const sub = getTodo$.subscribe();
sub.unsubscribe();
}, 5000);
AbortController 是一个内置接口,允许我们取消 DOM 请求,包括 Promise。尽管许多异步操作可能需要自定义 AbortController 实现,但 fetch API 默认支持 AbortController。这意味着我们只需要创建一个 AbortController 实例,将其 signal 属性传递给 fetch 方法,并在适当的时候调用 abort 方法即可。在我们的例子中,指的是在 TearDownLogic 中调用 abort 方法,TearDownLogic 会在我们取消订阅 Observable 时被调用。您可以在https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API上阅读更多关于中止 fetch 请求的信息。
这里有一个 StackBlitz 示例,其中包含了中止 HTTP 调用的功能:https://stackblitz.com/edit/rxjs-7wc1rb。如果您检查开发者工具的网络选项卡,您会注意到 HTTP 调用被触发,但立即被取消了。
Rxjs fetch 操作符
Rxjs 内置了将 fetch API 转换为 Observable 的功能(参见:https
://github.com/ReactiveX/rxjs/blob/0e4849a36338133ac3c1b890cd68817547177f44/src/internal/observable/dom/fetch.ts )。您可能已经注意到,它还使用了 AbortController 来在取消订阅 Observable 时取消 HTTP 请求(尽管由于本文仅关注 Promise 取消的基础知识,实际操作会稍微复杂一些)。您可能更倾向于使用 Rxjs 的内置功能,而不是自己编写代码。不过,本文旨在提供一个示例,说明如何将任何 Promise 转换为 Observable。

