Rxjava使用介绍

Rxjava 介绍

响应式编程(Reactive Programming,简称RP),是一种面向数据流和变化传播的编程范式,有以下几个特点:异步数据流变化传播

RXJavaReactive ExtensionsJVM 平台上的一个实现,通过使用观察者序列来构建异步、基于事件的程序。 RxJava 可以说是观察者设计模式的一个扩展,支持不同的数据事件流和额外的操作类,允许通过声明式的方式构建不同的执行序列,通过抽象的方式屏蔽底层的多线程实现、同步、线程安全、并发数据结构、非阻塞 IO 等逻辑。

Rxjava 基础知识

Observable

RxJava 的使用通常需要三步:

一. 创建 Observable

Observable 的意思是被观察者,使用 RxJava 时需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。有点类似上游发送命令,可以在这里决定异步操作模块的顺序和异步操作模块的次数。

二. 创建 Observer

Observer 即观察者,它可以在不同的线程中执行任务。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,可以在未来某个时刻响应 Observable 的通知,而不需要阻塞等待 Observable 发射数据。

三. 使用 subscribe 进行订阅

创建了 ObservableObserver 之后,我们还需要使用subscribe() 方法将它们连接起来,这样整个上下游就能衔接起来实现链式调用,比如:

String[] array = {"1", "2", "3", "4", "5"};
Disposable disposable = Observable.just(array).subscribe(new Consumer<String[]>() {
    @Override
    public void accept(String[] strings) throws Exception {
        for (String string : strings) {
            Log.d(TAG, "收到消息==" + string + " == 消息线程为:" + Thread.currentThread().getName());
        }
    }
});

just 是创建 Observable 的操作符,Consumer 是消费者,用于接收单个值,除外,还有 Action ,用于接收无参数类型。在 RxJava 中,被观察者,观察者,subscribe() 方法三者缺一不可,只有使用了 subscribe() 方法才会开始发送数据

被观察者类型

  • Observable,能够发射 0n 个数据,并以成功或错误事件终止。

  • Flowable ,能够发射 0n 个数据,并以成功或错误事件终止,支持背压,可以控制数据源发射的速度。

  • Single ,只有 onSuccessonError,并没有 onComplete 。其中,onSuccess 用于发射数据,且只能发射 1 个数据,后面即使再发射数据也不会做任何处理。

  • Completable,不发射任何数据,只有 onCompleteonError 事件。

  • Maybesinglecompletable 的结合体,只能发射 01 个数据,后续发的数据不会做任何处理,有 onCompleteonError 事件。如果先调用了 onComplete ,即使后面再调用 onSuccess 也不会发射任何数据。在没有数据发射时,会回调 onComplete ,如果有数据发射或回调了 onError,则不会再执行 onComplete 了。

热 Observable 和 冷 Observable

Hot Observable ,无论有没有观察者进行订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时(多个观察者进行订阅时),Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息,就像一个广播电台,所有在此刻收听的听众都会听到同一首歌。

Cold Observable 是只有观察者订阅了,才开始执行发射数据流的代码。并且 Cold ObservableObserver 只能是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送 的。也就是说,对 Cold Observable 而言,有多个 Observer 的时候,它们各自的事件是独立的,就像一张音乐 CD ,人们可以独立购买并听取它。

Subject 和 Processor

Subject 是一种特殊的存在,既是 Observable ,又是 Observer ,即又是被观察的对象又是观察者,共有 4 种不同的类别:

  • AsyncSubject,无论订阅发生在什么时候,只发射最后一个数据,必须调用 onComplete 才会开始发送数据

  • BehaviorSubject,发送订阅之前的一个数据(没发射数据则发送一个默认数据)和订阅之后的全部数据。

  • ReplaySubject,无论订阅发生在什么时候,都发射全部数据。

  • PublishSubject,发送订阅之后的全部数据。

ProcessorSubject 的作用相同,能够支持背压控制,也有 4 种类型,分别为 AsyncProcessorBehaviorProcessorReplayProcessorPublishProcessor

线程调度

在默认情况下,RxJava 只在当前线程中运行,它是单线程的。此时 Observable 用于发射数据流,Observer 用于接收和响应数据流,各种操作符(Operators) 用于加工数据流,它们都在同一个线程中运行,实现出来的是一个同步的函数响应式。然而,函数响应式的实际应用是大部分操作都在后台处理,前台响应的一个过程。此时会涉及使用多线程来操作,我们可以使用 RxJava 的调度器(Scheduler)来实现。

共有6种线程调度类型:

  • single,使用定长为1的线程池(new Scheduled Thread pool(1)),重复利用这个线程。

  • newThread,每次都启用新线程,并在新线程中执行操作。

  • computation,使用的固定的线程池(Fixed Scheduler Pool),大小为 CPU 核数,适用于 CPU 密集型计算。

  • io,适合 IO 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 new Thread() 差不多,区别在于 io 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下,io 比 new Thread 更有效率。

  • trampoline,直接在当前线程运行,如果当前线程有其他任务正在执行,则会先暂停其他任务。

  • Schedulers.from,如果内置的 Scheduler 不能满足业务需求,那么可以使用自定义的 Executor 作为调度器,以满足个性化需求。

默认 ObservableObserver 处于同一线程中,我们可以使用 subscribeOn()observeOn() 来切换到指定线程来工作。

subscribeOn,用来指定对数据的处理运行在特定的线程上,若多次使用,只有第一次起作用。
observeOn,用来指定下游操作运行在特定的线程上,多次使用都会起作用。

代码示例

SingleObservable 示例

/**
 * 只能发射零或一个数据,后续发的数据不会做任何处理
 * 
 * 收到消息==2 == 消息线程为:main
 */
private void testSingleObservable() {
    Disposable disposable = Single.create(new SingleOnSubscribe<Integer>() {

        @Override
        public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
            emitter.onSuccess(2);
            emitter.onSuccess(3);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });
}

CompletableObservable 示例

/**
 * 不会发射任何数据,只有 onComplete 和 onError 事件
 */
private void testCompletableObservable() {
    Disposable disposable = Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) throws Exception {
            TimeUnit.SECONDS.sleep(1);
            emitter.onComplete();

        }
    }).andThen(Observable.range(1, 10)).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });
}

MaybeObservable 示例

/**
 * single 和 completable 的结合体,只能发射零或一个数据,后续发的数据不会做任何处理,有 onComplete 和 onError 事件
 * 
 * 收到消息==1 == 消息线程为:main
 * 
 * 后续的 2 和 onComplete 事件不会收到
 */
private void testMaybeObservable() {
    Disposable disposable = Maybe.create(new MaybeOnSubscribe<Integer>() {

        @Override
        public void subscribe(MaybeEmitter<Integer> emitter) throws Exception {
            emitter.onSuccess(1);
            emitter.onSuccess(2);
            emitter.onComplete();
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {

        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Maybe Complete=="  + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

    // 如果没有发射任何数据时,会回调 onComplete ,有发射数据或 onError ,则不会再回调 onComplete
    Maybe.empty().subscribe(new MaybeObserver<Object>() {

        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onSuccess(Object o) {
            Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {
            Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
        }
    });
}