Rxjava辅助和错误操作符

Rxjava 辅助和错误操作符

RxJava 的辅助操作符主要包括如下内容:

  • delay,延迟指定的时间后再发射数据。

  • toList,将所有的数据最终以 List 的形式输出,返回 SingleObserver 对象。

  • toMap,将所有的数据最终以 Map 的形式输出(默认是HashMap),返回 SingleObserver 对象。

  • toMultiMap,将原始 Observable 发射的所有数据项到一个 Map ,同时还是一个 ArrayList,返回 SingleObserver 对象。

  • toSortedList,类似于 toList ,但是它可以对数据进行自然排序,默认是自然升序,如果发射的数据项没有实现 Comparable 接口,会抛出一个异常,返回 SingleObserver 对象。

  • timeOut,如果原始 Observable 过了指定的一段时间没有发射任何数据,Timeout 操作符会以一个onError 通知终止这个 Observable 或切换到一个我们指定的备用的 Observable ,默认在 computation 调度器上执行。

  • timestamp,给 Observable 发射的数据项附加一个时间戳 ,默认在 computation 调度器上执行。

  • timeInterval,将原始 Observable 发射的数据项替换为发射表示相邻数据时间间隔的对象。

  • doOnNext/doAfterNext,分别在 onNext 回调之前和之后会回调。

  • doOnError,在 onError 回调之前会先回调。

  • doOnEach,相当于 doOnNext,doOnError,doOnCompleted 的综合体。

  • doOnTerminate/doAfterTerminate,分别在 Observable 终止之前和之后会被调用,无论是正常还是异常终止

  • doOnFinally,在观察者执行完 onError 、 onComplete 或取消订阅时会回调。

  • doOnComplete,在 onComplete 回调之前会先回调。

  • doOnSubscribe,当观察者订阅它生成的 Observable 就会被调用,在 onSubscribe 回调之前会先调用。

RxJava 的错操作符主要包括如下内容:

  • onErrorReturn/onErrorReturnItem,返回一个镜像原有 Observable 行为的新 Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法。

  • onErrorResumeNext,让 Observable 在遇到错误时开始发射第二个 Observable 的数据序列。

  • onExceptionResumeNext,跟 onErrorResumeNext 类似 ,但是,如果 onError 收到的 Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable 。

  • retry/retryWhen,不会将原始 Observable 的 onError 通知传递给观察者,它会订阅这个 Observable ,再给它一次机会无错误地完成它的数据序列。由于重新订阅,可能会造成数据项重复。默认在 trampoline 调度器上执行。

操作符示例代码

这里如果使用的是 Observer ,则默认为如下实现:

mObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
    }
};

delay 示例

/**
 * delay 默认在 computation    调度器上执行
 */
private void testDelay() {
    //延迟一段指定的时间再发射来自Observable的发射物
    Observable.range(0, 5).delay(2, TimeUnit.SECONDS).subscribe(mObserver);

    // delay 不会延迟 onError 通知,它会立即将这个通知传递给订阅者,同时丢弃剩下的所有数据
    Observable.error(new Throwable("测试错误")).delay(10, TimeUnit.SECONDS).subscribe(mObserver);

    // 会延迟 onCompleted 通知
    Observable.empty().delay(10, TimeUnit.SECONDS).subscribe(mObserver);

    // 它和dealy的区别是dealy是延迟数据的发送,而此操作符是延迟数据的注册,指定延迟时间的重载方法是执行在computation调度器的。
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

            Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onNext(4);
            emitter.onNext(5);

        }
    }).delaySubscription(10, TimeUnit.SECONDS).subscribe(mObserver);
}

toList 示例

/**
 * 通过toList将所有的数据最终以List的形式输出,返回 SingleObserver
 * 
 * 结果如下;
 * 
 * 收到消息==[0, 1, 2, 3, 4] == 消息线程为:main
 */
