Rxjava 辅助和错误操作符
RxJava 的辅助操作符主要包括如下内容:
delay,延迟指定的时间后再发射数据。
toList,将所有的数据最终以 List 的形式输出,返回 SingleObserver 对象。
toMap,将所有的数据最终以 Map 的形式输出(默认是HashMap),返回 SingleObserver 对象。
toMultiMap,将原始 Observable 发射的所有数据项到一个 Map ,同时还是一个 ArrayList,返回 SingleObserver 对象。
toSortedList,类似于 toList ,但是它可以对数据进行自然排序,默认是自然升序,如果发射的数据项没有实现 Comparable 接口,会抛出一个异常,返回 SingleObserver 对象。
timeOut,如果原始 Observable 过了指定的一段时间没有发射任何数据,Timeout 操作符会以一个onError 通知终止这个 Observable 或切换到一个我们指定的备用的 Observable ,默认在 computation 调度器上执行。
timestamp,给 Observable 发射的数据项附加一个时间戳 ,默认在 computation 调度器上执行。
timeInterval,将原始 Observable 发射的数据项替换为发射表示相邻数据时间间隔的对象。
doOnNext/doAfterNext,分别在 onNext 回调之前和之后会回调。
doOnError,在 onError 回调之前会先回调。
doOnEach,相当于 doOnNext,doOnError,doOnCompleted 的综合体。
doOnTerminate/doAfterTerminate,分别在 Observable 终止之前和之后会被调用,无论是正常还是异常终止。
doOnFinally,在观察者执行完 onError 、 onComplete 或取消订阅时会回调。
doOnComplete,在 onComplete 回调之前会先回调。
doOnSubscribe,当观察者订阅它生成的 Observable 就会被调用,在 onSubscribe 回调之前会先调用。
RxJava 的错操作符主要包括如下内容:
onErrorReturn/onErrorReturnItem,返回一个镜像原有 Observable 行为的新 Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法。
onErrorResumeNext,让 Observable 在遇到错误时开始发射第二个 Observable 的数据序列。
onExceptionResumeNext,跟 onErrorResumeNext 类似 ,但是,如果 onError 收到的 Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable 。
retry/retryWhen,不会将原始 Observable 的 onError 通知传递给观察者,它会订阅这个 Observable ,再给它一次机会无错误地完成它的数据序列。由于重新订阅,可能会造成数据项重复。默认在 trampoline 调度器上执行。
操作符示例代码
这里如果使用的是 Observer ,则默认为如下实现:
mObserver = new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
}
};
delay 示例
/**
* delay 默认在 computation 调度器上执行
*/
private void testDelay() {
//延迟一段指定的时间再发射来自Observable的发射物
Observable.range(0, 5).delay(2, TimeUnit.SECONDS).subscribe(mObserver);
// delay 不会延迟 onError 通知,它会立即将这个通知传递给订阅者,同时丢弃剩下的所有数据
Observable.error(new Throwable("测试错误")).delay(10, TimeUnit.SECONDS).subscribe(mObserver);
// 会延迟 onCompleted 通知
Observable.empty().delay(10, TimeUnit.SECONDS).subscribe(mObserver);
// 它和dealy的区别是dealy是延迟数据的发送,而此操作符是延迟数据的注册,指定延迟时间的重载方法是执行在computation调度器的。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
}
}).delaySubscription(10, TimeUnit.SECONDS).subscribe(mObserver);
}
toList 示例
/**
* 通过toList将所有的数据最终以List的形式输出,返回 SingleObserver
*
* 结果如下;
*
* 收到消息==[0, 1, 2, 3, 4] == 消息线程为:main
*/
private void testToList() {
Observable.range(0, 5).toList().subscribe(new SingleObserver<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(List<Integer> integers) {
Log.d(TAG, "收到消息==" + integers + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
}
toMap 示例
/**
* 该操作符收集原始Observable发射的所有数据项到一个Map(默认是HashMap)
*
* 结果如下;
*
* 收到消息=={key1=1, key2=2, key0=0} == 消息线程为:main
* 收到消息=={key1=101, key2=102, key0=100} == 消息线程为:main
*/
private void testToMap() {
Observable.range(0, 3).toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key" + integer;
}
}).subscribe(new SingleObserver<Map<String, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Map<String, Integer> stringIntegerMap) {
Log.d(TAG, "收到消息==" + stringIntegerMap + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
Observable.range(0, 3).toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
// 生成 KEY
return "key" + integer;
}
}, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
// 对值做变换
return integer + 100;
}
}).subscribe(new SingleObserver<Map<String, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Map<String, Integer> stringIntegerMap) {
Log.d(TAG, "收到消息==" + stringIntegerMap + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
}
toMultimap 示例
/**
* 它生成的这个Map同时还是一个ArrayList
*
* 结果如下;
*
* 收到消息=={key1=[1], key2=[2], key0=[0]} == 消息线程为:main
*/
private void testMultiMap() {
Observable.range(0, 3).toMultimap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key" + integer;
}
}).subscribe(new SingleObserver<Map<String, Collection<Integer>>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Map<String, Collection<Integer>> stringCollectionMap) {
Log.d(TAG, "收到消息==" + stringCollectionMap + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
}
toSortedList 示例
/**
* 类似于 toList ,但是它可以对数据进行自然排序,默认是自然升序,如果发射的数据项没有实现 Comparable 接口,会抛出一个异常。
*
* 结果如下:
*
* 收到消息==[0, 1, 2, 4, 7, 9, 10] == 消息线程为:main
*/
private void testToSortedList() {
Observable.just(1, 10, 4, 7, 2, 9, 0).toSortedList().subscribe(new SingleObserver<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(List<Integer> integers) {
Log.d(TAG, "收到消息==" + integers + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
Observable.just(1, 10, 4, 7, 2, 9, 0).toSortedList(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
// 降序
if (o1 == o2) {
return 0;
} else if (o1 > o2) {
return -1;
} else {
return 1;
}
}
}).subscribe(new SingleObserver<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(List<Integer> integers) {
Log.d(TAG, "收到消息==" + integers + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
});
}
timeOut 示例
/**
* 如果原始Observable过了指定的一段时间没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable,默认在 computation 调度器上执行
*
* 结果如下;
*
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 收到消息==3 == 消息线程为:main
* 错误:The source did not signal an event for 250 milliseconds and has been terminated. == 错误线程为:RxComputationThreadPool-1
* <p>
* onNext 都是在 Main 线程上,报错的时候,在 computation 线程上
*/
private void testTimeout() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Thread.sleep(100);
emitter.onNext(2);
Thread.sleep(200);
emitter.onNext(3);
Thread.sleep(300);
emitter.onNext(4);
Thread.sleep(400);
}
}).timeout(250, TimeUnit.MILLISECONDS).subscribe(mObserver);
// 在超时的时候切换到一个我们指定的备用的Observable,而不是发错误通知
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Thread.sleep(100);
emitter.onNext(2);
Thread.sleep(200);
emitter.onNext(3);
Thread.sleep(300);
emitter.onNext(4);
Thread.sleep(400);
}
}).timeout(250, TimeUnit.MILLISECONDS, Observable.just(100, 200)).subscribe(mObserver);
Observable.just(100).delay(1, TimeUnit.SECONDS).timeout(500, TimeUnit.MILLISECONDS).subscribe(mObserver);
}
timestamp 示例
/**
* 给Observable发射的数据项附加一个时间戳 ,默认在computation调度器上执行
*
* 结果如下:
*
* onNext: value:0 ===== time2019-07-24-09:55:24
* onNext: value:1 ===== time2019-07-24-09:55:24
* onNext: value:2 ===== time2019-07-24-09:55:24
* onNext: value:3 ===== time2019-07-24-09:55:24
*/
private void testTimestamp() {
// 默认单位为毫秒 TimeUnit.MILLISECONDS
Observable.range(0, 5000).delay(1, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Timed<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Timed<Integer> integerTimed) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
Log.d(TAG, "onNext: value:" + integerTimed.value() + " ===== " + "time" + sdf.format(new Date(integerTimed.time())));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
timeInterval 示例
/**
* 将原始Observable发射的数据项替换为发射表示相邻数据时间间隔的对象
*
* 结果如下:
*
* onNext: value:0 ===== time1 == 消息线程为:main
* onNext: value:1 ===== time0 == 消息线程为:main
* onNext: value:2 ===== time0 == 消息线程为:main
*
*/
private void testTimeInterval() {
// 默认单位为毫秒 TimeUnit.MILLISECONDS
Observable.range(0, 20).timeInterval(TimeUnit.MILLISECONDS)
.subscribe(new Observer<Timed<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Timed<Integer> integerTimed) {
Log.d(TAG, "onNext: value:" + integerTimed.value() + " ===== " + "time" + integerTimed.time()+ " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
doOnNext/doAfterNext 示例
/**
* doOnNext onNext回调之前会先回调
* doAfterNext onNext回调之后再回调
*
* 结果如下;
*
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 错误:测试错误 == 错误线程为:main
*/
private void testDoOnNext() {
Observable.just(1, 2, 3).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
if (integer > 2) {
throw new RuntimeException("测试错误");
}
}
}).subscribe(mObserver);
}
doOnError 示例
/**
* onError回调之前会先回调
*
* 结果如下:
*
* call 线程:main
* 错误:测试错误 == 错误线程为:main
*/
private void testDoOnError() {
Observable.error(new Throwable("测试错误")).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
}
}).subscribe(mObserver);
}
doOnComplete 示例
/**
* onComplete回调之前会先回调
*
* 结果如下:
*
* 收到消息==1 == 消息线程为:main
* call 线程:main
* 完成 == 完成线程为:main
*/
private void testDoOnComplete() {
Observable.just(1).doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
}
}).subscribe(mObserver);
}
doOnEach 示例
/**
* doOnEach 相当于doOnNext,doOnError,doOnCompleted 的综合体
*
* 结果如下:
*
* DO 收到消息==1 == 消息线程为:main
* 收到消息==1 == 消息线程为:main
* DO 错误:测试错误 == 错误线程为:main
* 错误:测试错误 == 错误线程为:main
*/
private void testDoOnEach() {
Observable.just(1).concatWith(Observable.<Integer>error(new Throwable("测试错误"))).doOnEach(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer o) {
// onNext回调之前,会先回调这里的
Log.d(TAG, "DO 收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
// onError回调之前,会先回调这里的
Log.d(TAG, "DO 错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
// onComplete回调之前,会先回调这里的
Log.d(TAG, "DO 完成" + " == 完成线程为:" + Thread.currentThread().getName());
}
}).subscribe(mObserver);
}
doOnTerminate/doAfterTerminate 示例
/**
* Observable终止之前会被调用,无论是正常还是异常终止
*
* 结果如下:
*
* 收到消息==1 == 消息线程为:main
* call 线程:main
* 错误:测试错误 == 错误线程为:main
*/
private void testDoOnTerminate() {
// doAfterTerminate Observable终止之后再被调用,无论是正常还是异常终止
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
// emitter.onComplete();
emitter.onError(new Throwable(“测试错误”));
}
}).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, “call 线程:” + Thread.currentThread().getName());
}
}).subscribe(mObserver);
}
doOnFinally 示例
/**
* 在观察者执行完 onError、onComplete 或取消时执行的操作。
*
* 结果如下:
*
* 收到消息==0 == 消息线程为:RxComputationThreadPool-1
* 收到消息==1 == 消息线程为:RxComputationThreadPool-1
* call dispose:RxComputationThreadPool-1
*/
private void testDoFinally() {
disposable = Observable.interval(1, TimeUnit.SECONDS).doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "call dispose:" + Thread.currentThread().getName());
}
}).subscribe(new Consumer<Long>() {
long count = 0;
@Override
public void accept(Long aLong) throws Exception {
count++;
Log.d(TAG, "收到消息==" + aLong + " == 消息线程为:" + Thread.currentThread().getName());
if (count >= 2) {
if (!disposable.isDisposed()) {
disposable.dispose();
}
}
}
});
}
doOnFinally 和 doAfterTerminate 的调用顺序示例
离观察者越近,就越先执行
/**
* 测试 doOnFinally 和 doAfterTerminate 的调用顺序
*
* 结果如下:
*
* 收到消息==1 == 消息线程为:main
* 完成 == 完成线程为:main
* doFinally == 2
* doAfterTerminate == 2
* doFinally == 1
* doAfterTerminate == 1
*/
private void testDoFinallyAndTerminate() {
Observable.just(1).doAfterTerminate(() -> Log.d(TAG, "doAfterTerminate == 1")).doFinally(() -> Log.d(TAG, "doFinally == 1"))
.doAfterTerminate(() -> Log.d(TAG, "doAfterTerminate == 2")).doFinally(() -> Log.d(TAG, "doFinally == 2"))
.subscribe(mObserver);
}
doOnSubscribe 示例
/**
* 当观察者订阅它生成的Observable就会被调用,在 onSubscribe 回调之前会先调用
*
* 结果如下:
*
* call 线程:main
* 收到消息==1 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testDoOnSubscribe() {
Observable.just(1).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
}
}).subscribe(mObserver);
}
onErrorReturn/onErrorReturnItem 示例
/**
* 方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的 onError 调用,
* 不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法
*
* 结果如下:
*
* 收到消息==100 == 消息线程为:main
* 收到消息==0 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testOnErrorReturn() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(100);
emitter.onError(new Throwable("测试错误"));
emitter.onComplete();
}
}).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return 0;
}
}).subscribe(mObserver);
// 这里的结果等价于上面的方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(100);
emitter.onError(new Throwable("测试错误"));
emitter.onComplete();
}
}).onErrorReturnItem(0).subscribe(mObserver);
}
onErrorResumeNext 示例
/**
* 让Observable在遇到错误时开始发射第二个Observable的数据序列。
*
* 结果如下:
*
* 收到消息==100 == 消息线程为:main
* 收到消息==0 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testOnErrorResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(100);
emitter.onError(new Throwable("测试错误"));
emitter.onComplete();
}
}).onErrorResumeNext(Observable.just(0)).subscribe(mObserver);
}
onExceptionResumeNext 示例
/**
* 跟 onErrorResumeNext 类似 ,但是,如果 onError 收到 的 Throwable 不是一个 Exception ,
* 它会将错误传递给观察者的 onError 方法,不会使用备用 的Observable
*
* 结果如下:
*
*收到消息==100 == 消息线程为:main
* 错误:测试错误 == 错误线程为:main
*/
private void testOnExceptionResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(100);
// 这里用的不是 Exception ,所以不会用备用 的 Observable
emitter.onError(new Throwable("测试错误"));
emitter.onComplete();
}
}).onExceptionResumeNext(Observable.just(0)).subscribe(mObserver);
}
retry/retryWhen 示例
private void testRetry() {
// 无论收到多少次onError 通知,无参数版本的 retry 都会继续订阅并发射原始Observable,会一直重试,不会停止除非不报错
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(100);
Log.d(TAG, "重试线程为:" + Thread.currentThread().getName());
emitter.onError(new Throwable("测试错误"));
}
}).retry().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);
// 接受单个 count 参数的 retry 会最多重新订阅指定的次数,如果次数超了,它不会尝试再次 订阅,它会把最新的一个 onError 通知传递给它的观察者。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(100);
Log.d(TAG, "重试线程为:" + Thread.currentThread().getName());
emitter.onError(new Throwable("测试错误"));
}
}).retry(3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);
// retryWhen 观察它的结果再决定是不是要重新订阅原 始的Observable。如果这个Observable发射了一项数据,它就重新订阅,
// 如果这个 Observable发射的是 onError 通知,它就将这个通知传递给观察者然后终止。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "重试线程为:" + Thread.currentThread().getName());
emitter.onError(new Throwable("测试错误"));
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Throwable> throwableObservable) throws Exception {
return Observable.just(1).concatWith(new CompletableSource() {
@Override
public void subscribe(CompletableObserver co) {
co.onError(new Throwable("发身一个Error通知"));
}
});
}
}).subscribe(mObserver);
}