0%

Rxjs操作符

操作符学习

已学习

D

distinctUntilChanged

函数签名: distinctUntilChanged(compare: function): Observable
:bulb: distinctUntilChanged 默认使用 === 进行比较, 对象引用必须匹配!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 去除相邻重复
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// 基于最新发出的值进行比较,只输出不同的值
const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);

const distinctSub = myArrayWithDuplicatesInARow
.pipe(distinctUntilChanged())
// 输出: 1,2,3,1,2,3
.subscribe(val => console.log('DISTINCT SUB:', val));

const nonDistinctSub = myArrayWithDuplicatesInARow
// 输出 : 1,1,2,2,3,1,2,3
.subscribe(val => console.log('NON DISTINCT SUB:', val));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 同一引用去重
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const sampleObject = { name: 'Test' };
// 对象必须有相同的引用
const myArrayWithDuplicateObjects = from([
sampleObject,
sampleObject,
sampleObject
]);
// 基于最新发出的值进行比较,只输出不同的对象
const nonDistinctObjects = myArrayWithDuplicateObjects
.pipe(distinctUntilChanged())
// 输出: 'DISTINCT OBJECTS: {name: 'Test'}
.subscribe(val => console.log('DISTINCT OBJECTS:', val));

S

scan

函数签名: scan(accumulator: function, seed: any): Observable
随着时间的推移进行归并。
:bulb: 此操作符是许多基于 Redux 实现的 RxJS 的核心!

1
2
3
4
5
6
7
8
9
10
// 基于种子0累加
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

const source = of(1, 2, 3);
// 基础的 scan 示例,从0开始,随着时间的推移计算总数
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
// 输出累加值
// 输出: 1,3,6
const subscribe = example.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 对象累加
import { Subject } from 'rxjs';
import { scan } from 'rxjs/operators';

const subject = new Subject();
// scan 示例,随着时间的推移构建对象
const example = subject.pipe(
scan((acc, curr) => Object.assign({}, acc, curr), {})
);
// 输出累加值
const subscribe = example.subscribe(val =>
console.log('Accumulated object:', val)
);
// subject 发出的值会被添加成对象的属性
// {name: 'Joe'}
subject.next({ name: 'Joe' });
// {name: 'Joe', age: 30}
subject.next({ age: 30 });
// {name: 'Joe', age: 30, favoriteLanguage: 'JavaScript'}
subject.next({ favoriteLanguage: 'JavaScript' });
1
2
3
4
5
6
7
8
9
10
11
12
// 随机发出累加的数值
import { interval } from 'rxjs';
import { scan, map, distinctUntilChanged } from 'rxjs/operators';

// 累加数组中的值,并随机发出此数组中的值
const scanObs = interval(1000)
.pipe(
scan((a, c) => [...a, c], []),
map(r => r[Math.floor(Math.random() * r.length)]),
distinctUntilChanged()
)
.subscribe(console.log);

startWith

函数签名: startWith(an: Values): Observable
发出给定的第一个值(在当前流头部插入数据)

  • 用来初始化值,然后后续值由Observable输入
1
2
3
4
5
6
7
8
9
10
// RxJS v6+
import { startWith } from 'rxjs/operators';
import { of } from 'rxjs';

// 发出 (1,2,3)
const source = of(1, 2, 3);
// 从0开始
const example = source.pipe(startWith(0));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 与scan 结合使用拼接
import { startWith, scan } from 'rxjs/operators';
import { of } from 'rxjs';

// 发出 ('World!', 'Goodbye', 'World!')
const source = of('World!', 'Goodbye', 'World!');
// 以 'Hello' 开头,后面接当前字符串
const example = source.pipe(
startWith('Hello'),
scan((acc, curr) => `${acc} ${curr}`)
);
/*
输出:
"Hello"
"Hello World!"
"Hello World! Goodbye"
"Hello World! Goodbye World!"
*/
const subscribe = example.subscribe(val => console.log(val));
  • 使用 scan(随着时间的推移进行归并)

switchMap

函数签名: switchMap(project: function: Observable, resultSelector: function(outerValue, innerValue, outerIndex, innerIndex): any): Observable
映射成 observable,完成前一个内部 observable,发出值。
如果你想要维护多个内部 subscription 的话, 请尝试 mergeMap!
此操作符通常被认为是 mergeMap 的安全版本!
此操作符可以取消正在进行中的网络请求!
为什么使用 switchMap?
switchMap 和其他打平操作符的主要区别是它具有取消效果。在每次发出时,会取消前一个内部 observable (你所提供函数的结果) 的订阅,然后订阅一个新的 observable 。你可以通过短语切换成一个新的 observable来记忆它。
它能在像 typeaheads 这样的场景下完美使用,当有新的输入时便不再关心之前请求的响应结果。在内部 observable 长期存活可能会导致内存泄露的情况下,这也是一种安全的选择,例如,如果你使用 mergeMap 和 interval,并忘记正确处理内部订阅。记住,switchMap 同一时间只维护一个内部订阅,在示例 1中可以清楚出看到这一点。
不过要小心,在每个请求都需要完成的情况下,考虑写数据库,你可能要避免使用 switchMap 。如果源 observable 发出速度足够快的话,switchMap 可以取消请求。在这些场景中,mergeMap 是正确的选择。