private void testToList() {
    Observable.range(0, 5).toList().subscribe(new SingleObserver<List<Integer>>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(List<Integer> integers) {
            Log.d(TAG, "收到消息==" + integers + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });
}

toMap 示例

/**
 * 该操作符收集原始Observable发射的所有数据项到一个Map(默认是HashMap)
 * 
 * 结果如下;
 * 
 * 收到消息=={key1=1, key2=2, key0=0} == 消息线程为:main
 * 收到消息=={key1=101, key2=102, key0=100} == 消息线程为:main
 */
private void testToMap() {
    Observable.range(0, 3).toMap(new Function<Integer, String>() {

        @Override
        public String apply(Integer integer) throws Exception {
            return "key" + integer;
        }
    }).subscribe(new SingleObserver<Map<String, Integer>>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Map<String, Integer> stringIntegerMap) {
            Log.d(TAG, "收到消息==" + stringIntegerMap + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });

    Observable.range(0, 3).toMap(new Function<Integer, String>() {

        @Override
        public String apply(Integer integer) throws Exception {
            // 生成 KEY
            return "key" + integer;
        }
    }, new Function<Integer, Integer>() {

        @Override
        public Integer apply(Integer integer) throws Exception {
            // 对值做变换
            return integer + 100;
        }
    }).subscribe(new SingleObserver<Map<String, Integer>>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Map<String, Integer> stringIntegerMap) {
            Log.d(TAG, "收到消息==" + stringIntegerMap + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });
}

toMultimap 示例

/**
 * 它生成的这个Map同时还是一个ArrayList
 * 
 * 结果如下;
 * 
 * 收到消息=={key1=[1], key2=[2], key0=[0]} == 消息线程为:main
 */
private void testMultiMap() {
    Observable.range(0, 3).toMultimap(new Function<Integer, String>() {

        @Override
        public String apply(Integer integer) throws Exception {
            return "key" + integer;
        }
    }).subscribe(new SingleObserver<Map<String, Collection<Integer>>>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Map<String, Collection<Integer>> stringCollectionMap) {
            Log.d(TAG, "收到消息==" + stringCollectionMap + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });
}

toSortedList 示例

/**
 * 类似于 toList ,但是它可以对数据进行自然排序,默认是自然升序,如果发射的数据项没有实现 Comparable     接口,会抛出一个异常。
 *
 * 结果如下:
 *
 * 收到消息==[0, 1, 2, 4, 7, 9, 10] == 消息线程为:main
 */
private void testToSortedList() {
    Observable.just(1, 10, 4, 7, 2, 9, 0).toSortedList().subscribe(new SingleObserver<List<Integer>>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onSuccess(List<Integer> integers) {
            Log.d(TAG, "收到消息==" + integers + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {

        }
    });

    Observable.just(1, 10, 4, 7, 2, 9, 0).toSortedList(new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            // 降序
            if (o1 == o2) {
                return 0;
            } else if (o1 > o2) {
                return -1;
            } else {
                return 1;
            }
        }
    }).subscribe(new SingleObserver<List<Integer>>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(List<Integer> integers) {
            Log.d(TAG, "收到消息==" + integers + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });
}

timeOut 示例

/**
 * 如果原始Observable过了指定的一段时间没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable,默认在 computation 调度器上执行
 * 
 * 结果如下;
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 * 错误:The source did not signal an event for 250 milliseconds and has been terminated. == 错误线程为:RxComputationThreadPool-1
 * <p>
 * onNext 都是在 Main 线程上,报错的时候,在 computation 线程上
 */
private void testTimeout() {
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            Thread.sleep(100);
            emitter.onNext(2);
            Thread.sleep(200);
            emitter.onNext(3);
            Thread.sleep(300);
            emitter.onNext(4);
            Thread.sleep(400);
        }
    }).timeout(250, TimeUnit.MILLISECONDS).subscribe(mObserver);

    // 在超时的时候切换到一个我们指定的备用的Observable,而不是发错误通知
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            Thread.sleep(100);
            emitter.onNext(2);
            Thread.sleep(200);
            emitter.onNext(3);
            Thread.sleep(300);
            emitter.onNext(4);
            Thread.sleep(400);
        }
    }).timeout(250, TimeUnit.MILLISECONDS, Observable.just(100, 200)).subscribe(mObserver);

    Observable.just(100).delay(1, TimeUnit.SECONDS).timeout(500, TimeUnit.MILLISECONDS).subscribe(mObserver);
}

timestamp 示例

/**
 * 给Observable发射的数据项附加一个时间戳 ,默认在computation调度器上执行
 * 
 * 结果如下:
 * 
 * onNext: value:0 ===== time2019-07-24-09:55:24
 * onNext: value:1 ===== time2019-07-24-09:55:24
 * onNext: value:2 ===== time2019-07-24-09:55:24
 * onNext: value:3 ===== time2019-07-24-09:55:24
 */
