深入探索 Angular 中的 RxJS
为什么选择 RxJS
流
可观测变量
订阅
操作员
创建运算符
组合运算符
错误处理运算符
过滤运算符
多播运营商
变换算子
在深入探讨RxJS( Angular 中的JavaScript 响应式扩展)之前,我们应该先了解 RxJS 究竟是什么。RxJS 是一个强大的 JavaScript 库,它使用Observables的概念来实现响应式编程。作为 Web 开发领域最热门的库之一,RxJS 提供了一种强大的函数式方法来处理事件,并且与越来越多的框架、库和工具集成,因此学习 RxJS 的理由从未如此充分。
根据其文档
可以把 RxJS 看作是事件处理的 Lodash。
ReactiveX 或 RxJS 内部采用观察者模式,其中我们称之为Subject的对象维护其依赖项,并在其任何状态发生变化时发出通知。
为什么选择 RxJS
由于 RxJS 遵循函数式编程的基本原理,它为事件提供了各种类型的纯函数。这意味着你的代码更不容易出错。通常情况下,我们会创建非纯函数,而这些函数在代码规模扩大时可能会导致代码混乱。
流
RxJS 以Stream 的形式为应用程序处理各种事件。Stream 本质上是Observable的定义,我们稍后会详细介绍 Observable。Stream API 允许你以数据块的形式获取一系列数据,而我们通常从 API 获取的大型数据往往是分块的。RxJS Stream 本身包含许多子 API,这使得处理与 Web API 相关的日常任务变得更加容易,例如鼠标事件、键盘事件或任何来自后端服务的数据。
现在让我们来了解一下 RxJS 用于异步事件管理的一些基本概念。
可观测变量
正如我们前面讨论的,Observable 是 Stream 的一种定义或声明,它本质上是一个未来事件或值的集合,我们会持续地获取这些事件或值。几乎任何事物都可以创建 Observable,但在 RxJS 中最常见的用例是事件。创建Observable最简单的方法是使用RxJS提供的内置函数。Angular 默认包含这个强大的库,因此您无需显式安装它。
让我们来看一段代码片段:
注意:您可以在ng-run.com上在线尝试代码片段,这样您就不必仅仅为了这些代码片段而创建 Angular 项目。
import { Component, VERSION, OnInit } from '@angular/core';
import { interval, fromEvent } from "rxjs"; // <----------- importing rxjs lib
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
ngOnInit() {
const interval$ = interval(2000); //<-- interval func. same as setinterval in vanilla javascript
interval$.subscribe(val => console.log(val)) // subscribed to listen our stream of numbers
}
}
运行这段代码后,按下快捷F-12键打开 Chrome 调试工具,并查看控制台选项卡。延迟 2 秒后,您将看到一些数字。
您可能已经注意到我创建了一个常量变量interval$,并且您可能想知道为什么我$在变量名中添加了 `#`。这只是Observable的一个标准做法,表示该变量是一个Observable。
我们来看另一个简单的代码示例:
import { Component, VERSION, OnInit } from '@angular/core';
import { interval, fromEvent } from "rxjs"; // <----------- importing rxjs lib
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
ngOnInit() {
const clickEvt$ = fromEvent(document, 'click');
clickEvt$.subscribe(evt => console.log(evt))
}
}
执行此代码后,当您单击浏览器文档中的任何位置时,您将mouse click event在控制台中看到,因为它会创建一个点击事件流来监听每一次点击。
订阅
订阅是整个流程的启动机制。我们可以说它是 Observable 的执行,通过订阅,你可以订阅事件并根据需要映射或转换数据。要创建订阅,你需要调用 `subscribe` 方法,并传入一个函数(或对象)——也称为观察者。订阅对象有一个重要的方法,称为 `disposable`,unsubscribe()它不接受任何参数,负责释放/退出订阅。在之前的 RxJS 版本中,订阅被称为“Disposable”。
import { Component, OnInit } from '@angular/core';
import { fromEvent } from "rxjs";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const clickEvt$ = fromEvent(document, 'click');
clickEvt$.subscribe(evt => console.log(evt))
}
}
在上面的代码片段中,我们在文档的任何位置设置了一个点击事件监听器,然后在每次点击文档时传递subscribe方法,然后它返回一个包含Unsscribe 的对象,其中包含清理逻辑,例如删除事件。
需要注意的是,每个订阅都会创建自己的执行上下文,这意味着subscribe第二次调用该方法将创建一个新的事件监听器。
import { Component, OnInit } from '@angular/core';
import { fromEvent } from "rxjs";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const clickEvt$ = fromEvent(document, 'click');
const keyUpEvt$ = fromEvent(document, 'keyup');
clickEvt$.subscribe(evt => console.log(evt));
keyUpEvt$.subscribe(evt => console.log(evt));
}
}
订阅机制在可观察对象和观察者之间建立一对一的单向通信,也称为单播。值得注意的是,当我们讨论可观察对象源向观察者发送数据时,这是一种推送模型。源本身并不了解或关心订阅者如何处理这些数据,它只是简单地将数据推送给观察者。
操作员
即使Observable是 RxJS 的基础,但如果没有操作符(operator) , RxJS 也是不完整的。操作符是 RxJS 中的一些纯函数,负责操作源数据并返回转换后的 Observable 值。许多 RxJS 操作符类似于原生 JavaScript 中用于处理数组的函数。以下是 RxJS 代码中操作符的示例:map
import { Component, OnInit } from '@angular/core';
import { fromEvent, of } from "rxjs";
import { map } from "rxjs/operators";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const transformedData = of(1,2,3,4,5,6)
.pipe(map((val: any) => val * 5))
.subscribe(data => console.log(data));
}
}
你会看到所有这些数字在订阅中都会相乘5,如果你在控制台中查看transformedData,它会显示特定的 Observable。RxJS
的操作符数量庞大,刚开始学习时可能会感到不知所措。我们当然不会涵盖所有操作符,但会详细介绍一些最常用的操作符,这些操作符你很可能会在应用程序中用到。
我们先从最常见的说起,
管道
Pipe函数就像一条流水线,将可观察数据源连接到各个操作符。它允许在包含多个操作符的可观察链中使用它们。为了提高代码的可读性,我们可以在函数中实现多个操作符pipe。
import { Component, OnInit } from '@angular/core';
import { fromEvent, of } from "rxjs";
import { map } from "rxjs/operators";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const transformedData = of(1,2,3,4,5,6)
.pipe(map((val: any) => val * 5))
.subscribe(data => console.log(data));
}
}
的
另一个最常用也最简单的 RxJS 操作符是Of函数。它会从数据源中按顺序发出每个值,然后发出一个完整的通知。
Of运算符的代码片段
import { Component, OnInit } from '@angular/core';
import { of } from "rxjs";
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
name = 'Angular';
ngOnInit() {
const person = { name: 'John Doe', age: 22 }; //<-- simple object
const personObs = of(person); //<-- convert object into stream
personObs.subscribe(data => console.log(data)) //<-- execute observable
}
}
RxJS 基于 6 种类型的运算符。
1)创建运算符
2)组合运算符
3)错误处理运算符
4)过滤运算符
5)多播运算符
6)转换运算符
创建运算符
创建运算符是一些函数,可以用来从任何其他数据类型创建 Observable,或者将其转换为 Observable,就像我们在上面的示例中所做的那样。从通用用例到特定用例,您可以自由地(也鼓励您)将所有内容都转换为流。创建运算符中还包含许多其他运算符。
以下是使用 RxJS Ajax 模块的简单创建操作符的示例:
import { Component, VERSION, OnInit } from '@angular/core';
import { ajax } from 'rxjs/ajax';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
githubUsers = `https://api.github.com/users`;
users = ajax({ url: this.githubUsers, method: "GET" })
ngOnInit() {
const subscribe = this.users.subscribe(
res => console.log(res.response),
err => console.error(err)
);
}
}
组合运算符
组合运算符(也称为连接运算符)允许连接来自多个可观察对象的数据。发射值是这些运算符的主要区别。组合运算符中还包含许多其他运算符。
以下是一个最常见的组合运算符示例:
import { Component, VERSION, OnInit } from '@angular/core';
import { fromEvent, interval } from 'rxjs';
import { map, combineAll, take } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map(
ev => interval(Math.random() * 2000).pipe(take(3))
), take(2)
);
const result = higherOrder.pipe(combineAll())
result.subscribe(data => console.log(data));
}
}
在这个例子中,我们将结果clicks和higherOrder可观察对象结合起来,并通过订阅可观察对象将其显示在控制台中result。
错误处理运算符
错误是开发过程中不可避免的副作用。这些运算符提供了有效的方式来优雅地处理错误,并在错误发生时执行重试逻辑。其他一些运算符包含在错误处理运算符中。
以下是处理运算符的示例catchError,它可以捕获要处理的可观察对象上的错误,并通过返回一个新的可观察对象或抛出一个错误来进行处理。
import { Component, VERSION, OnInit } from '@angular/core';
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
of(1, 2, 3, 4, 5).pipe(
map(num => {
if (num == 4) throw 'Four!'
return num
}),
catchError(err => of('I', 'II', 'III', 'IV', 'V')),
)
.subscribe(data => console.log(data))
}
}
过滤运算符
过滤运算符提供了一些技术,用于接受(或拒绝)来自可观测源的值,以及处理流中值的累积。此运算符类似于 `get` Array.prototype.filter,后者对发出的值返回 true。
以下是filterRxJS 中最简单的运算符示例:
import { Component, VERSION, OnInit } from '@angular/core';
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
const source = from([
{ name: 'Joe', age: 31 },
{ name: 'Bob', age: 25 }
]);
//filter out people with age under 30
const example = source.pipe(filter(person => person.age >= 30));
//output: "Over 30: Joe"
const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`))
}
}
多播运营商
在 RxJS 中,默认情况下 Observable 是冷的,或者说是单播的(每个订阅者对应一个源)。这些操作符可以将 Observable 变为热的,或者说是多播的,从而允许多个订阅者共享副作用。
multicast标准主题运算符示例
import { Component, VERSION, OnInit } from '@angular/core';
import { Subject, interval, ConnectableObservable } from 'rxjs';
import { take, tap, multicast, mapTo } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
//emit every 2 seconds, take 5
const source = interval(2000).pipe(take(5));
const example = source.pipe(
//since we are multicasting below, side effects will be executed once
tap(() => console.log('Side Effect #1')),
mapTo('Result!')
);
//subscribe subject to source upon connect()
const multi = example.pipe(multicast(() => new Subject())) as ConnectableObservable<number>;
/*
subscribers will share source
output:
"Side Effect #1"
"Result!"
"Result!"
...
*/
const subscriberOne = multi.subscribe(val => console.log(val));
const subscriberTwo = multi.subscribe(val => console.log(val));
//subscribe subject to source
multi.connect()
}
}
在上面的例子中,我们使用 ` connectObservable<number>Object` 作为pipe函数的类型,因为pipe`Object` 函数只返回一个 `Object` Observable,而mutlicast`Object` 运算符返回`Object` connectObservable,所以我们才能得到connect一个带有multi命名 Observable 的函数。你可以在这里了解更多关于可连接 Observable 的信息。
变换算子
在运算符链中转换值是一项常见任务。这些运算符提供了几乎适用于所有用例的转换技术。在上面的一些示例中,我们使用了一些转换运算符,例如mapTo`, map`scan和 `& mergeMap`。以下是转换运算符中的所有运算符。
让我们来看一个最常用的变换运算符的例子,
import { Component, VERSION, OnInit } from '@angular/core';
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { mergeMap } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
name = 'Angular ' + VERSION.full;
ngOnInit() {
// free api url
const API_URL = 'https://jsonplaceholder.typicode.com/todos/1';
// streams
const click$ = fromEvent(document, 'click');
click$
.pipe(
/*
* Using mergeMap for example, but generally for GET requests
* you will prefer switchMap.
* Also, if you do not need the parameter like
* below you could use mergeMapTo instead.
* ex. mergeMapTo(ajax.getJSON(API_URL))
*/
mergeMap(() => ajax.getJSON(API_URL))
)
// { userId: 1, id: 1, ...}
.subscribe(console.log);
}
}
以上示例展示了如何将click$可观察对象与从 API 获取的响应合并ajax.getJSON()。当我们点击文档中的任意位置时,控制台会显示 API 的响应。
本文介绍了所有主要运算符,希望您能从中学习到一些关于 RxJS 的新知识。以下是一些 RxJS 的更多资源:
https://www.learnrxjs.io/
https://rxjs.dev/
https://www.learnrxjs.io/learn-rxjs/recipes
https://www.youtube.com/playlist?list=PL55RiY5tL51pHpagYcrN9ubNLVXF8rGVi
如果你喜欢这篇文章,请分享到你的圈子,并关注我以获取更多此类短文。
和平✌️✌️✌️
文章来源:https://dev.to/mquanit/deep-dive-with-rxjs-in-angular-4e6o
