Rxjava合并和连接操作符

Rxjava 合并和连接操作符

RxJava 的合并操作符主要包括如下内容:

  • combineLatest,当多个 Observable 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

  • join,该操作符只要在另一个 Observable 发射的数据定义的时间窗口内,这个 Observable 发射了一条数据,就结合两个 Observable 发射的数据。即一个 Observable 的每个数据跟基座的 Observable 的所有数据都结合一次,是一对多的关系。先发射数据的 Observable 为基座。

  • merge,将多个 Observable 的输出合并,就好像它们是一个单个的 Observable 一样,合并的 Observable 发射的数据可能会交错(顺序发生变化),有一个类似的操作符 Concat 不会让数据交错,能够保持数据的顺序性。在此过程中任何一个原始 Observable 的 onError 通知都会被立即传递给观察者,而且会终止合并后的 Observable 。

  • mergeDelayError,跟 merge 同样的作用,但是它会保留 onError 通知直到合并后的 Observable 所有的数据发射完成,在那时它才会把 onError 传递给观察者。

  • zip,使用一个函数按顺序结合多个 Observable 发射的数据项,然后发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个 Observable 一样多的数据,假如两个 Observable 数据分布为 4 项,5 项,则最终合并是 4 项。

  • startWith/concatWith,startWith 在发射数据之前先发射一个指定的数据序列 (头添加),最后发射的数据永远排在最前面。concatWith,在发射数据后面发射一个指定的数据序列 (尾添加)。

RxJava 的连接操作符主要包括如下内容:

  • ConnectableObservable.connect(),指示一个可连接的 Observable 开始发射数据。

  • Observable.publish(),将一个 Observable 转换为一个可连接的 Observable 。

  • Observable.replay,确保所有的订阅者收到相同的数据序列,即使它们在 Observable 开始发射数据之后才订阅。

合并操作符示例代码

这里如果使用的是 Observer ,则默认为如下实现:

mObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
    }
};

combineLatest 示例

/**
 * 当多个 Observable 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据
 *
 * 结果如下;
 *
 * 收到消息==observableA:1=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
 * 收到消息==observableA:2=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
 * 收到消息==observableA:3=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
 * 收到消息==observableA:4=====observableB:14=====observableC:104===== == 消息线程为:RxComputationThreadPool-1
 * 完成 == 完成线程为:RxComputationThreadPool-1
 */
private void testCombineLatest() {
    Observable<Integer> observableA = Observable.range(1, 4).delay(1, TimeUnit.SECONDS);
    Observable<Integer> observableB = Observable.range(10, 5);
    Observable<Integer> observableC = Observable.range(100, 5);

    Observable.combineLatest(observableA, observableB, observableC, new Function3<Integer, Integer, Integer, String>() {
        @Override
        public String apply(Integer integer, Integer integer2, Integer integer3) throws Exception {
            return "observableA:" + integer + "=====" + "observableB:" + integer2 + "=====" + "observableC:" + integer3 + "=====";
        }
    }).subscribe(mObserver);
}

join 示例

/**
 * 该操作符只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据
 * 即一个 Observable 的每个数据跟基座的 Observable 的所有数据都结合一次,是一对多的关系。先发射数据的 Observable 为基座。
 *
 * 结果如下;
 *
 * observableB==7 == 消息线程为:main
 * observableB==8 == 消息线程为:main
 * observableA==1 == 消息线程为:RxComputationThreadPool-1
 * observableA ==1observableB ==7== 消息线程为:RxComputationThreadPool-1
 * 收到消息==8 == 消息线程为:RxComputationThreadPool-1-1
 * observableA ==1observableB ==8== 消息线程为:RxComputationThreadPool-1
 * 收到消息==9 == 消息线程为:RxComputationThreadPool-1-1
 * observableA==2 == 消息线程为:RxComputationThreadPool-1
 * observableA ==2observableB ==7== 消息线程为:RxComputationThreadPool-1-1
 * 收到消息==9 == 消息线程为:RxComputationThreadPool-1
 * observableA ==2observableB ==8== 消息线程为:RxComputationThreadPool-1
 * 收到消息==10 == 消息线程为:RxComputationThreadPool-1-1
 * 完成 == 完成线程为:RxComputationThreadPool-1
 *
 */
private void testJoin() {
    Observable<Integer> observableA = Observable.range(1, 2).delay(1, TimeUnit.SECONDS);
    Observable<Integer> observableB = Observable.range(7, 2);
    observableA.join(observableB, new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            Log.d(TAG, "observableA==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
            return Observable.just(integer).delay(1, TimeUnit.SECONDS);
        }
    }, new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            Log.d(TAG, "observableB==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
            return Observable.just(integer).delay(1, TimeUnit.SECONDS);
        }
    }, new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            Log.d(TAG, "observableA ==" + integer + "observableB ==" + integer2 + "== 消息线程为:" + Thread.currentThread().getName());
            return integer + integer2;
        }
    }).subscribe(mObserver);
}

