操作符学习
已学习
D
distinctUntilChanged
函数签名: distinctUntilChanged(compare: function): Observable
:bulb: distinctUntilChanged 默认使用 === 进行比较, 对象引用必须匹配!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| // 去除相邻重复 import { from } from 'rxjs'; import { distinctUntilChanged } from 'rxjs/operators';
// 基于最新发出的值进行比较,只输出不同的值 const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);
const distinctSub = myArrayWithDuplicatesInARow .pipe(distinctUntilChanged()) // 输出: 1,2,3,1,2,3 .subscribe(val => console.log('DISTINCT SUB:', val));
const nonDistinctSub = myArrayWithDuplicatesInARow // 输出 : 1,1,2,2,3,1,2,3 .subscribe(val => console.log('NON DISTINCT SUB:', val));
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| // 同一引用去重 import { from } from 'rxjs'; import { distinctUntilChanged } from 'rxjs/operators';
const sampleObject = { name: 'Test' }; // 对象必须有相同的引用 const myArrayWithDuplicateObjects = from([ sampleObject, sampleObject, sampleObject ]); // 基于最新发出的值进行比较,只输出不同的对象 const nonDistinctObjects = myArrayWithDuplicateObjects .pipe(distinctUntilChanged()) // 输出: 'DISTINCT OBJECTS: {name: 'Test'} .subscribe(val => console.log('DISTINCT OBJECTS:', val));
|
S
scan
函数签名: scan(accumulator: function, seed: any): Observable
随着时间的推移进行归并。
:bulb: 此操作符是许多基于 Redux 实现的 RxJS 的核心!
1 2 3 4 5 6 7 8 9 10
| // 基于种子0累加 import { of } from 'rxjs'; import { scan } from 'rxjs/operators';
const source = of(1, 2, 3); // 基础的 scan 示例,从0开始,随着时间的推移计算总数 const example = source.pipe(scan((acc, curr) => acc + curr, 0)); // 输出累加值 // 输出: 1,3,6 const subscribe = example.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| // 对象累加 import { Subject } from 'rxjs'; import { scan } from 'rxjs/operators';
const subject = new Subject(); // scan 示例,随着时间的推移构建对象 const example = subject.pipe( scan((acc, curr) => Object.assign({}, acc, curr), {}) ); // 输出累加值 const subscribe = example.subscribe(val => console.log('Accumulated object:', val) ); // subject 发出的值会被添加成对象的属性 // {name: 'Joe'} subject.next({ name: 'Joe' }); // {name: 'Joe', age: 30} subject.next({ age: 30 }); // {name: 'Joe', age: 30, favoriteLanguage: 'JavaScript'} subject.next({ favoriteLanguage: 'JavaScript' });
|
1 2 3 4 5 6 7 8 9 10 11 12
| // 随机发出累加的数值 import { interval } from 'rxjs'; import { scan, map, distinctUntilChanged } from 'rxjs/operators';
// 累加数组中的值,并随机发出此数组中的值 const scanObs = interval(1000) .pipe( scan((a, c) => [...a, c], []), map(r => r[Math.floor(Math.random() * r.length)]), distinctUntilChanged() ) .subscribe(console.log);
|
startWith
函数签名: startWith(an: Values): Observable
发出给定的第一个值(在当前流头部插入数据)
- 用来初始化值,然后后续值由Observable输入
1 2 3 4 5 6 7 8 9 10
| // RxJS v6+ import { startWith } from 'rxjs/operators'; import { of } from 'rxjs';
// 发出 (1,2,3) const source = of(1, 2, 3); // 从0开始 const example = source.pipe(startWith(0)); // 输出: 0,1,2,3 const subscribe = example.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| // 与scan 结合使用拼接 import { startWith, scan } from 'rxjs/operators'; import { of } from 'rxjs';
// 发出 ('World!', 'Goodbye', 'World!') const source = of('World!', 'Goodbye', 'World!'); // 以 'Hello' 开头,后面接当前字符串 const example = source.pipe( startWith('Hello'), scan((acc, curr) => `${acc} ${curr}`) ); /* 输出: "Hello" "Hello World!" "Hello World! Goodbye" "Hello World! Goodbye World!" */ const subscribe = example.subscribe(val => console.log(val));
|
switchMap
函数签名: switchMap(project: function: Observable, resultSelector: function(outerValue, innerValue, outerIndex, innerIndex): any): Observable
映射成 observable,完成前一个内部 observable,发出值。
如果你想要维护多个内部 subscription 的话, 请尝试 mergeMap!
此操作符通常被认为是 mergeMap 的安全版本!
此操作符可以取消正在进行中的网络请求!
为什么使用 switchMap?
switchMap 和其他打平操作符的主要区别是它具有取消效果。在每次发出时,会取消前一个内部 observable (你所提供函数的结果) 的订阅,然后订阅一个新的 observable 。你可以通过短语切换成一个新的 observable来记忆它。
它能在像 typeaheads 这样的场景下完美使用,当有新的输入时便不再关心之前请求的响应结果。在内部 observable 长期存活可能会导致内存泄露的情况下,这也是一种安全的选择,例如,如果你使用 mergeMap 和 interval,并忘记正确处理内部订阅。记住,switchMap 同一时间只维护一个内部订阅,在示例 1中可以清楚出看到这一点。
不过要小心,在每个请求都需要完成的情况下,考虑写数据库,你可能要避免使用 switchMap 。如果源 observable 发出速度足够快的话,switchMap 可以取消请求。在这些场景中,mergeMap 是正确的选择。
1 2 3 4 5 6 7 8 9 10
| // 每5秒重新启动 interval import { timer, interval } from 'rxjs'; import { switchMap } from 'rxjs/operators';
// 立即发出值, 然后每5秒发出值 const source = timer(0, 5000); // 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值 const example = source.pipe(switchMap(() => interval(500))); // 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8 const subscribe = example.subscribe(val => console.log(val));
|
使用timer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| // 倒计时控制器 import { interval, fromEvent, merge, empty } from 'rxjs'; import { switchMap, scan, takeWhile, startWith, mapTo } from 'rxjs/operators';
const countdownSeconds = 10; const setHTML = id => val => (document.getElementById(id).innerHTML = val); const pauseButton = document.getElementById('pause'); const resumeButton = document.getElementById('resume'); const interval$ = interval(1000).pipe(mapTo(-1));
const pause$ = fromEvent(pauseButton, 'click').pipe(mapTo(false)); const resume$ = fromEvent(resumeButton, 'click').pipe(mapTo(true));
const timer$ = merge(pause$, resume$) .pipe( startWith(true), switchMap(val => (val ? interval$ : empty())), scan((acc, curr) => (curr ? curr + acc : acc), countdownSeconds), takeWhile(v => v >= 0) ) .subscribe(setHTML('remaining'));
|
T
timer
函数签名: timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable
给定持续时间后,再按照指定间隔时间依次发出数字。从0开始
1 2 3 4 5 6 7
| // 当timer参数值时间结束后发出值 import { timer } from 'rxjs';
// 1秒后发出0,然后结束,因为没有提供第二个参数 const source = timer(1000); // 输出: 0 const subscribe = source.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10
| // 1秒后,间隔2秒发出值 import { timer } from 'rxjs';
/* timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值, 然后每2秒发出序列值 */ const source = timer(1000, 2000); // 输出: 0,1,2,3,4,5...... const subscribe = source.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10 11 12
| // 每次点击时重置 import { interval, fromEvent } from 'rxjs'; import { switchMap, mapTo } from 'rxjs/operators';
// 发出每次点击 const source = fromEvent(document, 'click'); // 如果3秒内发生了另一次点击,则消息不会被发出 const example = source.pipe( switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!'))) ); // (点击)...3s...'Hello I made it!'...(点击)...2s(点击)... const subscribe = example.subscribe(val => console.log(val));
|
M
map
函数签名: map(project: Function, thisArg: any): Observable
对源 observable 的每个值应用投射函数。
进多少,出多少
如果在处理过程中没有return 则返回undefined
1 2 3 4 5 6 7 8 9 10
| // 每个值+10 import { from } from 'rxjs'; import { map } from 'rxjs/operators';
// 发出 (1,2,3,4,5) const source = from([1, 2, 3, 4, 5]); // 每个数字加10 const example = source.pipe(map(val => val + 10)); // 输出: 11,12,13,14,15 const subscribe = example.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| // 从对象中映射单一属性 import { from } from 'rxjs'; import { map } from 'rxjs/operators';
// 发出 ({name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50}) const source = from([ { name: 'Joe', age: 30 }, { name: 'Frank', age: 20 }, { name: 'Ryan', age: 50 } ]); // 提取每个 person 的 name 属性 const example = source.pipe(map(({ name }) => name)); // 输出: "Joe","Frank","Ryan" const subscribe = example.subscribe(val => console.log(val));
|
mapTo
函数签名: mapTo(value: any): Observable
将每个发出值映射成常量。
1 2 3 4 5 6 7 8 9 10
| // 间隔2秒输出固定字符串 import { interval } from 'rxjs'; import { mapTo } from 'rxjs/operators';
// 每2秒发出值 const source = interval(2000); // 将所有发出值映射成同一个值 const example = source.pipe(mapTo('HELLO WORLD!')); // 输出: 'HELLO WORLD!'...'HELLO WORLD!'...'HELLO WORLD!'... const subscribe = example.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10
| // 将点击变成文字输入 import { fromEvent } from 'rxjs'; import { mapTo } from 'rxjs/operators';
// 发出每个页面点击 const source = fromEvent(document, 'click'); // 将所有发出值映射成同一个值 const example = source.pipe(mapTo('GOODBYE WORLD!')); // 输出: (click)'GOODBYE WORLD!'... const subscribe = example.subscribe(val => console.log(val));
|
merge
函数签名: merge(input: Observable): Observable
将多个 observables 转换成单个 observable
此操作符可以既有静态方法,又有实例方法!
如果产生值的顺序是首要考虑的,那么试试用 concat 来代替!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| // 合并多个observables import { mapTo } from 'rxjs/operators'; import { interval, merge } from 'rxjs';
// 每2.5秒发出值 const first = interval(2500); // 每2秒发出值 const second = interval(2000); // 每1.5秒发出值 const third = interval(1500); // 每1秒发出值 const fourth = interval(1000);
// 从一个 observable 中发出输出值 const example = merge( first.pipe(mapTo('FIRST!')), second.pipe(mapTo('SECOND!')), third.pipe(mapTo('THIRD')), fourth.pipe(mapTo('FOURTH')) ); // 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH" const subscribe = example.subscribe(val => console.log(val));
|
1 2 3 4 5 6 7 8 9 10 11 12
| // 未实现 import { merge } from 'rxjs/operators'; import { interval } from 'rxjs';
// 每2.5秒发出值 const first = interval(2500); // 每1秒发出值 const second = interval(1000); // 作为实例方法使用 const example = first.pipe(merge(second)); // 输出: 0,1,0,2.... const subscribe = example.subscribe(val => console.log(val));
|
我只是记录生活,欢迎来到我的博客!
如需转载,请咨询作者同意后,标明作者以及出处,谢谢!