转换器(Transformer)
在 RxJava 2.x 版本中有 ObservableTransformer
、 SingleTransformer
、 CompletableTransformer
、 FlowableTransformer
和 MaybeTransformer
。其中, FlowableTransformer
和 MaybeTransformer
是新增的。由于 RxJava 2将 Observable
拆分成了 Observable
和 Flowable
,所以有了 FlowableTransformer
。 同样,Maybe
也是 RxJava2 新增的一个类型,所以有 MaybeTransformer
。
Transformer
能够将一个 Observable/Flowable/Single/Completable/Maybe
对象转换成另一个 Observable/Flowable/Single/Completable/Maybe
对象,与调用一系列的内联操作符一模一样。
转换器是和 compose
操作符结合使用的,compose
操作符能够从数据流中得到原始的被观察者。当创建被观察者时,compose
操作符会立即执行,而不像其他的操作符需要在 onNext
调用后执行。
注意:Transformer
面向的对象是源 Observable
,进行统一的转换操作,可以给不同的 Observable
复用,达到代码重用的效果。
代码示例:
这里用到的 Transformer
是用来做线程切换的。
public class TransformerUtil {
private final static ObservableTransformer mObservableTransformer = new ObservableTransformer() {
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
public static <T> ObservableTransformer<T, T> io_main() {
return mObservableTransformer;
}
}
private void testCompose() {
Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
.compose(TransformerUtil.<Long>io_main()).subscribe(mObserver);
}
并行操作
我们之前的 Observable
发射的数据流可以经历各种线程切换,但是数据流的各个元素之间不会产生并行的效果,并行不是并发,也不是同步,更不是异步。
并发:一个处理器同进处理多个不同的任务。
并行:多个处理器或多核的处理器同进处理多个不同的任务,是同时发生的多个并发事件,具有并发的含义。
利用 flatMap 操作符实现并行效果
/**
* 通过 FlatMap 操作符来实现并行效果,发射的数据会交错
*/
private void testParalleByFlatMap() {
Observable.range(1, 100).flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
return Observable.just(integer).subscribeOn(Schedulers.computation());
}
}).subscribe(mObserver);
}
flatMap
会将原 Observable
发射的数据转化为多个 Observable
,再将这些 Observable
发射的数据整合发射出来,这些 Observable
可以指定用成的线程来并发地执行,这里使用了 Schedulers.computation()
,当然,我们也可以自定义线程池来处理。
/**
* 通过 FlatMap 操作符来实现并行效果,发射的数据会交错,用自定义的调度器
*/
private void testParalleByFlatMapWithExecutor() {
int threadNum = Runtime.getRuntime().availableProcessors() + 1;
final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
Observable.range(1, 100).flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
return Observable.just(integer).subscribeOn(Schedulers.from(executorService));
}
}).doFinally(new Action() {
@Override
public void run() throws Exception {
// 最后需要执行 shutdown 来关闭
executorService.shutdown();
}
}).subscribe(mObserver);
}
注意:当完成所有的操作后,需要关闭线程池,这里用到了 doFinally 来做后续的处理。
通过 ParallelFlowable 来实现并行效果
ParallelFlowable
是并行的 Flowable
版本,不是新的 Observable
,因为并行处理中背压是必不可少的,而 Flowable
是专门处理背压的, ParallelFlowable
只是增加了并行的功能,为了增加并行效果,所以会有一些操作符是无法使用的,比如 take 、skip 等。
在相应的操作符上调用 Flowable
的 paralle()
方法就会返回 ParallelFlowable
对象,最后需要使用 sequential()
操作符返回到顺序流。在 ParallelFlowable
中是通过 runOn
方法来切换线程的,不能使用 subscribeOn
和 observeOn
。
示例代码如下:
/**
* 通过 ParalleFlowable 来实现并行效果,优先使用 ParalleFlowable 来实现并行,对于无法使用 ParalleFlowable 的操作符,则可以
* 使用 FlatMap 的方式来实现并行
*/
private void testParalleByFlowable() {
ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 100).parallel();
// 需要调用 sequential 操作符返回到顺序流
Disposable disposable = parallelFlowable.runOn(Schedulers.io()).sequential().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
}
});
}