1
2
3
4
5
6
7
8
9
10
// 每5秒重新启动 interval
import { timer, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// 立即发出值, 然后每5秒发出值
const source = timer(0, 5000);
// 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值
const example = source.pipe(switchMap(() => interval(500)));
// 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));

使用timer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 倒计时控制器
import { interval, fromEvent, merge, empty } from 'rxjs';
import { switchMap, scan, takeWhile, startWith, mapTo } from 'rxjs/operators';

const countdownSeconds = 10;
const setHTML = id => val => (document.getElementById(id).innerHTML = val);
const pauseButton = document.getElementById('pause');
const resumeButton = document.getElementById('resume');
const interval$ = interval(1000).pipe(mapTo(-1));

const pause$ = fromEvent(pauseButton, 'click').pipe(mapTo(false));
const resume$ = fromEvent(resumeButton, 'click').pipe(mapTo(true));

const timer$ = merge(pause$, resume$)
.pipe(
startWith(true),
switchMap(val => (val ? interval$ : empty())),
scan((acc, curr) => (curr ? curr + acc : acc), countdownSeconds),
takeWhile(v => v >= 0)
)
.subscribe(setHTML('remaining'));

T

timer

函数签名: timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable
给定持续时间后,再按照指定间隔时间依次发出数字。从0开始

1
2
3
4
5
6
7
// 当timer参数值时间结束后发出值
import { timer } from 'rxjs';

// 1秒后发出0,然后结束,因为没有提供第二个参数
const source = timer(1000);
// 输出: 0
const subscribe = source.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
// 1秒后,间隔2秒发出值
import { timer } from 'rxjs';

/*
timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
然后每2秒发出序列值
*/
const source = timer(1000, 2000);
// 输出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
11
12
// 每次点击时重置
import { interval, fromEvent } from 'rxjs';
import { switchMap, mapTo } from 'rxjs/operators';

// 发出每次点击
const source = fromEvent(document, 'click');
// 如果3秒内发生了另一次点击,则消息不会被发出
const example = source.pipe(
switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
// (点击)...3s...'Hello I made it!'...(点击)...2s(点击)...
const subscribe = example.subscribe(val => console.log(val));
  • 使用mapTo(将每个发出值映射成常量。)

M

map

函数签名: map(project: Function, thisArg: any): Observable
对源 observable 的每个值应用投射函数。
进多少,出多少
如果在处理过程中没有return 则返回undefined

1
2
3
4
5
6
7
8
9
10
// 每个值+10
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 每个数字加10
const example = source.pipe(map(val => val + 10));
// 输出: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 从对象中映射单一属性
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

// 发出 ({name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50})
const source = from([
{ name: 'Joe', age: 30 },
{ name: 'Frank', age: 20 },
{ name: 'Ryan', age: 50 }
]);
// 提取每个 person 的 name 属性
const example = source.pipe(map(({ name }) => name));
// 输出: "Joe","Frank","Ryan"
const subscribe = example.subscribe(val => console.log(val));

mapTo

函数签名: mapTo(value: any): Observable
将每个发出值映射成常量。

1
2
3
4
5
6
7
8
9
10
// 间隔2秒输出固定字符串
import { interval } from 'rxjs';
import { mapTo } from 'rxjs/operators';

// 每2秒发出值
const source = interval(2000);
// 将所有发出值映射成同一个值
const example = source.pipe(mapTo('HELLO WORLD!'));
// 输出: 'HELLO WORLD!'...'HELLO WORLD!'...'HELLO WORLD!'...
const subscribe = example.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
// 将点击变成文字输入
import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';

// 发出每个页面点击
const source = fromEvent(document, 'click');
// 将所有发出值映射成同一个值
const example = source.pipe(mapTo('GOODBYE WORLD!'));
// 输出: (click)'GOODBYE WORLD!'...
const subscribe = example.subscribe(val => console.log(val));

merge

函数签名: merge(input: Observable): Observable
将多个 observables 转换成单个 observable
此操作符可以既有静态方法,又有实例方法!
如果产生值的顺序是首要考虑的,那么试试用 concat 来代替!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 合并多个observables
import { mapTo } from 'rxjs/operators';
import { interval, merge } from 'rxjs';

// 每2.5秒发出值
const first = interval(2500);
// 每2秒发出值
const second = interval(2000);
// 每1.5秒发出值
const third = interval(1500);
// 每1秒发出值
const fourth = interval(1000);

// 从一个 observable 中发出输出值
const example = merge(
first.pipe(mapTo('FIRST!')),
second.pipe(mapTo('SECOND!')),
third.pipe(mapTo('THIRD')),
fourth.pipe(mapTo('FOURTH'))
);
// 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
const subscribe = example.subscribe(val => console.log(val));
1
2
3
4
5
6
7
8
9
10
11
12
// 未实现
import { merge } from 'rxjs/operators';
import { interval } from 'rxjs';

// 每2.5秒发出值
const first = interval(2500);
// 每1秒发出值
const second = interval(1000);
// 作为实例方法使用
const example = first.pipe(merge(second));
// 输出: 0,1,0,2....
const subscribe = example.subscribe(val => console.log(val));

  我只是记录生活,欢迎来到我的博客!
  如需转载,请咨询作者同意后,标明作者以及出处,谢谢!