Rxjava过滤操作符

Rxjava 过滤操作符

RxJava 的过滤操作符主要包括如下内容:

  • debounce/throttleWithTimeout,过滤一段指定的时间内的数据,发射后续的数据(后续没有数据,发射最后一项数据并回调 onCompleted ),默认在 computation 调度器上执行。

  • distinct,过滤掉重复的数据,只允许还没有发射过的数据项通过。

  • elementAt,只发射第 N 项数据,传入的索引 index 从 0 开始,如果你传递的值为 5 ,那么它会发射第 6 项的数据,返回的是 MaybeObserver 。

  • filter,指定一个函数来过滤数据项,只有通过条件的数据才会被发射。

  • ofType,是 filter 操作符的一个特殊形式,它只发射指定类型的数据。

  • first/firstElement,只发射第一个数据,在没有发射任何数据时发射一个你在参数中指定的默认值。

  • last/lastElement,只发射最后一个数据,在没有发射任何数据时发射一个你在参数中指定的默认值。

  • single,发送数据是一项的话输出此项的值,若是多个数据则抛出异常执行 onError() 方法,如果没有发射任何值,可以返回重载方法传的默认值,返回的是 SingleObserver 。

  • ignoreElements,丢弃所有的正常数据,只发射错误或完成通知,是对数据源的处理,返回 CompletableObserver 对象,而 empty()是创建空的 Observable 。

  • sample/throttleLast,定期发射 Observable 最近的数据,跟 throttleLast 操作符等价,默认在 computation 调度器上执行。

  • throttleFirst,定期发射 Observable 发射的第一项数据,默认在 computation 调度器上执行 。

  • skip,跳过指定规则的某些数据项,发射剩下的数据项。

  • take/takeLast,只发射前或后 N 项数据。

示例代码

这里如果使用的是 Observer ,则默认为如下实现:

mObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
    }
};

debounce 示例

/**
 * 过滤一段指定的时间内的数据,发射后续的数据(后续没有数据,发射最后一项数据并回调 onCompleted),默认在 computation 调度器上执行
 * throttleWithTimeout 和 debounce 效果一样
 *
 * 结果如下:
 *
 *收到消息==10 == 消息线程为:main
 * 收到消息==11 == 消息线程为:main
 * 收到消息==12 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testDebounce() {
    Observable.range(1, 12).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            Thread.sleep(100 * integer);
            // 从9开始才发射一个数据,9*100 刚好少于1秒的第一个数
            // 注意:先 sleep 后发射数据和先发射数据后 sleep 的区别
            return Observable.just(integer + "");
        }
    }).debounce(1, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);
}

distinct 示例

/**
 * 过滤掉重复的数据,只允许还没有发射过的数据项通过
 * 
 * 结果如下;
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testDistinct() {
    Observable.just(1, 2, 1, 1, 2, 3).distinct().subscribe(mObserver);

    // 它只判定两个相邻的数据是否是不同的,因此只会从序列中过滤掉连续重复的数据
    Observable.just(1, 2, 1, 1, 2, 3).distinctUntilChanged().subscribe(mObserver);

    // 这个函数根据原始Observable发射的数据项产生一个 Key,然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的
    Observable.just(1, 2, 1, 1, 2, 3).distinct(new Function<Number, Boolean>() {
        @Override
        public Boolean apply(Number number) throws Exception {
            return number.intValue() % 2 == 0;
        }
    }).subscribe(mObserver);

    // 和 distinct(Func1)    一样,根据一个函数产生的Key判定两个相邻的数据项是不是不同的
    Observable.just(1, 2, 1, 1, 2, 3).distinctUntilChanged(new Function<Number, Boolean>() {
        @Override
        public Boolean apply(Number number) throws Exception {
            return number.intValue() % 2 == 0;
        }
    }).subscribe(mObserver);
}

elementAt 示例

/**
 * 只发射第N项数据,传入的 index 是个索引,从 0 开始,,如果你传递给    elementAt    的值为5,那么它会发射第 六项的数据,返回 MaybeObserver
 *
 * 结果如下:
 *
 * 完成 == 完成线程为:main
 */
private void testElementAt() {
    //  如果 Index 越界了,会回调 onComplete ,但是如果你传递一 个负数索引值,它仍然会抛出一个    IndexOutOfBoundsException    异常。
    // elementAt

    // 如果 Index 越界了,会回调 onError ,但是如果你传递一 个负数索引值,它仍然会抛出一个    IndexOutOfBoundsException    异常。
    //     elementAtOrError(int)

    //  如果索引值大于数据 项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。
    //  但是如果你传递一 个负数索引值,它仍然会抛出一个    IndexOutOfBoundsException    异常。
    // elementAt(int,T)

    Observable.range(0, 9).elementAt(11).subscribe(new MaybeObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Integer integer) {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
        }
    });
}