private void testTimestamp() {
    // 默认单位为毫秒 TimeUnit.MILLISECONDS
    Observable.range(0, 5000).delay(1, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Timed<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Timed<Integer> integerTimed) {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
                    Log.d(TAG, "onNext: value:" + integerTimed.value() + " ===== " + "time" + sdf.format(new Date(integerTimed.time())));
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}

timeInterval 示例

/**
 * 将原始Observable发射的数据项替换为发射表示相邻数据时间间隔的对象
 *
 * 结果如下:
 *
 * onNext: value:0 ===== time1 == 消息线程为:main
 * onNext: value:1 ===== time0 == 消息线程为:main
 * onNext: value:2 ===== time0 == 消息线程为:main
 *
 */
private void testTimeInterval() {
    // 默认单位为毫秒 TimeUnit.MILLISECONDS
    Observable.range(0, 20).timeInterval(TimeUnit.MILLISECONDS)
            .subscribe(new Observer<Timed<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Timed<Integer> integerTimed) {
                    Log.d(TAG, "onNext: value:" + integerTimed.value() + " ===== " + "time" + integerTimed.time()+ " == 消息线程为:" + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}

doOnNext/doAfterNext 示例

/**
 * doOnNext onNext回调之前会先回调
 * doAfterNext  onNext回调之后再回调
 * 
 * 结果如下;
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 错误:测试错误 == 错误线程为:main
 */
private void testDoOnNext() {
    Observable.just(1, 2, 3).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            if (integer > 2) {
                throw new RuntimeException("测试错误");
            }
        }
    }).subscribe(mObserver);
}

doOnError 示例

/**
 * onError回调之前会先回调
 * 
 * 结果如下:
 * 
 * call 线程:main
 * 错误:测试错误 == 错误线程为:main
 */
private void testDoOnError() {
    Observable.error(new Throwable("测试错误")).doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
        }
    }).subscribe(mObserver);
}

doOnComplete 示例

/**
 * onComplete回调之前会先回调
 *
 * 结果如下:
 *
 * 收到消息==1 == 消息线程为:main
 * call 线程:main
 * 完成 == 完成线程为:main
 */
private void testDoOnComplete() {
    Observable.just(1).doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "call 线程:" + Thread.currentThread().getName());

        }
    }).subscribe(mObserver);
}

doOnEach 示例

/**
 * doOnEach 相当于doOnNext,doOnError,doOnCompleted 的综合体
 *
 * 结果如下:
 *
 * DO 收到消息==1 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * DO 错误:测试错误 == 错误线程为:main
 * 错误:测试错误 == 错误线程为:main
 */
private void testDoOnEach() {
    Observable.just(1).concatWith(Observable.<Integer>error(new Throwable("测试错误"))).doOnEach(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Integer o) {
            // onNext回调之前,会先回调这里的
            Log.d(TAG, "DO 收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            // onError回调之前,会先回调这里的
            Log.d(TAG, "DO 错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onComplete() {
            // onComplete回调之前,会先回调这里的
            Log.d(TAG, "DO 完成" + " == 完成线程为:" + Thread.currentThread().getName());
        }
    }).subscribe(mObserver);
} 

doOnTerminate/doAfterTerminate 示例

/**
 * Observable终止之前会被调用,无论是正常还是异常终止
 * 
 * 结果如下:
 * 
 * 收到消息==1 == 消息线程为:main
 * call 线程:main
 * 错误:测试错误 == 错误线程为:main
 */
private void testDoOnTerminate() {
    // doAfterTerminate Observable终止之后再被调用,无论是正常还是异常终止
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);

// emitter.onComplete();
emitter.onError(new Throwable(“测试错误”));
}
}).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, “call 线程:” + Thread.currentThread().getName());
}
}).subscribe(mObserver);
}

doOnFinally 示例

/**
 * 在观察者执行完 onError、onComplete 或取消时执行的操作。
 * 
 * 结果如下:
 * 
 * 收到消息==0 == 消息线程为:RxComputationThreadPool-1
 * 收到消息==1 == 消息线程为:RxComputationThreadPool-1
 * call dispose:RxComputationThreadPool-1
 */
private void testDoFinally() {
    disposable = Observable.interval(1, TimeUnit.SECONDS).doFinally(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "call dispose:" + Thread.currentThread().getName());
        }
    }).subscribe(new Consumer<Long>() {
        long count = 0;
        @Override
        public void accept(Long aLong) throws Exception {
            count++;
            Log.d(TAG, "收到消息==" + aLong + " == 消息线程为:" + Thread.currentThread().getName());
            if (count >= 2) {
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            }
        }
    });
}

