发布于 2026-01-06 1 阅读
0

将 Promise 转换为 Observable 惰性 Observables Rxjs fetch 操作符

将 Promise 转换为 Observable

惰性可观测值

Rxjs fetch 操作符

在使用 RxJS 时,你可能会遇到需要在响应式代码库中集成 Promise 的情况。为了充分发挥响应式特性,最好将 Promise 转换为 Observable,这样我们就可以轻松地将其与其他操作符连接,甚至与其他流结合使用。

之前,rxjs 有一个专门为此类用例设计的运算符:`[] fromPromise`。当前版本的 rxjs 已弃用fromPromise该运算符from,转而使用 `[]`,但两者在使用上并无本质区别。from除了数组和字符串之外,`[]` 运算符还接受一个 Promise 对象,并将其转换为 Observable 对象。

如果您想了解它如何处理 Promise,或者如何判断传入的是否是 Promise,请查看https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/from.ts#L114https://github.com/ReactiveX/rxjs/blob/master/src/internal/util/subscribeTo.ts#L20

    const url = 'https://jsonplaceholder.typicode.com/todos/1';

    function getTodo() {
      return fetch(url)
        .then(response => response.json());
    }

    getTodo().then(console.log);
Enter fullscreen mode Exit fullscreen mode

上面的代码是我们要转换成使用可观察对象的代码片段的 Promise 表示,以便我们可以将其与其他现有的可观察对象集成在一起。

本文介绍的是一个返回 Promise 的函数,我们将把这个 Promise 转换为 Observable,而不仅仅是一个独立的 Promise。

实现 `from` 操作符的关键在于用 `from` 操作符包裹 Promise,并替换.then(...)为 Rxjs 的 ` map(...): `


    const url = 'https://jsonplaceholder.typicode.com/todos/1';

    function getTodo() {
      return from(fetch(url))
        .pipe(map(response => response.json()));
    }

    getTodo().subscribe(console.log);
Enter fullscreen mode Exit fullscreen mode

这样应该就行了吧?我们已经成功地将返回 Promise 的函数转换成了返回 Observable 的函数。现在我们可以开始将它与其他 Observable/Operator 结合使用,从而创建更高级的数据流。

但如果我告诉你,这可能不是你(现在)想要的呢?

惰性可观测值

使用 Observable 时,如果没有活动的订阅,通常不会发生任何事情。但是,即使从上面的代码中移除订阅,仍然会触发 HTTP 请求。您可以在这里查看实际效果:https://stackblitz.com/edit/rxjs-bb626s

如果您检查 DevTools 的网络选项卡,您会发现即使我们没有任何订阅,HTTP 调用也确实被触发了。

替代文字

我们可以通过两种方式解决这个问题:一是使用现有的 rxjs 操作符,二是结合from我们已经在使用的操作符;二是您可以决定从头开始构建可观察对象。

使用延迟运算符

Rxjs 的 defer 操作符可以用来等待观察者订阅后再创建实际的可观察对象。

    function getTodo() {
      return defer(() => from(fetch(url)));
    }

    const getTodo$ = getTodo();

    setTimeout(() => {
      getTodo$.subscribe();
    }, 5000);
Enter fullscreen mode Exit fullscreen mode

这将确保 HTTP 调用仅在 5000 毫秒后触发,也就是我们向 Observable 添加订阅的时刻。您可以在https://stackblitz.com/edit/rxjs-fgwokv
查看实际效果。

注意:大多数情况下,您可能会通过 mergeMap/switchMap/exhaustMap/concatMap 等操作引入异步数据,这些操作可能会返回一个源自 Promise 的 Observable。如果是这种情况,理论上我们不需要使用 defer,因为 Observable 只有在源 Observable 发出数据后才会创建。但是,无论 Promise 如何使用,使用 defer 来确保它是惰性的仍然是一个好主意。

从零开始构建可观测对象

虽然我建议尽可能使用现有的 rxjs 操作符,但我认为,对于将 Promise 转换为 Observable 而言,值得我们自己控制 Observable 的创建,这样我们就可以更好地控制取消订阅 Observable 时发生的情况(我们将在 Promise 取消部分介绍)。

    function getTodo() {
      return new Observable(observer => {
        return from(fetch(url)).subscribe(observer);
      });
    }

    const getTodo$ = getTodo();

    setTimeout(() => {
      getTodo$.subscribe();
    }, 5000);
Enter fullscreen mode Exit fullscreen mode

上面的代码会基于 Promise 创建一个 Observable,并在 5000 毫秒后才订阅它。如果你查看这个 StackBlitz 示例https://stackblitz.com/edit/rxjs-4zj1bx,你会发现 HTTP 调用是在 5 秒后才触发的。因此,我们的 Observable 现在是惰性的,只有在添加订阅时才会解析 Promise(并触发 HTTP 调用)。

请注意,我们添加了一个显式订阅,并将其作为清理逻辑从 Observable 的构造函数回调中返回。这样做可以确保在取消订阅 Observable 时,该订阅也会被清理getTodo()

承诺取消

我们的 Promise 到 Observable 转换过程中仍然缺少一个关键步骤。在我们的例子中,Promise 代表一个 HTTP 请求。如果我们在 HTTP 请求完成之前取消订阅 Observable,我们可能希望中止该 HTTP 请求。

    function getTodo() {
      return new Observable(observer => {
        const abortController = new AbortController();
        const subscription = from(fetch(url, {
          signal: abortController.signal
        })).subscribe(observer);

        return () => {
          abortController.abort();
          subscription.unsubscribe();
        }
      });
    }

    const getTodo$ = getTodo();

    setTimeout(() => {
      const sub = getTodo$.subscribe();
      sub.unsubscribe();
    }, 5000);
Enter fullscreen mode Exit fullscreen mode

AbortController 是一个内置接口,允许我们取消 DOM 请求,包括 Promise。尽管许多异步操作可能需要自定义 AbortController 实现,但 fetch API 默认支持 AbortController。这意味着我们只需要创建一个 AbortController 实例,将其 signal 属性传递给 fetch 方法,并在适当的时候调用 abort 方法即可。在我们的例子中,指的是在 TearDownLogic 中调用 abort 方法,TearDownLogic 会在我们取消订阅 Observable 时被调用。您可以在https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API上阅读更多关于中止 fetch 请求的信息。

这里有一个 StackBlitz 示例,其中包含了中止 HTTP 调用的功能:https://stackblitz.com/edit/rxjs-7wc1rb。如果您检查开发者工具的网络选项卡,您会注意到 HTTP 调用被触发,但立即被取消了。

替代文字

Rxjs fetch 操作符

Rxjs 内置了将 fetch API 转换为 Observable 的功能(参见:https
://github.com/ReactiveX/rxjs/blob/0e4849a36338133ac3c1b890cd68817547177f44/src/internal/observable/dom/fetch.ts )。您可能已经注意到,它还使用了 AbortController 来在取消订阅 Observable 时取消 HTTP 请求(尽管由于本文仅关注 Promise 取消的基础知识,实际操作会稍微复杂一些)。您可能更倾向于使用 Rxjs 的内置功能,而不是自己编写代码。不过,本文旨在提供一个示例,说明如何将任何 Promise 转换为 Observable。

文章来源:https://dev.to/frederikprijck/converting-a-promise-into-an-observable-dag