转换器(Transformer)
在 RxJava 2.x 版本中有 ObservableTransformer 、 SingleTransformer 、 CompletableTransformer 、 FlowableTransformer 和 MaybeTransformer 。其中, FlowableTransformer 和 MaybeTransformer 是新增的。由于 RxJava 2将 Observable 拆分成了 Observable 和 Flowable ,所以有了 FlowableTransformer 。 同样,Maybe 也是 RxJava2 新增的一个类型,所以有 MaybeTransformer 。
Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,与调用一系列的内联操作符一模一样。
转换器是和 compose 操作符结合使用的,compose 操作符能够从数据流中得到原始的被观察者。当创建被观察者时,compose 操作符会立即执行,而不像其他的操作符需要在 onNext 调用后执行。
注意:Transformer 面向的对象是源 Observable ,进行统一的转换操作,可以给不同的 Observable 复用,达到代码重用的效果。
代码示例:
这里用到的 Transformer 是用来做线程切换的。
public class TransformerUtil {
    private final static ObservableTransformer mObservableTransformer = new ObservableTransformer() {
        @Override
        public ObservableSource apply(Observable upstream) {
            return upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    public static <T> ObservableTransformer<T, T> io_main() {
        return mObservableTransformer;
    }
}
private void testCompose() {
    Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
            .compose(TransformerUtil.<Long>io_main()).subscribe(mObserver);
}
并行操作
我们之前的 Observable 发射的数据流可以经历各种线程切换,但是数据流的各个元素之间不会产生并行的效果,并行不是并发,也不是同步,更不是异步。
并发:一个处理器同进处理多个不同的任务。
并行:多个处理器或多核的处理器同进处理多个不同的任务,是同时发生的多个并发事件,具有并发的含义。
利用 flatMap 操作符实现并行效果
/**
 * 通过 FlatMap 操作符来实现并行效果,发射的数据会交错
 */
private void testParalleByFlatMap() {
    Observable.range(1, 100).flatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            return Observable.just(integer).subscribeOn(Schedulers.computation());
        }
    }).subscribe(mObserver);
}
flatMap 会将原 Observable 发射的数据转化为多个 Observable ,再将这些 Observable 发射的数据整合发射出来,这些 Observable 可以指定用成的线程来并发地执行,这里使用了 Schedulers.computation() ,当然,我们也可以自定义线程池来处理。
/**
 * 通过 FlatMap 操作符来实现并行效果,发射的数据会交错,用自定义的调度器
 */
private void testParalleByFlatMapWithExecutor() {
    int threadNum = Runtime.getRuntime().availableProcessors() + 1;
    final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
    Observable.range(1, 100).flatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            return Observable.just(integer).subscribeOn(Schedulers.from(executorService));
        }
    }).doFinally(new Action() {
        @Override
        public void run() throws Exception {
            // 最后需要执行 shutdown 来关闭
            executorService.shutdown();
        }
    }).subscribe(mObserver);
}
注意:当完成所有的操作后,需要关闭线程池,这里用到了 doFinally 来做后续的处理。
通过 ParallelFlowable 来实现并行效果
ParallelFlowable 是并行的 Flowable 版本,不是新的 Observable ,因为并行处理中背压是必不可少的,而 Flowable 是专门处理背压的, ParallelFlowable 只是增加了并行的功能,为了增加并行效果,所以会有一些操作符是无法使用的,比如 take 、skip 等。
在相应的操作符上调用 Flowable 的 paralle() 方法就会返回 ParallelFlowable 对象,最后需要使用 sequential() 操作符返回到顺序流。在 ParallelFlowable 中是通过 runOn 方法来切换线程的,不能使用 subscribeOn 和 observeOn 。
示例代码如下:
/**
 * 通过 ParalleFlowable 来实现并行效果,优先使用 ParalleFlowable 来实现并行,对于无法使用 ParalleFlowable 的操作符,则可以
 * 使用 FlatMap 的方式来实现并行
 */
private void testParalleByFlowable() {
    ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 100).parallel();
    // 需要调用 sequential 操作符返回到顺序流
    Disposable disposable = parallelFlowable.runOn(Schedulers.io()).sequential().subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });
}