Rxjava并行编程和转换器

转换器(Transformer)

在 RxJava 2.x 版本中有 ObservableTransformerSingleTransformerCompletableTransformerFlowableTransformerMaybeTransformer 。其中, FlowableTransformerMaybeTransformer 是新增的。由于 RxJava 2将 Observable 拆分成了 ObservableFlowable ,所以有了 FlowableTransformer 。 同样,Maybe 也是 RxJava2 新增的一个类型,所以有 MaybeTransformer

Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,与调用一系列的内联操作符一模一样。

转换器是和 compose 操作符结合使用的,compose 操作符能够从数据流中得到原始的被观察者。当创建被观察者时,compose 操作符会立即执行,而不像其他的操作符需要在 onNext 调用后执行。

注意:Transformer 面向的对象是源 Observable ,进行统一的转换操作,可以给不同的 Observable 复用,达到代码重用的效果。

代码示例:

这里用到的 Transformer 是用来做线程切换的。

public class TransformerUtil {

    private final static ObservableTransformer mObservableTransformer = new ObservableTransformer() {
        @Override
        public ObservableSource apply(Observable upstream) {
            return upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

    public static <T> ObservableTransformer<T, T> io_main() {
        return mObservableTransformer;
    }
}

private void testCompose() {
    Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
            .compose(TransformerUtil.<Long>io_main()).subscribe(mObserver);
}

并行操作

我们之前的 Observable 发射的数据流可以经历各种线程切换,但是数据流的各个元素之间不会产生并行的效果,并行不是并发,也不是同步,更不是异步。

并发:一个处理器同进处理多个不同的任务。

并行:多个处理器或多核的处理器同进处理多个不同的任务,是同时发生的多个并发事件,具有并发的含义。

利用 flatMap 操作符实现并行效果

/**
 * 通过 FlatMap 操作符来实现并行效果,发射的数据会交错
 */
private void testParalleByFlatMap() {
    Observable.range(1, 100).flatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            return Observable.just(integer).subscribeOn(Schedulers.computation());
        }
    }).subscribe(mObserver);
}

flatMap 会将原 Observable 发射的数据转化为多个 Observable ,再将这些 Observable 发射的数据整合发射出来,这些 Observable 可以指定用成的线程来并发地执行,这里使用了 Schedulers.computation() ,当然,我们也可以自定义线程池来处理。

/**
 * 通过 FlatMap 操作符来实现并行效果,发射的数据会交错,用自定义的调度器
 */
private void testParalleByFlatMapWithExecutor() {
    int threadNum = Runtime.getRuntime().availableProcessors() + 1;
    final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
    Observable.range(1, 100).flatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            return Observable.just(integer).subscribeOn(Schedulers.from(executorService));
        }
    }).doFinally(new Action() {
        @Override
        public void run() throws Exception {
            // 最后需要执行 shutdown 来关闭
            executorService.shutdown();
        }
    }).subscribe(mObserver);
}

注意:当完成所有的操作后,需要关闭线程池,这里用到了 doFinally 来做后续的处理。

通过 ParallelFlowable 来实现并行效果

ParallelFlowable 是并行的 Flowable 版本,不是新的 Observable ,因为并行处理中背压是必不可少的,而 Flowable 是专门处理背压的, ParallelFlowable 只是增加了并行的功能,为了增加并行效果,所以会有一些操作符是无法使用的,比如 take 、skip 等。

在相应的操作符上调用 Flowableparalle() 方法就会返回 ParallelFlowable 对象,最后需要使用 sequential() 操作符返回到顺序流。在 ParallelFlowable 中是通过 runOn 方法来切换线程的,不能使用 subscribeOnobserveOn

示例代码如下:

/**
 * 通过 ParalleFlowable 来实现并行效果,优先使用 ParalleFlowable 来实现并行,对于无法使用 ParalleFlowable 的操作符,则可以
 * 使用 FlatMap 的方式来实现并行
 */
private void testParalleByFlowable() {
    ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 100).parallel();
    // 需要调用 sequential 操作符返回到顺序流
    Disposable disposable = parallelFlowable.runOn(Schedulers.io()).sequential().subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

}