Rxjava条件和布尔操作符

Rxjava 条件和布尔操作符

RxJava 的条件和布尔操作符主要包括如下内容:

  • all,判断 Observable 发射的所有数据是否都满足某个条件,返回 SingleObserver ,满足则返回 true ,否则返回 false 。

  • amb,给定两个或多个 Observable ,它只发射第一个发射数据或通知的那个 Observable 的所有数据,不管发射的是一项数据还是一个 onError 或 onCompleted 通知,Amb 将丢弃其它所有 Observable 的发射物。

  • contains,判断一个 Observable 是否发射了一个指定的值,返回 SingleObserver ,满足则返回 true ,否则返回 false 。

  • defaultIfEmpty/switchIfEmpty,发射来自原始 Observable 的值,如果原始 Observable 没有发射任何值,就发射一个默认值。

  • sequenceEqual,判断两个 Observable 是否发射相同的数据序列,返回 SingleObserver 。如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射 true ,否则发射 false 。

  • skipUntil,丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据,然后发射原始 Observable 的剩余数据。

  • skipWhile,丢弃 Observable 发射的数据,直到一个指定的条件不成立,然后发射原始 Observable 的剩余数据。

  • takeUntil,发射原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据,然后停止发射。最后一次不满足条件的数据,还是会发射出去的。

  • takeWhile,发射 Observable 发射的数据,直到一个指定的条件不成立,然后停止发射原始 Observable 的剩余数据。

示例代码

这里如果使用的是 Observer ,则默认为如下实现:

mObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
    }
};

all 示例

/**
 * 判断Observable发射的所有数据是否都满足某个条件,返回 SingleObserver,满足则返回 true ,否则返回 false
 *
 * 结果如下:
 *
 * 收到消息==false == 消息线程为:main
 */
private void testAll() {
    Observable.range(0, 5).all(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 3;
        }
    }).subscribe(new SingleObserver<Boolean>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Boolean aBoolean) {
            Log.d(TAG, "收到消息==" + aBoolean + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });
}

contains 示例

/**
 * 判断一个Observable是否发射了一个指定的值,返回 SingleObserver,满足则返回 true ,否则返回 false
 * 
 * 结果如下;
 * 
 * 收到消息==true == 消息线程为:main
 */
private void testContains() {
    // IsEmpty 用于判定原始Observable是否没有发射任何数据。
    Observable.just(1, 2, 3).contains(2).subscribe(new SingleObserver<Boolean>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Boolean aBoolean) {
            Log.d(TAG, "收到消息==" + aBoolean + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }
    });
}

sequenceEqual 示例

/**
 * 判断两个 Observable 是否发射相同的数据序列
 *
 * 如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射 false。
 *
 * 结果如下;
 *
 * 收到消息==true == 消息线程为:main
 */
private void testSequenceEqual() {
    Observable<Integer> observableA = Observable.just(1, 2, 3);
    Observable<Integer> observableB = Observable.just(1, 2, 3);
    Disposable disposable = Observable.sequenceEqual(observableA, observableB).subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.d(TAG, "收到消息==" + aBoolean + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

    // 三个参数,传递一个用于比较两个数据项是否相同的函数
    Disposable disposable2 = Observable.sequenceEqual(observableA, observableB, new BiPredicate<Integer, Integer>() {
        @Override
        public boolean test(Integer integer, Integer integer2) throws Exception {
            return integer == integer2;
        }
    }).subscribe(new Consumer<Boolean>() {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.d(TAG, "收到消息==" + aBoolean + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });
}

amb 示例

/**
 * 给定两个或多个Observables,它只发射第一个发射数据或通知的那个Observable的所有数据,
 * 不管发射的是一项数据还是一个 onError 或 onCompleted 通知,Amb 将丢弃其它所有 Observable 的发射物。
 *
 * 结果如下;
 *
 * 收到消息==4 == 消息线程为:RxComputationThreadPool-1
 * 收到消息==5 == 消息线程为:RxComputationThreadPool-1
 * 收到消息==6 == 消息线程为:RxComputationThreadPool-1
 * 完成 == 完成线程为:RxComputationThreadPool-1
 */
private void testAmb() {
    Observable<Integer> observableA = Observable.just(1, 2, 3).delay(500, TimeUnit.MILLISECONDS);
    Observable<Integer> observableB = Observable.just(4, 5, 6).delay(100, TimeUnit.MILLISECONDS);

    Observable.ambArray(observableA, observableB).subscribe(mObserver);
    // 这里等价于上面的语句
    observableA.ambWith(observableB).subscribe(mObserver);
}

defaultIfEmpty/switchIfEmpty 示例

/**
 * 发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值
 * 结果如下;
 * 
 * 收到消息==100 == 消息线程为:main
 */
private void testDefaultIfEmpty() {
    // 其实内部就是调的 switchIfEmpty 操作符并且用了just 来发射单个数据
    Observable.just(1, 2, 3).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 3;
        }
    }).defaultIfEmpty(100).subscribe(mObserver);

    // switchIfEmpty 如果原始Observable没有发射数据,它发射一个备用Observable的发射物
    Observable.just(1, 2, 3).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 3;
        }
    }).switchIfEmpty(Observable.just(100)).subscribe(mObserver);
}

skipUntil 示例

/**
 * 丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据,然后发射原始 Observable 的剩余数据
 *
 * 结果如下;
 *
 * 收到消息==4 == 消息线程为:RxComputationThreadPool-2
 * 收到消息==5 == 消息线程为:RxComputationThreadPool-2
 * 完成 == 完成线程为:RxComputationThreadPool-2
 */
private void testSkipUntil() {
    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .skipUntil(Observable.timer(3, TimeUnit.SECONDS))
            .subscribe(mObserver);
}

skipWhile 示例

/**
 * 丢弃Observable发射的数据,直到一个指定的条件不成立,然后发射原始 Observable 的剩余数据
 * 
 * 结果如下;
 * 
 * 收到消息==4 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testSkipWhile() {
    Observable.range(0, 5).skipWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 4;
        }
    }).subscribe(mObserver);
}

takeUntil 示例

/**
 * 发射原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据,然后停止发射。最后一次不满足条件的数据,还是会发射出去的。
 * 
 * 结果如下;
 * 
 * 收到消息==0 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testTakeUntil() {
    Observable.range(0, 10).takeUntil(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 2;
        }
    }).subscribe(mObserver);

    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .takeUntil(Observable.timer(3, TimeUnit.SECONDS))
            .subscribe(mObserver);
}

takeWhile 示例

/**
 * 发射Observable发射的数据,直到一个指定的条件不成立
 * 
 * 结果如下;
 * 
 * 收到消息==0 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testTakeWhile() {
    Observable.range(0, 10).takeWhile(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 2;
        }
    }).subscribe(mObserver);
}