Rxjava变换操作符

Rxjava 变换操作符

RxJava 的变换操作符主要包括如下内容:

  • buffer,定期收集 Observable 的数据放进一个集合,然后发射这些数据集合,而不是一次发射一个。

  • flatMap,将一个发射数据的 Observable 变换为多个 Observable ,然后将它们发射的数据合并后放进一个单独的 Observable ,不保证数据源的顺序性

  • concatMap,将一个发射数据的 Observable 变换为多个 Observable ,然后将它们发射的数据合并后放进一个单独的 Observable ,保证数据源的顺序性

  • map,对 Observable 发射的每一项数据应用一个函数,执行变换操作。

  • groupBy,将 Observable 拆分为 Observable 集合,将原始 Observable 发射的数据按照 KEY 分组,每一个 Observable 发射不同的数据。

  • window,与 buffer 类似,只是它在发射之前把收集到的数据放进单独的 Observable , 而不是放进一个集合。

  • scan,原始 Observable 发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起应用这个函数来产生它自己的第二项数。

示例代码

这里如果使用的是 Observer ,则默认为如下实现:

mObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
    }
};

buffer 示例

/**
 * 定期收集Observable的数据放进一个集合,然后发射这些数据集合,而不是一次发射一个
 *
 * 结果如下;
 *
 * 收到消息==[0, 1] == 消息线程为:main
 * 收到消息==[5, 6] == 消息线程为:main
 * 收到消息==[10] == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testBuffer() {

    Observable.range(0, 11).buffer(2, 5)
            .subscribe(mObserver);

    // 定期以    List    的形式发射新的数据,每个时间段,收集来自原始 Observable的数据
    Observable.range(0, 11000).buffer(500, TimeUnit.MILLISECONDS, 10)
            .subscribe(mObserver);
}

// 默认会跳过 count 的数量,可以调用两个参数的方法来指定跳过的数量
public final Observable<List<T>> buffer(int count) {
    return buffer(count, count);
}

flatMap 示例

/**
 * 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
 * 
 * 不保证数据源的顺序性
 */
private void testFlatMap() {

    Disposable disposable = Observable.fromArray(mStudentList.toArray(new Student[mStudentList.size()])).flatMap(new Function<Student, ObservableSource<Course>>() {
        @Override
        public ObservableSource<Course> apply(Student student) throws Exception {
            return Observable.fromArray(student.mCourses.toArray(new Course[student.mCourses.size()]))
                    .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }
    }).subscribe(new Consumer<Course>() {
        @Override
        public void accept(Course course) throws Exception {
            Log.d(TAG, "flatMap:" + course.name);
        }
    });
}

map 示例

/**
 * 对Observable发射的每一项数据应用一个函数,执行变换操作
 *
 * 结果如下:
 *
 * 收到消息==false == 消息线程为:main
 * 收到消息==false == 消息线程为:main
 * 收到消息==true == 消息线程为:main
 * 收到消息==true == 消息线程为:main
 * 完成 == 完成线程为:main
 * 
 */
private void testMap() {

    Observable.range(0, 10).map(new Function<Integer, Boolean>() {

        @Override
        public Boolean apply(Integer integer) throws Exception {
            return integer > 7;
        }
    }).subscribe(mObserver);
}

groupBy 示例

/**
 * 将 Observable 拆分为 Observable 集合,将原始 Observable 发射的数据按照 KEY 分组,每一个 Observable 发射不同的数据
 * 
 * 结果如下:
 * 
 * GroupBy Key = 1
 * 收到消息==3 == 消息线程为:main
 * GroupBy Key = 0
 * 收到消息==9 == 消息线程为:main
 * 收到消息==15 == 消息线程为:main
 * 收到消息==21 == 消息线程为:main
 * 收到消息==27 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testGroupBy() {
    Disposable groupBy = Observable.range(1, 10).groupBy(new Function<Integer, Integer>() {

        @Override
        public Integer apply(Integer integer) throws Exception {
            // 根据原始数据进行分组,这里生成的是 KEY
            return integer % 2;
        }
    }, new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer integer) throws Exception {
            // 对分完组之后的数据的值进行变换处理,这里对值进行处理
            return integer * 3;
        }
    }).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
        @Override
        public void accept(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
            Log.d(TAG, "GroupBy Key = " + integerIntegerGroupedObservable.getKey());

            // 只打印奇数值
            if (integerIntegerGroupedObservable.getKey() == 1) {
                integerIntegerGroupedObservable.subscribe(mObserver);
            }
        }
    });
}

上面的示例会根据 KEY 分成两组,所以 accept 方法会回调两次,如果返回的是奇数就打印数据。

scan 示例

/**
 * 原始 Observable 发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起应用这个函数来产生它自己的第二项数
 * 
 * 结果如下:
 * 
 * 1 - 3 - 6 - 10 - 15
 */
private void testScan() {
    Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            Log.d(TAG, "apply: integer:" + integer + "  integer2 " + integer2);
            return integer + integer2;
        }
    }).subscribe(mObserver);
}

window 示例

/**
 * 与 buffer 类似,只是它在发射之前把收集到的数据放进单独的Observable, 而不是放进一个集合
 * 
 * 结果如下:
 * 
 * 收到消息==0 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * 完成 == 完成线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 * 完成 == 完成线程为:main
 * 收到消息==4 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testWindow() {
    Disposable window = Observable.range(0, 5).window(2).subscribe(new Consumer<Observable<Integer>>() {

        @Override
        public void accept(Observable<Integer> integerObservable) throws Exception {
            integerObservable.subscribe(mObserver);
        }
    });
}