merge 示例

/**
 * 将多个Observables的输出合并,就好像它们是一个单个的Observable一样,合并的Observable发射的数据可能会交错(顺序发生变化),
 * 有一个类似的操作符 Concat 不会让数据交错,能够保持数据的顺序性。
 * 在此过程中任何一个原始Observable的onError通知都会被立即传递给观察者,而且会终止合并后的Observable。
 * 
 * 结果如下;
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==5 == 消息线程为:main
 * 收到消息==6 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testMerge() {
    Observable<Integer> observableA = Observable.range(1, 2);
    Observable<Integer> observableB = Observable.range(5, 2);
    Observable.merge(observableA, observableB).subscribe(mObserver);
}

mergeDelayError 示例

/**
 * 将多个Observables的输出合并,就好像它们是一个单个的Observable一样,合并的Observable发射的数据可能会交错(顺序发生变化),
 * 有一个类似的操作符 concatDelayError 不会让数据交错,能够保持数据的顺序性。
 * 但是它会保留 onError 通知直到合并后的 Observable 所有的数据发射完成,在那时它才会把 onError 传递给观察者。
 * 
 * 结果如下;
 * 
 * 收到消息==100 == 消息线程为:main
 * 收到消息==0 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 错误:error == 错误线程为:main
 */
private void testMergeDelayError() {
    Observable<Integer> observableA = Observable.range(1, 2);
    Observable<Integer> observableB = Observable.range(5, 2);
    Observable.merge(observableA, observableB).subscribe(mObserver);
}

zip 示例

/**
 * 使用一个函数按顺序结合多个 Observable 发射的数据项,然后发射这个函数返回的结果。
 * 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据,假如两个Observable数据分布为4项,5项,则最终合并是4项
 * 
 * 结果如下;
 * 
 * 收到消息==observableA: 1 ==== observableB: 7 == 消息线程为:main
 * 收到消息==observableA: 2 ==== observableB: 8 == 消息线程为:main
 */
private void testZip() {
    Observable<Integer> observableA = Observable.range(1, 2);   // 1 2
    Observable<Integer> observableB = Observable.range(7, 3);   // 7 8 9
    Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer integer, Integer integer2) throws Exception {
            return "observableA: " + integer + " ==== " + "observableB: " + integer2;
        }
    }).subscribe(mObserver);
}

startWith 示例

/**
 * startWith 在发射数据之前先发射一个指定的数据序列 (头添加),最后发射的数据永远排在最前面
 * concatWith 在发射数据后面发射一个指定的数据序列 (尾添加)
 * 
 * 结果如下;
 * 
 * 收到消息==15 == 消息线程为:main
 * 收到消息==500 == 消息线程为:main
 * 收到消息==100 == 消息线程为:main
 * 收到消息==200 == 消息线程为:main
 * 收到消息==0 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testStartWith() {
    Observable.range(0, 3).startWithArray(100, 200).startWith(500)
            .startWith(Observable.range(15, 1)).subscribe(mObserver);
}

ConnectableObservable

ConnectableObservable 继承自 Observable ,然而它并不是在调用 subscribe() 的时候发射数据,而是只有对其使用 connect 操作符它才会发射数据,所以可以用来更灵活地控制数据发射的时机,另外,ConnectableObservableHot Observable

publish

 /**
 * 发射的数据是共享的 ,默认在 Computation 调度器上
 * 
 * 结果如下;
 * 
 * 第1个订阅:0->time:08:04:22 == 消息线程为:main
 * 第1个订阅:1->time:08:04:22 == 消息线程为:main
 * 第1个订阅:2->time:08:04:23 == 消息线程为:main
 * 第1个订阅:3->time:08:04:24 == 消息线程为:main
 * 第2个订阅:3->time:08:04:24 == 消息线程为:RxComputationThreadPool-1
 * 第1个订阅:4->time:08:04:25 == 消息线程为:main
 * 第2个订阅:4->time:08:04:25 == 消息线程为:RxComputationThreadPool-1
 * 第1个订阅:5->time:08:04:26 == 消息线程为:main
 * 第2个订阅:5->time:08:04:26 == 消息线程为:RxComputationThreadPool-1
 * 
 */
private void testPublish() {
    final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss", Locale.CANADA);
    Observable<Long> oldObservable = Observable.interval(1, TimeUnit.SECONDS).take(6);

    // 转换成 ConnectableObservable
    ConnectableObservable<Long> connectableObservable = oldObservable.publish();

    // 可以在 subscribe 之前先调用 connect ,等到 subscribe 之后就会马上发射数据了
    connectableObservable.connect();
    Disposable disposable = connectableObservable.subscribeOn(Schedulers.io()).
            observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "第1个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

    // 延迟3秒再订阅
    connectableObservable.delaySubscription(3, TimeUnit.SECONDS).doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "解除订阅了" + " == 解除订阅的线程为:" + Thread.currentThread().getName());
        }
    }).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            addDispose(d);
        }

        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "第2个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });


    // 调用 connect 才会发射数据,subscribe 是不会发射数据的,可以更加灵活的控制发射时机,ConnectableObservable 是 Hot Observable
    // connectableObservable.connect();

    addDispose(disposable);
}

