发布于 2026-01-06 4 阅读
0

深入探索 Angular 中的 RxJS:RxJS 流、可观察对象、订阅运算符、创建运算符、组合运算符、错误处理运算符、过滤运算符、多播运算符、转换运算符

深入探索 Angular 中的 RxJS

为什么选择 RxJS

可观测变量

订阅

操作员

创建运算符

组合运算符

错误处理运算符

过滤运算符

多播运营商

变换算子

在深入探讨RxJS( Angular 中的JavaScript 响应式扩展)之前,我们应该先了解 RxJS 究竟是什么。RxJS 是一个强大的 JavaScript 库,它使用Observables的概念来实现响应式编程。作为 Web 开发领域最热门的库之一,RxJS 提供了一种强大的函数式方法来处理事件,并且与越来越多的框架、库和工具集成,因此学习 RxJS 的理由从未如此充分。

根据其文档

可以把 RxJS 看作是事件处理的 Lodash。

ReactiveX 或 RxJS 内部采用观察者模式,其中我们称之为Subject的对象维护其依赖项,并在其任何状态发生变化时发出通知。

为什么选择 RxJS

由于 RxJS 遵循函数式编程的基本原理,它为事件提供了各种类型的纯函数。这意味着你的代码更不容易出错。通常情况下,我们会创建非纯函数,而这些函数在代码规模扩大时可能会导致代码混乱。

RxJS 以Stream 的形式为应用程序处理各种事件。Stream 本质上是Observable的定义,我们稍后会详细介绍 Observable。Stream API 允许你以数据块的形式获取一系列数据,而我们通常从 API 获取的大型数据往往是分块的。RxJS Stream 本身包含许多子 API,这使得处理与 Web API 相关的日常任务变得更加容易,例如鼠标事件、键盘事件或任何来自后端服务的数据。

现在让我们来了解一下 RxJS 用于异步事件管理的一些基本概念。

可观测变量

正如我们前面讨论的,Observable 是 Stream 的一种定义或声明,它本质上是一个未来事件或值的集合,我们会持续地获取这些事件或值。几乎任何事物都可以创建 Observable,但在 RxJS 中最常见的用例是事件。创建Observable最简单的方法是使用RxJS提供的内置函数。Angular 默认包含这个强大的库,因此您无需显式安装它。

让我们来看一段代码片段:

注意:您可以在ng-run.com上在线尝试代码片段,这样您就不必仅仅为了这些代码片段而创建 Angular 项目。

import { Component, VERSION, OnInit } from '@angular/core';
import { interval, fromEvent } from "rxjs";      // <----------- importing rxjs lib 

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  ngOnInit() {
    const interval$ = interval(2000);  //<-- interval func. same as setinterval in vanilla javascript
    interval$.subscribe(val => console.log(val)) // subscribed to listen our stream of numbers
  }
}
Enter fullscreen mode Exit fullscreen mode

运行这段代码后,按下快捷F-12键打开 Chrome 调试工具,并查看控制台选项卡。延迟 2 秒后,您将看到一些数字。

您可能已经注意到我创建了一个常量变量interval$,并且您可能想知道为什么我$在变量名中添加了 `#`。这只是Observable的一个标准做法,表示该变量是一个Observable

我们来看另一个简单的代码示例:

import { Component, VERSION, OnInit } from '@angular/core';
import { interval, fromEvent } from "rxjs";      // <----------- importing rxjs lib 

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  ngOnInit() {
    const clickEvt$ = fromEvent(document, 'click');
    clickEvt$.subscribe(evt => console.log(evt))
  }
}
Enter fullscreen mode Exit fullscreen mode

执行此代码后,当您单击浏览器文档中的任何位置时,您将mouse click event在控制台中看到,因为它会创建一个点击事件流来监听每一次点击。

订阅

订阅是整个流程的启动机制。我们可以说它是 Observable 的执行,通过订阅,你可以订阅事件并根据需要映射或转换数据。要创建订阅,你需要调用 `subscribe` 方法,并传入一个函数(或对象)——也称为观察者。订阅对象有一个重要的方法,称为 `disposable`,unsubscribe()它不接受任何参数,负责释放/退出订阅。在之前的 RxJS 版本中,订阅被称为“Disposable”。

import { Component, OnInit } from '@angular/core';
import { fromEvent } from "rxjs";
@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  name = 'Angular';
  ngOnInit() {
    const clickEvt$ = fromEvent(document, 'click');
    clickEvt$.subscribe(evt => console.log(evt))
  }
}
Enter fullscreen mode Exit fullscreen mode

在上面的代码片段中,我们在文档的任何位置设置了一个点击事件监听器,然后在每次点击文档时传递subscribe方法,然后它返回一个包含Unsscribe 的对象,其中包含清理逻辑,例如删除事件。

需要注意的是,每个订阅都会创建自己的执行上下文,这意味着subscribe第二次调用该方法将创建一个新的事件监听器。