filter 示例

/**
 * 指定一个函数来过滤数据项,只有通过条件的数据才会被发射
 * 
 * 结果如下:
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testFilter() {
    Observable.range(1, 5).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 4;
        }
    }).subscribe(mObserver);
}

ofType 示例

/**
 * 是 filter     操作符的一个特殊形式。它过滤一个Observable只发射指定类型的数据
 *
 * 结果如下:
 *
 * 收到消息==one == 消息线程为:main
 * 收到消息==two == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testOfType() {
    Observable.just(0, "one", 6, 4, "two", 8).ofType(String.class).subscribe(mObserver);
}

first/firstElement 示例

/**
 * 只发射第一个数据
 * 
 * 结果如下:
 *
 * 收到消息==1 == 消息线程为:main
 */
private void testFirst() {
    // 只发射第一个数据,返回一个 MaybeObservable,如果有数据发射,不会再回调 onComplete ,这是 MaybeObservable 的特征
    Disposable firstElement = Observable.just(1, 2, 3).firstElement().subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });

    //  只发射第一个数据,在没有发射任何数据时发射一个你在参数中指定的默认值,返回一个 SingleObservable
    Disposable firstDefault = Observable.<Integer>empty().first(100).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });
}

single 示例

/**
 * 发送数据是一项的话输出此项的值,若是多个数据则抛出异常执行onError()方法,返回 SingleObserver
 * 如果没有发射任何值,可以用重载方法传的默认值
 *
 * 结果如下;
 *
 * 错误:Sequence contains more than one element! == 错误线程为:main
 */
private void testSingle() {
    Observable.range(0, 20).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 10;
        }
    }).single(100).subscribe(new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Integer integer) {
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
        }
    });
}

ignoreElements 示例

/**

 * 丢弃所有的正常数据,只发射错误或完成通知,是对数据源的处理,返回 CompletableObserver 对象,而 empty()是创建空的 Observable
 * 
 * 结果如下;
 * 
 * 错误:测试异常 == 错误线程为:main
 */
private void testIgnoreElements() {
    Observable.range(0, 10).concatWith(Observable.error(new Throwable("测试异常")))
            .ignoreElements().subscribe(new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
        }
    });
}

throttleFirst 示例

/**
 * 定期发射Observable发射的数据项的第一项 ,默认在 computation 调度器上执行
 *
 * 结果如下;
 *
 * 收到消息==0 == 消息线程为:main
 * 收到消息==5 == 消息线程为:main
 * 收到消息==9 == 消息线程为:main
 * 收到消息==13 == 消息线程为:main
 * 收到消息==17 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testthrottleFirst() {
    Observable.range(0, 20).flatMap(new Function<Integer, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Integer integer) throws Exception {
            Thread.sleep(500);
            return Observable.just(integer);
        }
    }).throttleFirst(2, TimeUnit.SECONDS).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);
}

skip 示例

private void testSkip() {
    //  忽略Observable发射的前N项数据,只保留之后的数据。
    Observable.interval(1, TimeUnit.SECONDS).skip(2).subscribe(mObserver);

    // 丢弃原始Observable开始的那段时间发射的数据,时长和时间单位通过参数指定
    Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS).skip(5, TimeUnit.SECONDS)
            .subscribe(mObserver);

    // 忽略原始Observable发射的后N项数据,只保留之前的数据。
    Observable.range(0, 10).skipLast(5).subscribe(mObserver);

    // 丢弃在原始Observable的最后一段时间内发射的数据
    Observable.range(0, 100).flatMap(new Function<Integer, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Integer integer) throws Exception {
            Thread.sleep(200);
            return Observable.just(integer);
        }
    }).skipLast(5, TimeUnit.SECONDS).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(mObserver);
}

take 示例

/**
 * take 只发射前面的N项数据 take(1) 等价于 first
 * takeLast 只发射后面的N项数据 takeLast(1) 等价于 last
 */
private void testTake() {
    // 只发射前面4个数据
     Observable.range(1, 5).take(2).subscribe(mObserver);
    // 只发射后面4个数据
    Observable.range(1, 10).takeLast(4).subscribe(mObserver);

    // 只获取3秒内发射的数据
    Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
            .take(3, TimeUnit.SECONDS)
            .subscribe(mObserver);
}