Rxjava 介绍
响应式编程(Reactive Programming,简称RP),是一种面向数据流和变化传播的编程范式,有以下几个特点:异步、数据流和变化传播。
RXJava
是 Reactive Extensions
在 JVM
平台上的一个实现,通过使用观察者序列来构建异步、基于事件的程序。 RxJava
可以说是观察者设计模式的一个扩展,支持不同的数据事件流和额外的操作类,允许通过声明式的方式构建不同的执行序列,通过抽象的方式屏蔽底层的多线程实现、同步、线程安全、并发数据结构、非阻塞 IO 等逻辑。
Rxjava 基础知识
Observable
RxJava
的使用通常需要三步:
一. 创建 Observable
Observable
的意思是被观察者,使用 RxJava
时需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。有点类似上游发送命令,可以在这里决定异步操作模块的顺序和异步操作模块的次数。
二. 创建 Observer
Observer
即观察者,它可以在不同的线程中执行任务。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,可以在未来某个时刻响应 Observable
的通知,而不需要阻塞等待 Observable
发射数据。
三. 使用 subscribe 进行订阅
创建了 Observable
和 Observer
之后,我们还需要使用subscribe()
方法将它们连接起来,这样整个上下游就能衔接起来实现链式调用,比如:
String[] array = {"1", "2", "3", "4", "5"};
Disposable disposable = Observable.just(array).subscribe(new Consumer<String[]>() {
@Override
public void accept(String[] strings) throws Exception {
for (String string : strings) {
Log.d(TAG, "收到消息==" + string + " == 消息线程为:" + Thread.currentThread().getName());
}
}
});
just
是创建 Observable
的操作符,Consumer
是消费者,用于接收单个值,除外,还有 Action
,用于接收无参数类型。在 RxJava
中,被观察者,观察者,subscribe()
方法三者缺一不可,只有使用了 subscribe() 方法才会开始发送数据。
被观察者类型
Observable,能够发射 0 或 n 个数据,并以成功或错误事件终止。
Flowable ,能够发射 0 或 n 个数据,并以成功或错误事件终止,支持背压,可以控制数据源发射的速度。
Single ,只有
onSuccess
和onError
,并没有onComplete
。其中,onSuccess
用于发射数据,且只能发射 1 个数据,后面即使再发射数据也不会做任何处理。Completable,不发射任何数据,只有
onComplete
和onError
事件。Maybe ,
single
和completable
的结合体,只能发射 0 或 1 个数据,后续发的数据不会做任何处理,有onComplete
和onError
事件。如果先调用了onComplete
,即使后面再调用onSuccess
也不会发射任何数据。在没有数据发射时,会回调onComplete
,如果有数据发射或回调了onError
,则不会再执行onComplete
了。
热 Observable 和 冷 Observable
Hot Observable
,无论有没有观察者进行订阅,事件始终都会发生。当 Hot Observable
有多个订阅者时(多个观察者进行订阅时),Hot Observable
与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息,就像一个广播电台,所有在此刻收听的听众都会听到同一首歌。
Cold Observable
是只有观察者订阅了,才开始执行发射数据流的代码。并且 Cold Observable
和 Observer
只能是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送 的。也就是说,对 Cold Observable
而言,有多个 Observer
的时候,它们各自的事件是独立的,就像一张音乐 CD ,人们可以独立购买并听取它。
Subject 和 Processor
Subject
是一种特殊的存在,既是 Observable
,又是 Observer
,即又是被观察的对象又是观察者,共有 4 种不同的类别:
AsyncSubject,无论订阅发生在什么时候,只发射最后一个数据,必须调用 onComplete 才会开始发送数据。
BehaviorSubject,发送订阅之前的一个数据(没发射数据则发送一个默认数据)和订阅之后的全部数据。
ReplaySubject,无论订阅发生在什么时候,都发射全部数据。
PublishSubject,发送订阅之后的全部数据。
Processor
跟 Subject
的作用相同,能够支持背压控制,也有 4 种类型,分别为 AsyncProcessor
、 BehaviorProcessor
、 ReplayProcessor
、 PublishProcessor
。
线程调度
在默认情况下,RxJava
只在当前线程中运行,它是单线程的。此时 Observable
用于发射数据流,Observer
用于接收和响应数据流,各种操作符(Operators) 用于加工数据流,它们都在同一个线程中运行,实现出来的是一个同步的函数响应式。然而,函数响应式的实际应用是大部分操作都在后台处理,前台响应的一个过程。此时会涉及使用多线程来操作,我们可以使用 RxJava
的调度器(Scheduler)来实现。
共有6种线程调度类型:
single,使用定长为1的线程池(new Scheduled Thread pool(1)),重复利用这个线程。
newThread,每次都启用新线程,并在新线程中执行操作。
computation,使用的固定的线程池(Fixed Scheduler Pool),大小为 CPU 核数,适用于 CPU 密集型计算。
io,适合 IO 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 new Thread() 差不多,区别在于 io 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io 比 new Thread 更有效率。
trampoline,直接在当前线程运行,如果当前线程有其他任务正在执行,则会先暂停其他任务。
Schedulers.from,如果内置的 Scheduler 不能满足业务需求,那么可以使用自定义的 Executor 作为调度器,以满足个性化需求。
默认 Observable
和 Observer
处于同一线程中,我们可以使用 subscribeOn()
和observeOn()
来切换到指定线程来工作。
subscribeOn,用来指定对数据的处理运行在特定的线程上,若多次使用,只有第一次起作用。
observeOn,用来指定下游操作运行在特定的线程上,多次使用都会起作用。
代码示例
SingleObservable 示例
/**
* 只能发射零或一个数据,后续发的数据不会做任何处理
*
* 收到消息==2 == 消息线程为:main
*/
private void testSingleObservable() {
Disposable disposable = Single.create(new SingleOnSubscribe<Integer>() {
@Override
public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
emitter.onSuccess(2);
emitter.onSuccess(3);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
}
});
}
CompletableObservable 示例
/**
* 不会发射任何数据,只有 onComplete 和 onError 事件
*/
private void testCompletableObservable() {
Disposable disposable = Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
TimeUnit.SECONDS.sleep(1);
emitter.onComplete();
}
}).andThen(Observable.range(1, 10)).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
}
});
}
MaybeObservable 示例
/**
* single 和 completable 的结合体,只能发射零或一个数据,后续发的数据不会做任何处理,有 onComplete 和 onError 事件
*
* 收到消息==1 == 消息线程为:main
*
* 后续的 2 和 onComplete 事件不会收到
*/
private void testMaybeObservable() {
Disposable disposable = Maybe.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> emitter) throws Exception {
emitter.onSuccess(1);
emitter.onSuccess(2);
emitter.onComplete();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Maybe Complete==" + " == 消息线程为:" + Thread.currentThread().getName());
}
});
// 如果没有发射任何数据时,会回调 onComplete ,有发射数据或 onError ,则不会再回调 onComplete
Maybe.empty().subscribe(new MaybeObserver<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Object o) {
Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
}
});
}