Rxjava 合并和连接操作符
RxJava 的合并操作符主要包括如下内容:
combineLatest,当多个 Observable 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。
join,该操作符只要在另一个 Observable 发射的数据定义的时间窗口内,这个 Observable 发射了一条数据,就结合两个 Observable 发射的数据。即一个 Observable 的每个数据跟基座的 Observable 的所有数据都结合一次,是一对多的关系。先发射数据的 Observable 为基座。
merge,将多个 Observable 的输出合并,就好像它们是一个单个的 Observable 一样,合并的 Observable 发射的数据可能会交错(顺序发生变化),有一个类似的操作符 Concat 不会让数据交错,能够保持数据的顺序性。在此过程中任何一个原始 Observable 的 onError 通知都会被立即传递给观察者,而且会终止合并后的 Observable 。
mergeDelayError,跟 merge 同样的作用,但是它会保留 onError 通知直到合并后的 Observable 所有的数据发射完成,在那时它才会把 onError 传递给观察者。
zip,使用一个函数按顺序结合多个 Observable 发射的数据项,然后发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个 Observable 一样多的数据,假如两个 Observable 数据分布为 4 项,5 项,则最终合并是 4 项。
startWith/concatWith,startWith 在发射数据之前先发射一个指定的数据序列 (头添加),最后发射的数据永远排在最前面。concatWith,在发射数据后面发射一个指定的数据序列 (尾添加)。
RxJava 的连接操作符主要包括如下内容:
ConnectableObservable.connect(),指示一个可连接的 Observable 开始发射数据。
Observable.publish(),将一个 Observable 转换为一个可连接的 Observable 。
Observable.replay,确保所有的订阅者收到相同的数据序列,即使它们在 Observable 开始发射数据之后才订阅。
合并操作符示例代码
这里如果使用的是 Observer ,则默认为如下实现:
mObserver = new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
}
};
combineLatest 示例
/**
* 当多个 Observable 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据
*
* 结果如下;
*
* 收到消息==observableA:1=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
* 收到消息==observableA:2=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
* 收到消息==observableA:3=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
* 收到消息==observableA:4=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
* 完成 == 完成线程为:RxComputationThreadPool-1
*/
private void testCombineLatest() {
Observable<Integer> observableA = Observable.range(1, 4).delay(1, TimeUnit.SECONDS);
Observable<Integer> observableB = Observable.range(10, 5);
Observable<Integer> observableC = Observable.range(100, 5);
Observable.combineLatest(observableA, observableB, observableC, new Function3<Integer, Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2, Integer integer3) throws Exception {
return "observableA:" + integer + "=====" + "observableB:" + integer2 + "=====" + "observableC:" + integer3 + "=====";
}
}).subscribe(mObserver);
}
join 示例
/**
* 该操作符只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据
* 即一个 Observable 的每个数据跟基座的 Observable 的所有数据都结合一次,是一对多的关系。先发射数据的 Observable 为基座。
*
* 结果如下;
*
* observableB==7 == 消息线程为:main
* observableB==8 == 消息线程为:main
* observableA==1 == 消息线程为:RxComputationThreadPool-1
* observableA ==1observableB ==7== 消息线程为:RxComputationThreadPool-1
* 收到消息==8 == 消息线程为:RxComputationThreadPool-1-1
* observableA ==1observableB ==8== 消息线程为:RxComputationThreadPool-1
* 收到消息==9 == 消息线程为:RxComputationThreadPool-1-1
* observableA==2 == 消息线程为:RxComputationThreadPool-1
* observableA ==2observableB ==7== 消息线程为:RxComputationThreadPool-1-1
* 收到消息==9 == 消息线程为:RxComputationThreadPool-1
* observableA ==2observableB ==8== 消息线程为:RxComputationThreadPool-1
* 收到消息==10 == 消息线程为:RxComputationThreadPool-1-1
* 完成 == 完成线程为:RxComputationThreadPool-1
*
*/
private void testJoin() {
Observable<Integer> observableA = Observable.range(1, 2).delay(1, TimeUnit.SECONDS);
Observable<Integer> observableB = Observable.range(7, 2);
observableA.join(observableB, new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
Log.d(TAG, "observableA==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
return Observable.just(integer).delay(1, TimeUnit.SECONDS);
}
}, new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
Log.d(TAG, "observableB==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
return Observable.just(integer).delay(1, TimeUnit.SECONDS);
}
}, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "observableA ==" + integer + "observableB ==" + integer2 + "== 消息线程为:" + Thread.currentThread().getName());
return integer + integer2;
}
}).subscribe(mObserver);
}
merge 示例
/**
* 将多个Observables的输出合并,就好像它们是一个单个的Observable一样,合并的Observable发射的数据可能会交错(顺序发生变化),
* 有一个类似的操作符 Concat 不会让数据交错,能够保持数据的顺序性。
* 在此过程中任何一个原始Observable的onError通知都会被立即传递给观察者,而且会终止合并后的Observable。
*
* 结果如下;
*
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 收到消息==5 == 消息线程为:main
* 收到消息==6 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testMerge() {
Observable<Integer> observableA = Observable.range(1, 2);
Observable<Integer> observableB = Observable.range(5, 2);
Observable.merge(observableA, observableB).subscribe(mObserver);
}
mergeDelayError 示例
/**
* 将多个Observables的输出合并,就好像它们是一个单个的Observable一样,合并的Observable发射的数据可能会交错(顺序发生变化),
* 有一个类似的操作符 concatDelayError 不会让数据交错,能够保持数据的顺序性。
* 但是它会保留 onError 通知直到合并后的 Observable 所有的数据发射完成,在那时它才会把 onError 传递给观察者。
*
* 结果如下;
*
* 收到消息==100 == 消息线程为:main
* 收到消息==0 == 消息线程为:main
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 错误:error == 错误线程为:main
*/
private void testMergeDelayError() {
Observable<Integer> observableA = Observable.range(1, 2);
Observable<Integer> observableB = Observable.range(5, 2);
Observable.merge(observableA, observableB).subscribe(mObserver);
}
zip 示例
/**
* 使用一个函数按顺序结合多个 Observable 发射的数据项,然后发射这个函数返回的结果。
* 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据,假如两个Observable数据分布为4项,5项,则最终合并是4项
*
* 结果如下;
*
* 收到消息==observableA: 1 ==== observableB: 7 == 消息线程为:main
* 收到消息==observableA: 2 ==== observableB: 8 == 消息线程为:main
*/
private void testZip() {
Observable<Integer> observableA = Observable.range(1, 2); // 1 2
Observable<Integer> observableB = Observable.range(7, 3); // 7 8 9
Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return "observableA: " + integer + " ==== " + "observableB: " + integer2;
}
}).subscribe(mObserver);
}
startWith 示例
/**
* startWith 在发射数据之前先发射一个指定的数据序列 (头添加),最后发射的数据永远排在最前面
* concatWith 在发射数据后面发射一个指定的数据序列 (尾添加)
*
* 结果如下;
*
* 收到消息==15 == 消息线程为:main
* 收到消息==500 == 消息线程为:main
* 收到消息==100 == 消息线程为:main
* 收到消息==200 == 消息线程为:main
* 收到消息==0 == 消息线程为:main
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testStartWith() {
Observable.range(0, 3).startWithArray(100, 200).startWith(500)
.startWith(Observable.range(15, 1)).subscribe(mObserver);
}
ConnectableObservable
ConnectableObservable
继承自 Observable
,然而它并不是在调用 subscribe()
的时候发射数据,而是只有对其使用 connect
操作符它才会发射数据,所以可以用来更灵活地控制数据发射的时机,另外,ConnectableObservable
是 Hot Observable
。
publish
/**
* 发射的数据是共享的 ,默认在 Computation 调度器上
*
* 结果如下;
*
* 第1个订阅:0->time:08:04:22 == 消息线程为:main
* 第1个订阅:1->time:08:04:22 == 消息线程为:main
* 第1个订阅:2->time:08:04:23 == 消息线程为:main
* 第1个订阅:3->time:08:04:24 == 消息线程为:main
* 第2个订阅:3->time:08:04:24 == 消息线程为:RxComputationThreadPool-1
* 第1个订阅:4->time:08:04:25 == 消息线程为:main
* 第2个订阅:4->time:08:04:25 == 消息线程为:RxComputationThreadPool-1
* 第1个订阅:5->time:08:04:26 == 消息线程为:main
* 第2个订阅:5->time:08:04:26 == 消息线程为:RxComputationThreadPool-1
*
*/
private void testPublish() {
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss", Locale.CANADA);
Observable<Long> oldObservable = Observable.interval(1, TimeUnit.SECONDS).take(6);
// 转换成 ConnectableObservable
ConnectableObservable<Long> connectableObservable = oldObservable.publish();
// 可以在 subscribe 之前先调用 connect ,等到 subscribe 之后就会马上发射数据了
connectableObservable.connect();
Disposable disposable = connectableObservable.subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "第1个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
}
});
// 延迟3秒再订阅
connectableObservable.delaySubscription(3, TimeUnit.SECONDS).doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "解除订阅了" + " == 解除订阅的线程为:" + Thread.currentThread().getName());
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
addDispose(d);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "第2个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 调用 connect 才会发射数据,subscribe 是不会发射数据的,可以更加灵活的控制发射时机,ConnectableObservable 是 Hot Observable
// connectableObservable.connect();
addDispose(disposable);
}
replay
/**
* 跟 publish 类似,也是转换成 ConnectableObservable ,但是观察者无论何时开始订阅,都能收到所有的数据(变成了 Cold ?)
* <p>
* 第1个订阅:0->time:12:28:31 == 消息线程为:main
* 第1个订阅:1->time:12:28:32 == 消息线程为:main
* 第1个订阅:2->time:12:28:33 == 消息线程为:main
* 第2个订阅:0->time:12:28:33 == 消息线程为:RxComputationThreadPool-2
* 第2个订阅:1->time:12:28:33 == 消息线程为:RxComputationThreadPool-2
* 第2个订阅:2->time:12:28:33 == 消息线程为:RxComputationThreadPool-2
* 第1个订阅:3->time:12:28:34 == 消息线程为:main
* 第2个订阅:3->time:12:28:34 == 消息线程为:RxComputationThreadPool-1
* 第1个订阅:4->time:12:28:35 == 消息线程为:main
* 第2个订阅:4->time:12:28:35 == 消息线程为:RxComputationThreadPool-1
* 第2个订阅:5->time:12:28:36 == 消息线程为:RxComputationThreadPool-1
* 第1个订阅:5->time:12:28:36 == 消息线程为:main
*/
private void testReplay() {
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss", Locale.CANADA);
Observable<Long> oldObservable = Observable.interval(1, TimeUnit.SECONDS).take(6);
// 转换成 ConnectableObservable
// 最好先限定缓存的大小,否则缓存的数据会占用很大的内存,可能会造成OOM
ConnectableObservable<Long> connectableObservable = oldObservable.replay(100);
// 可以在 subscribe 之前先调用 connect ,等到 subscribe 之后就会马上发射数据了
connectableObservable.connect();
Disposable disposable = connectableObservable.subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "第1个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
}
});
// 延迟3秒再订阅
Disposable disposable2 = connectableObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "第2个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
}
});
addDispose(disposable, disposable2);
}
refCount
refCount
操作符是将 ConnectableObservable
转换成普通的 Observable
,同时又保持了Hot Observable
的特征。特别注意的是,要所有的订阅者都取消订阅后,在重新订阅时数据流才会从头开始发射,如果只有部分订阅者取消,则取消的订阅者或新的订阅者再次订阅的时候,不会再头开始发射,从当前数据发射并共享。
/**
* 调用了 refCount 操作符之后,变成普通的 Observable ,不再需要调用 connect 来触发数据发射
*
* 结果如下;
*
* RefCountObservable第1个订阅:0->time:08:19:33
* RefCountObservable第1个订阅:1->time:08:19:34
* RefCountObservable第1个订阅:2->time:08:19:35
* RefCountObservable第1个订阅:3->time:08:19:36
* RefCountObservable第2个订阅:3->time:08:19:36
* RefCountObservable第1个订阅:4->time:08:19:37
* RefCountObservable第2个订阅:4->time:08:19:37
* RefCountObservable第1个订阅:5->time:08:19:38
* RefCountObservable第2个订阅:5->time:08:19:38
*/
private void testRefCount() {
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss", Locale.CANADA);
Observable<Long> oldObservable = Observable.interval(1, TimeUnit.SECONDS).take(6);
// 转换成 ConnectableObservable
ConnectableObservable<Long> connectableObservable = oldObservable.publish();
// 转换成普通的 Observable,但是保留了 Hot 的特征,
// 要所有的订阅者都取消订阅后,在重新订阅时数据流才会从头开始发射
// 如果只有部分订阅者取消,则取消的订阅者或新的订阅者再次订阅的时候,不会再头开始发射,从当前数据发射并共享
Observable<Long> refCountObservable = connectableObservable.refCount();
Disposable refCountdisposable = refCountObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "RefCountObservable第1个订阅:" + aLong + "->time:" + format.format(new Date()));
}
});
// 延迟3秒再订阅
Disposable refCountdisposable2 = refCountObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "RefCountObservable第2个订阅:" + aLong + "->time:" + format.format(new Date()));
}
});
addDispose(refCountdisposable, refCountdisposable2);
}