doOnFinally 和 doAfterTerminate 的调用顺序示例

离观察者越近,就越先执行

/**
 * 测试 doOnFinally 和 doAfterTerminate 的调用顺序
 * 
 * 结果如下:
 * 
 * 收到消息==1 == 消息线程为:main
 * 完成 == 完成线程为:main
 * doFinally == 2
 * doAfterTerminate == 2
 * doFinally == 1
 * doAfterTerminate == 1
 */
private void testDoFinallyAndTerminate() {
    Observable.just(1).doAfterTerminate(() -> Log.d(TAG, "doAfterTerminate == 1")).doFinally(() -> Log.d(TAG, "doFinally == 1"))
            .doAfterTerminate(() -> Log.d(TAG, "doAfterTerminate == 2")).doFinally(() -> Log.d(TAG, "doFinally == 2"))
            .subscribe(mObserver);
}

doOnSubscribe 示例

/**
 * 当观察者订阅它生成的Observable就会被调用,在 onSubscribe 回调之前会先调用
 * 
 * 结果如下:
 * 
 * call 线程:main
 * 收到消息==1 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testDoOnSubscribe() {
    Observable.just(1).doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            Log.d(TAG, "call 线程:" + Thread.currentThread().getName());
        }
    }).subscribe(mObserver);
}

onErrorReturn/onErrorReturnItem 示例

/**
 * 方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的 onError 调用,
 * 不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法
 * 
 * 结果如下:
 * 
 * 收到消息==100 == 消息线程为:main
 * 收到消息==0 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testOnErrorReturn() {
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            emitter.onError(new Throwable("测试错误"));
            emitter.onComplete();
        }
    }).onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            return 0;
        }
    }).subscribe(mObserver);

    // 这里的结果等价于上面的方法
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            emitter.onError(new Throwable("测试错误"));
            emitter.onComplete();
        }
    }).onErrorReturnItem(0).subscribe(mObserver);
}

onErrorResumeNext 示例

/**
 * 让Observable在遇到错误时开始发射第二个Observable的数据序列。
 *
 * 结果如下:
 *
 * 收到消息==100 == 消息线程为:main
 * 收到消息==0 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testOnErrorResumeNext() {
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            emitter.onError(new Throwable("测试错误"));
            emitter.onComplete();
        }
    }).onErrorResumeNext(Observable.just(0)).subscribe(mObserver);
}

onExceptionResumeNext 示例

/**

 * 跟 onErrorResumeNext  类似 ,但是,如果 onError    收到 的    Throwable    不是一个    Exception    ,
 * 它会将错误传递给观察者的    onError    方法,不会使用备用 的Observable
 *
 * 结果如下:
 *
 *收到消息==100 == 消息线程为:main
 * 错误:测试错误 == 错误线程为:main
 */
private void testOnExceptionResumeNext() {
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            // 这里用的不是 Exception ,所以不会用备用 的 Observable
            emitter.onError(new Throwable("测试错误"));
            emitter.onComplete();
        }
    }).onExceptionResumeNext(Observable.just(0)).subscribe(mObserver);
}

retry/retryWhen 示例

private void testRetry() {
    // 无论收到多少次onError    通知,无参数版本的    retry    都会继续订阅并发射原始Observable,会一直重试,不会停止除非不报错
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            Log.d(TAG, "重试线程为:" + Thread.currentThread().getName());
            emitter.onError(new Throwable("测试错误"));
        }
    }).retry().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);

    // 接受单个    count    参数的    retry    会最多重新订阅指定的次数,如果次数超了,它不会尝试再次 订阅,它会把最新的一个    onError    通知传递给它的观察者。
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(100);
            Log.d(TAG, "重试线程为:" + Thread.currentThread().getName());
            emitter.onError(new Throwable("测试错误"));
        }
    }).retry(3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);


    //     retryWhen    观察它的结果再决定是不是要重新订阅原 始的Observable。如果这个Observable发射了一项数据,它就重新订阅,
    //     如果这个 Observable发射的是    onError    通知,它就将这个通知传递给观察者然后终止。
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "重试线程为:" + Thread.currentThread().getName());
            emitter.onError(new Throwable("测试错误"));
        }
    }).retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Observable<Throwable> throwableObservable) throws Exception {
            return Observable.just(1).concatWith(new CompletableSource() {
                @Override
                public void subscribe(CompletableObserver co) {
                    co.onError(new Throwable("发身一个Error通知"));
                }
            });
        }
    }).subscribe(mObserver);
}