import { Component, OnInit } from '@angular/core';
import { fromEvent } from "rxjs";
@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  name = 'Angular';
  ngOnInit() {
    const clickEvt$ = fromEvent(document, 'click');
    const keyUpEvt$ = fromEvent(document, 'keyup');
    clickEvt$.subscribe(evt => console.log(evt));
    keyUpEvt$.subscribe(evt => console.log(evt));
  }
}
Enter fullscreen mode Exit fullscreen mode

订阅机制在可观察对象观察者之间建立一对一的单向通信,也称为单播。值得注意的是,当我们讨论可观察对象源向观察者发送数据时,这是一种推送模型。源本身并不了解或关心订阅者如何处理这些数据,它只是简单地将数据推送给观察者。

操作员

即使Observable是 RxJS 的基础,但如果没有操作符(operator) , RxJS 也是不完整的。操作符是 RxJS 中的一些纯函数,负责操作源数据并返回转换后的 Observable 值。许多 RxJS 操作符类似于原生 JavaScript 中用于处理数组的函数。以下是 RxJS 代码中操作符的示例:map

import { Component, OnInit } from '@angular/core';
import { fromEvent, of } from "rxjs";
import { map } from "rxjs/operators";
@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  name = 'Angular';
  ngOnInit() {
   const transformedData = of(1,2,3,4,5,6)
      .pipe(map((val: any) => val * 5))
      .subscribe(data => console.log(data));
  }
}
Enter fullscreen mode Exit fullscreen mode

你会看到所有这些数字在订阅中都会相乘5,如果你在控制台中查看transformedData,它会显示特定的 Observable。RxJS
的操作符数量庞大,刚开始学习时可能会感到不知所措。我们当然不会涵盖所有操作符,但会详细介绍一些最常用的操作符,这些操作符你很可能会在应用程序中用到。

我们先从最常见的说起,

管道

Pipe函数就像一条流水线,将可观察数据源连接到各个操作符。它允许在包含多个操作符的可观察链中使用它们。为了提高代码的可读性,我们可以在函数中实现多个操作pipe

import { Component, OnInit } from '@angular/core';
import { fromEvent, of } from "rxjs";
import { map } from "rxjs/operators";
@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  name = 'Angular';
  ngOnInit() {
   const transformedData = of(1,2,3,4,5,6)
      .pipe(map((val: any) => val * 5))
      .subscribe(data => console.log(data));
  }
}
Enter fullscreen mode Exit fullscreen mode

另一个最常用也最简单的 RxJS 操作符是Of函数。它会从数据源中按顺序发出每个值,然后发出一个完整的通知。

替代文字
来自 rxjs 官方网站的官方大理石纹理图片

Of运算符的代码片段

import { Component, OnInit } from '@angular/core';
import { of } from "rxjs";
@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  name = 'Angular';
  ngOnInit() {
    const person = { name: 'John Doe', age: 22 };  //<-- simple object
    const personObs = of(person);                  //<-- convert object into stream
    personObs.subscribe(data => console.log(data)) //<-- execute observable
  }
}
Enter fullscreen mode Exit fullscreen mode

RxJS 基于 6 种类型的运算符。

1)创建运算符
2)组合运算符
3)错误处理运算符
4)过滤运算符
5)多播运算符
6)转换运算符

创建运算符

创建运算符是一些函数,可以用来从任何其他数据类型创建 Observable,或者将其转换为 Observable,就像我们在上面的示例中所做的那样。从通用用例到特定用例,您可以自由地(也鼓励您)将所有内容都转换为流。创建运算符中还包含许多其他运算符。

以下是使用 RxJS Ajax 模块的简单创建操作符的示例:

import { Component, VERSION, OnInit } from '@angular/core';
import { ajax } from 'rxjs/ajax';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  name = 'Angular ' + VERSION.full;
  githubUsers = `https://api.github.com/users`;
  users = ajax({ url: this.githubUsers, method: "GET" })
  ngOnInit() {
    const subscribe = this.users.subscribe(
      res => console.log(res.response),
      err => console.error(err)
    );
  }
}

Enter fullscreen mode Exit fullscreen mode

组合运算符

组合运算符(也称为连接运算符)允许连接来自多个可观察对象的数据。发射值是这些运算符的主要区别。组合运算符中还包含许多其他运算符。

以下是一个最常见的组合运算符示例:

import { Component, VERSION, OnInit } from '@angular/core';
import { fromEvent, interval } from 'rxjs';
import { map, combineAll, take } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  name = 'Angular ' + VERSION.full;
  ngOnInit() {
    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
      map(
        ev => interval(Math.random() * 2000).pipe(take(3))
      ), take(2)
    );
    const result = higherOrder.pipe(combineAll())

    result.subscribe(data => console.log(data));
  }
}

Enter fullscreen mode Exit fullscreen mode

在这个例子中,我们将结果clickshigherOrder可观察对象结合起来,并通过订阅可观察对象将其显示在控制台中result

错误处理运算符