replay

 /**
 * 跟 publish 类似,也是转换成 ConnectableObservable ,但是观察者无论何时开始订阅,都能收到所有的数据(变成了 Cold ?)
 * <p>
 * 第1个订阅:0->time:12:28:31 == 消息线程为:main
 * 第1个订阅:1->time:12:28:32 == 消息线程为:main
 * 第1个订阅:2->time:12:28:33 == 消息线程为:main
 * 第2个订阅:0->time:12:28:33 == 消息线程为:RxComputationThreadPool-2
 * 第2个订阅:1->time:12:28:33 == 消息线程为:RxComputationThreadPool-2
 * 第2个订阅:2->time:12:28:33 == 消息线程为:RxComputationThreadPool-2
 * 第1个订阅:3->time:12:28:34 == 消息线程为:main
 * 第2个订阅:3->time:12:28:34 == 消息线程为:RxComputationThreadPool-1
 * 第1个订阅:4->time:12:28:35 == 消息线程为:main
 * 第2个订阅:4->time:12:28:35 == 消息线程为:RxComputationThreadPool-1
 * 第2个订阅:5->time:12:28:36 == 消息线程为:RxComputationThreadPool-1
 * 第1个订阅:5->time:12:28:36 == 消息线程为:main
 */
private void testReplay() {
    final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss", Locale.CANADA);
    Observable<Long> oldObservable = Observable.interval(1, TimeUnit.SECONDS).take(6);

    // 转换成 ConnectableObservable
    // 最好先限定缓存的大小,否则缓存的数据会占用很大的内存,可能会造成OOM
    ConnectableObservable<Long> connectableObservable = oldObservable.replay(100);

    // 可以在 subscribe 之前先调用 connect ,等到 subscribe 之后就会马上发射数据了
    connectableObservable.connect();
    Disposable disposable = connectableObservable.subscribeOn(Schedulers.io()).
            observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "第1个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

    // 延迟3秒再订阅
    Disposable disposable2 = connectableObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "第2个订阅:" + aLong + "->time:" + format.format(new Date()) + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

    addDispose(disposable, disposable2);
}

refCount

refCount 操作符是将 ConnectableObservable 转换成普通的 Observable ,同时又保持了Hot Observable 的特征。特别注意的是,要所有的订阅者都取消订阅后,在重新订阅时数据流才会从头开始发射,如果只有部分订阅者取消,则取消的订阅者或新的订阅者再次订阅的时候,不会再头开始发射,从当前数据发射并共享。

/**
 * 调用了 refCount 操作符之后,变成普通的 Observable ,不再需要调用 connect 来触发数据发射
 * 
 * 结果如下;
 * 
 * RefCountObservable第1个订阅:0->time:08:19:33
 * RefCountObservable第1个订阅:1->time:08:19:34
 * RefCountObservable第1个订阅:2->time:08:19:35
 * RefCountObservable第1个订阅:3->time:08:19:36
 * RefCountObservable第2个订阅:3->time:08:19:36
 * RefCountObservable第1个订阅:4->time:08:19:37
 * RefCountObservable第2个订阅:4->time:08:19:37
 * RefCountObservable第1个订阅:5->time:08:19:38
 * RefCountObservable第2个订阅:5->time:08:19:38
 */
 private void testRefCount() {
    final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss", Locale.CANADA);
    Observable<Long> oldObservable = Observable.interval(1, TimeUnit.SECONDS).take(6);

    // 转换成 ConnectableObservable
    ConnectableObservable<Long> connectableObservable = oldObservable.publish();

    // 转换成普通的 Observable,但是保留了 Hot 的特征,
    // 要所有的订阅者都取消订阅后,在重新订阅时数据流才会从头开始发射
    // 如果只有部分订阅者取消,则取消的订阅者或新的订阅者再次订阅的时候,不会再头开始发射,从当前数据发射并共享
    Observable<Long> refCountObservable = connectableObservable.refCount();

    Disposable refCountdisposable = refCountObservable.subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "RefCountObservable第1个订阅:" + aLong + "->time:" + format.format(new Date()));
        }
    });

    // 延迟3秒再订阅
    Disposable refCountdisposable2 = refCountObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "RefCountObservable第2个订阅:" + aLong + "->time:" + format.format(new Date()));
        }
    });

    addDispose(refCountdisposable, refCountdisposable2);
}