layout: post title: 'RxJS基本单子词典' date: 2017-03-13 17:44:19 +0800 image: 'blog-author.jpg' description: '澄清Subject、Observer、Observerable等单子' main-class: 'frontend' color: '#66CCFF' tags:
本文将介绍 Rx 中的 Subject、Observer、Observerable 等单子在生产中的可能用法,而不介绍amb 算子这样的态射,因为介绍态射的文章明显更多。
为方便了解,我提供了一些特性示例,放入一个 js 文件即可执行。在执行之前需要在文件夹里 npm i rxjs bluebird babel-core babel-polyfill babel-preset-stage-0
,然后加一个 .babelrc
内书:
{
"presets": ["stage-0"]
}
然后以 node -r "./node_modules/babel-core/register" -r "./node_modules/babel-polyfill/lib/index.js"
运行。
这么一想,新手想尝试性运行现代 JS 居然还蛮麻烦的嘛?
单子指的是 Promise、Array、Observerable 这样的容器,可以把 number、string、Object 这样的类型提升到新的范畴里。在 TypeScript 里我们所写的尖括号就描述了这一提升:
const aaa: Promise<number>;
const bbb: Array<string>;
const ccc: Observerable<Request>;
态射指的是 map、reduce、filter 这样的函数。我们知道 aaa.map(item => item.someProp)
返回的还是一个 Promise,只是里面装的东西可能不一样了(可能装着一个 error),同样地 ccc.map(item => item.someProp)
返回的还是一个 Observerable,这种返回值的一致性让我们可以组合使用函数,特别是组合使用 lodash
提供的那些函数。
RxJS 说它是 Observerable 的 lodash,指的就是它提供了各种各样的态射,对 Observerable 使用这些函数依然返回 Observerable。
众所周知,Promise 会在 resolve 后将内部的值吐出来,它可以吐出一个值,比如一个 number、一个 string、一个 Object。虽然通过 Object 我们可以一次性解构出好多个值,但这时候 Object 内的几个值是同步地吐出来的,也就是说这个 Promise 后面的 then 只会被调用一次。
AsyncSubject 也差不多,你看:
// 特性示例
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
// 这是把三个值装进了容器里
subject.next(0);
subject.next(1);
subject.next(2);
// complete 就像我们从 Promise 的里面控制它 resolve 了一样
subject.complete();
// subscribe 就像我们对 Promise 使用 .then 一样
subject.subscribe(value => console.log(value));
// 可以拿到最后一个「resolve」的值:
// 2
Observable 乃可观察对象,用 create 函数创建。它是一个流,类似于可以异步地 resolve 多个值的 Promise。向流里传值的工作用创建时传给你的 observer 对象来做。
Observer 乃对可观察对象的观察者,可以是一个很简单的、只包含三个键值对的对象,把它传给 subscribe 函数后可以用它观察到 Observable 流中的值。
// 特性示例
import Promise from 'bluebird';
import { Observable } from 'rxjs';
const observable = Observable.create(async observer => {
// 向流里传值的工作用传给你的 observer 对象来做
// 此处我们同步地传两个值
observer.next(1);
observer.next(2);
// 再异步地传一个值
await Promise.delay(1000);
observer.next(3);
observer.complete('complete!');
// Error 与 complete 都类似于终结符(EOF、终止密码子),用了 complete 就不能再接收到 observer.error('error!');
observer.error('error!');
// 此处可以 return 一个函数,它会在 Observer unsubscribe 的时候被调用
// return () => console.log('unsubscribed!');
});
console.log('before1');
// 订阅以得到流中的值,自动创建 Observer
// 注意这边第一个函数其实就是上面 Observable.create(async (observer) => { 中的 ovserver
observable.subscribe(
nextValue => console.log(`First function get a value: ${nextValue}`),
error => console.error(`Second function get an error: ${error}`),
notification => console.log(`Observer got a complete notification, but undefined is passed in: ${notification}`)
);
console.log('after1');
// 手动创建一个 Observer,一个只包含三个键值对的对象,并用它来订阅流
console.log('before2');
observable.subscribe({
// 注意这边这个 next 其实就是上面 Observable.create(async (observer) => { 中的 ovserver.next
next: nextValue => console.log(`First function get a value: ${nextValue}`),
error: error => console.error(`Second function get an error: ${error}`),
complete: notification =>
console.log(`Observer got a complete notification, but undefined is passed in: ${notification}`),
});
console.log('after2');
// before1
// First function get a value: 1
// First function get a value: 2
// after1
// before2
// First function get a value: 1
// First function get a value: 2
// after2
// First function get a value: 3
// Observer got a complete notification, but undefined is passed in: undefined
// First function get a value: 3
// Observer got a complete notification, but undefined is passed in: undefined
Cold Observable 就是在 create 等创建函数里搞一个数据源,这样每个 Observer 都会有自己的专属数据源:
const source = new Observable(observer => {
const socket = new WebSocket('ws://someurl'); // 专属数据源
socket.addEventListener(
'message',
e => observer.next(e) // 冷热 Observable 在这是一样的
);
return () => socket.close();
});
Cold Observable 每多用一次就会多用一个 socket 连接,需要更多计算资源,但你可以 return () => socket.close()
来单独在 unsubscribe 的时候关闭 socket。
Hot Observable 则是吃热的大锅饭,让每个 Observer 用同一个数据源:
const socket = new WebSocket('ws://someurl'); // 大锅饭数据源
const source = new Observable(observer => {
socket.addEventListener(
'message',
e => observer.next(e) // 冷热 Observable 在这是一样的
);
});
这种共用节省了资源,但是你不方便在 unsubscribe 的时候关闭 socket,因为你不知道还有没有 Observer 在用这个 socket 呢。可以用后面说的 ConnectableObservable 来折衷冷热。
用 Express 将传入的 HTTP request 转化为流:
// 服务端用法
import { Observable } from 'rxjs';
const observable = Observable.create(observer => {
expressApp.get('/api/v1/xxxxxx/:id', (req, res) => {
try {
const id: number = req.params.id;
const { limit }: NotificationQuery = req.query;
// 搞个流出来
observer.next({ id, limit });
res.send({ code: 0, data: result });
} catch (error) {
observer.error('error!');
res.send({ code: -1, message: error.toString() });
}
});
});
Subject 是一种中间节点,可以观察别的 Observerable,可以被别的 Observer 观察。
用 subject.subscribe(某个Observer)
就可以被某个 Observer 视奸,用 subject.next(1)
就可以向视奸它的 Observer 们暴露一根 1 。可攻可受,煞是方便。
// 特性示例
import { Subject } from 'rxjs';
const subject = new Subject();
console.log('before1');
// 先订阅这个流
subject.subscribe(
nextValue => console.log(`First function1 get a value: ${nextValue}`),
error => console.error(`Second function1 get an error: ${error}`),
notification => console.log(`Observer1 got a complete notification, but undefined is passed in: ${notification}`)
);
console.log('after1');
// 然后可以往流里塞值
subject.next(1);
// 可以将 next 函数传作它用,但一定要记得 bind this,不然会爆 Cannot read property 'closed' of undefined
const putValue = subject.next;
subject::putValue(2);
subject.complete('asdf');
subject.error('error1');
subject.complete('asdf');
subject.error('error2');
// Subject 默认不会缓存之前发送的值,所以来晚的 Observer 就啥也看不到了
// 但来晚的 Observer 会收到离自己最近的终结符,也就是收到 error 或 complete,此处是 error2,而且似乎 error 有比 complete 更高的优先级
console.log('before2');
subject.subscribe({
next: nextValue => console.log(`First function2 get a value: ${nextValue}`),
error: error => console.error(`Second function2 get an error: ${error}`),
complete: notification =>
console.log(`Observer2 got a complete notification, but undefined is passed in: ${notification}`),
});
console.log('after2');
// before1
// after1
// First function1 get a value: 1
// First function1 get a value: 2
// Observer1 got a complete notification, but undefined is passed in: undefined
// before2
// Second function2 get an error: error2
// after2
这种只把在订阅之后的数据发射给观察者,而没有记忆性的 Subject ,在 RxJava 里叫 PublishSubject。
BehaviorSubject 会有态度地为你预留一个值,任何来晚的 Observer 都可以拿到至少一个值。
// 特性示例
import { BehaviorSubject } from 'rxjs';
// 因为 BehaviorSubject 的特性是预留一个值,所以你可以在初始化时传一个值
const subject = new BehaviorSubject(0);
console.log('before1');
// 先订阅这个流
subject.subscribe({
next: nextValue => console.log(`First function1 get a value: ${nextValue}`),
complete: notification => console.log('Observer1 got a complete notification'),
});
console.log('after1');
subject.next(1);
// 关于 complete 和 error 的特性和 Subject 一样,但是这些终结符会挤掉之前预留的值
// 所以下面第二个 Observer 拿不到 next(1) 传入的 1
subject.complete(2);
console.log('before2');
subject.subscribe({
next: nextValue => console.log(`First function2 get a value: ${nextValue}`),
complete: notification => console.log('Observer2 got a complete notification'),
});
console.log('after2');
// before1
// First function1 get a value: 0
// after1
// First function1 get a value: 1
// Observer1 got a complete notification
// before2
// Observer2 got a complete notification
// after2
Replay 顾名思义就是重放它曾经拥有的所有值。
注意这时传给它的第一个参数是「记忆数量」,设为 6 就表示只记忆 6 个值,和人类差不多。不传这个值似乎就可以无限记忆了。传 0 和传 1 都是只记住一个,和 BehaviorSubject 效果相同。
第二个参数是「记忆时间」,传 0 表示记忆时间为 0 ms……
// 特性示例
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(0, 0);
subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe({
next: nextValue => console.log(`First function get a value: ${nextValue}`),
complete: notification => console.log('Observer got a complete notification'),
});
subject.next(3);
// 记忆力特别差,只能记住订阅前 0 ms 内的值,且只能记住一个
// First function get a value: 2
// First function get a value: 3
有时我们想让一个流变成多个一样的流,用 Cypher 来写就是 (newStream1)<-[:Clone]-(stream)-[:Clone]->(newStream2)
,有至少三种直观的方法:
前两种使用了 ConnectableObservable。它可以作为中转节点,让多个观察者同时收到数据源推送的消息,类似于 addEventListener,
这种方法主要的好处是 subject 也可以用 next 发送值。
// 特性示例
import { Subject, Observable } from 'rxjs';
// 先创建一个可观察对象,作为数据源
const observable = Observable.from([1, 2, 3]);
// 然后我们用一个 Subject 来作为中转节点,此时不用 subscribe,而是用 multicast
const subject = new Subject();
const multicasted = observable.multicast(subject);
multicasted.next(123);
// 然后中转节点可以被多个节点关注
multicasted.subscribe({
next: value => console.log(`observerA: ${value}`),
});
multicasted.subscribe({
next: value => console.log(`observerB: ${value}`),
});
// multicast - connect 是一对
multicasted.connect();
// 在 connect 之前 subscribe 到 subject 的 Observer 可以拿到所有的值
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
这种方法主要的好处是书写简洁,直接订阅了事。
// 特性示例
import { Observable } from 'rxjs';
// 先创建一个可观察对象,作为数据源
const observable = Observable.from([1, 2, 3]);
const published = observable.publish();
published.subscribe(value => console.log(`observerA: ${value}`));
published.subscribe(value => console.log(`observerB: ${value}`));
const connection = published.connect();
// 在 connect 之前 subscribe 到 observable 的 Observer 可以拿到所有的值
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
这种用法的特点是流不共享执行环境。
// 特性示例
import { Observable } from 'rxjs';
// 先创建一个可观察对象,作为数据源
const observable = Observable.from([1, 2, 3]);
const stream1 = observable.map(i => i);
const stream2 = observable.map(i => i);
stream1.subscribe(value => console.log(`observerA: ${value}`));
stream2.subscribe(value => console.log(`observerB: ${value}`));
observable.subscribe();
// 在 connect 之前 subscribe 到 observable 的 Observer 可以拿到所有的值
// observerA: 1
// observerA: 2
// observerA: 3
// observerB: 1
// observerB: 2
// observerB: 3