错误是开发过程中不可避免的副作用。这些运算符提供了有效的方式来优雅地处理错误,并在错误发生时执行重试逻辑。其他一些运算符包含在错误处理运算符中。

以下是处理运算符的示例catchError,它可以捕获要处理的可观察对象上的错误,并通过返回一个新的可观察对象或抛出一个错误来进行处理。

import { Component, VERSION, OnInit } from '@angular/core';
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  name = 'Angular ' + VERSION.full;
  ngOnInit() {
    of(1, 2, 3, 4, 5).pipe(
      map(num => {
        if (num == 4) throw 'Four!'
        return num
      }),
      catchError(err => of('I', 'II', 'III', 'IV', 'V')),
    )
      .subscribe(data => console.log(data))
  }
}
Enter fullscreen mode Exit fullscreen mode

过滤运算符

过滤运算符提供了一些技术,用于接受(或拒绝)来自可观测源的值,以及处理流中值的累积。此运算符类似于 `get` Array.prototype.filter,后者对发出的值返回 true。

以下是filterRxJS 中最简单的运算符示例:

import { Component, VERSION, OnInit } from '@angular/core';
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  name = 'Angular ' + VERSION.full;
  ngOnInit() {
    const source = from([
      { name: 'Joe', age: 31 },
      { name: 'Bob', age: 25 }
    ]);

    //filter out people with age under 30
    const example = source.pipe(filter(person => person.age >= 30));
    //output: "Over 30: Joe"
    const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`))
  }
}
Enter fullscreen mode Exit fullscreen mode

多播运营商

在 RxJS 中,默认情况下 Observable 是冷的,或者说是单播的(每个订阅者对应一个源)。这些操作符可以将 Observable 变为热的,或者说是多播的,从而允许多个订阅者共享副作用。

multicast标准主题运算符示例

import { Component, VERSION, OnInit } from '@angular/core';
import { Subject, interval, ConnectableObservable } from 'rxjs';
import { take, tap, multicast, mapTo } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  name = 'Angular ' + VERSION.full;
  ngOnInit() {
    //emit every 2 seconds, take 5
    const source = interval(2000).pipe(take(5));

    const example = source.pipe(
      //since we are multicasting below, side effects will be     executed once
      tap(() => console.log('Side Effect #1')),
      mapTo('Result!')
    );

    //subscribe subject to source upon connect()
    const multi = example.pipe(multicast(() => new Subject())) as ConnectableObservable<number>;
    /*
      subscribers will share source
      output:
      "Side Effect #1"
      "Result!"
      "Result!"
      ...
    */
    const subscriberOne = multi.subscribe(val => console.log(val));
    const subscriberTwo = multi.subscribe(val => console.log(val));
    //subscribe subject to source
    multi.connect()
  }
}
Enter fullscreen mode Exit fullscreen mode

在上面的例子中,我们使用 ` connectObservable<number>Object` 作为pipe函数的类型,因为pipe`Object` 函数只返回一个 `Object` Observable,而mutlicast`Object` 运算符返回`Object` connectObservable,所以我们才能得到connect一个带有multi命名 Observable 的函数。你可以在这里了解更多关于可连接 Observable 的信息。

变换算子

在运算符链中转换值是一项常见任务。这些运算符提供了几乎适用于所有用例的转换技术。在上面的一些示例中,我们使用了一些转换运算符,例如mapTo`, map`scan和 `& mergeMap`。以下是转换运算符中的所有运算符

让我们来看一个最常用的变换运算符的例子,

import { Component, VERSION, OnInit } from '@angular/core';
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { mergeMap } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  name = 'Angular ' + VERSION.full;
  ngOnInit() {
    // free api url
    const API_URL = 'https://jsonplaceholder.typicode.com/todos/1';

    // streams
    const click$ = fromEvent(document, 'click');
    click$
      .pipe(
        /*
         * Using mergeMap for example, but generally for GET requests
         * you will prefer switchMap.
         * Also, if you do not need the parameter like
         * below you could use mergeMapTo instead.
         * ex. mergeMapTo(ajax.getJSON(API_URL))
         */
        mergeMap(() => ajax.getJSON(API_URL))
      )
      // { userId: 1, id: 1, ...}
      .subscribe(console.log);
  }
}
Enter fullscreen mode Exit fullscreen mode

以上示例展示了如何将click$可观察对象与从 API 获取的响应合并ajax.getJSON()。当我们点击文档中的任意位置时,控制台会显示 API 的响应。

本文介绍了所有主要运算符,希望您能从中学习到一些关于 RxJS 的新知识。以下是一些 RxJS 的更多资源:
https://www.learnrxjs.io/
https://rxjs.dev/
https://www.learnrxjs.io/learn-rxjs/recipes
https://www.youtube.com/playlist?list=PL55RiY5tL51pHpagYcrN9ubNLVXF8rGVi

如果你喜欢这篇文章,请分享到你的圈子,并关注我以获取更多此类短文。

和平✌️✌️✌️

文章来源:https://dev.to/mquanit/deep-dive-with-rxjs-in-angular-4e6o