合并操作符

1. merge

可以把多个Observable 转化为一个 Observable。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// RxJS v6+
import { mapTo } from 'rxjs/operators';
import { interval, merge } from 'rxjs';

const first$ = interval(2000);
const second$ = interval(3000);

const merge$ = merge(first$.pipe(mapTo('first')), second$.pipe(mapTo('second')));
// 输出: 'fisrt', 'second', 'first', 'second', ...
merge$.subscribe(val => console.log(val));

数据流图例:

2. concat

按先前完成的顺序订阅可观察对象(可以将concat看作是ATM上的一条线路,在前一个事务完成之前,下一个事务(订阅)将无法开始!)。

1
2
3
4
5
6
7
8
9
// RxJS v6+
import { of, concat } from 'rxjs';

const first$ = of(1, 2, 3);
const second$ = of(4, 5);

const concat$ = concat(first$, second$);
// 输出: '1', '2', '3', '4', '5'
concat$.subscribe(val => console.log(val));

数据流图例:

3. combineLatest

只要有一个改变, 就会触发。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// html
// 可以尝试用 div>input#length生成
<div><input type="text" id="length"></div>
<div><input type="text" id="width"></div>
<div id="area"></div>

// RxJS v6+
import { combineLatest, fromEvent } from 'rxjs';
import { pluck, map } from 'rxjs/operators';

const length = document.getElementById('length');
const width = document.getElementById('width');
const area = document.getElementById('area');

const length$ = fromEvent(length, 'keyup').pipe(pluck('target', 'value'));
const width$ = fromEvent(width, 'keyup').pipe(pluck('target', 'value'));

const area$ = combineLatest([length$, width$]).pipe(map(data => {
  const l = data[0] as bigint;
  const w = data[1] as bigint;
  return l * w;
}));

area$.subscribe(val => area.innerHTML = String(val));

// 除了利用pluck查看元素属性值之外,也可以利用下面的方式获取值
// length$.subscribe(ev => console.log((ev.target as HTMLInputElement).value));

数据流图例:

4. zip

成对改变的时候,才会触发。

1
2
3
4
5
6
7
8
9
...

const area$ = zip(length$, width$).pipe(map(data => {
  const l = data[0] as bigint;
  const w = data[1] as bigint;
  return l * w;
}));

...

数据流图例:

5. withLatestFrom

跟combineLatest有点像,只是它有主从关系,只有在主要的 Observable送出新的值时,才会执行callback,附随的Observable只是在背景下运作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// RxJS v6+
import { of } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const first$ = of(1, 2, 3);
const second$ = of(4, 5);

const withLatest$ = first$.pipe(withLatestFrom(second$));
// 输出:'[1, 5]', '[2, 5]', '[3, 5]'
withLatest$.subscribe(val => console.log(val));

数据流图例:

6. startWith

在目标流的前面增加一个值

1
2
3
4
5
6
7
8
9
// RxJS v6+
import { startWith } from 'rxjs/operators';
import { of } from 'rxjs';

const target$ = of(1, 2, 3);

const newTarget$ = target$.pipe(startWith(0));
// 输出:'0', '1', '2', '3'
newTarget$.subscribe(val => console.log(val));

数据流图例: