Rxjava创建操作符

Rxjava 创建操作符

RxJava 的创建操作符主要包括如下内容:

  • create,通过调用观察者的方法从头创建一个 Observable 。

  • defer,直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个新的 Observable 。

  • empty,创建一个不发射任何数据直接通知完成的 Observable 。

  • never,创建一个不发射数据也不终止的 Observable 。

  • error,创建一个不发射数据直接以一个错误终止的 Observable ,需要一个 Throwable 参数。

  • from,将其它种类的对象和数据类型转换为 Observable ,对于 Iterable 和数组,产生的 Observable 会发射 Iterable 或数组的每一项数据。

  • just,将单个数据转换为发射那个数据的 Observable 。

  • interval,创建一个按给定时间间隔发射整数序列的 Observable,默认在 computation 调度器上执行,操作 UI 需要线程切换。

  • timer,它在指定延迟之后发射一个零值,默认在 computation 调度器上执行,操作 UI 需要线程切换。

  • range,发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

  • Repeat,重复地发射数据,如果不指定次数,默认无限,如果传 0 ,则不发射任何数据,直接回调完成,如果传入负数,会报错。如果传入的值大于 0 ,则会再重复地发射 n-1 次,即如果传 1 ,则跟正常发射的内容一样,再重复发射 0 次。不是创建操作符。

示例代码

这里如果使用的是 Observer ,则默认为如下实现:

mObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
    }
};

create 示例

/**
 * 通过调用观察者的方法从头创建一个Observable
 * 
 * 结果如下:
 * 
 * 收到消息==测试数据 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
public void testCreateObservable() {
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            if (!emitter.isDisposed()) {
                emitter.onNext(data);
                emitter.onComplete();
                // 这里发射的数据不再接收了
                emitter.onNext("新测试数据");
                emitter.onNext("新测试数据");
            }
        }
    }).subscribe(mObserver);
}

defer 示例

private String data = "测试数据";

/**
 * 直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
 * 
 * 结果如下:
 * 
 * 收到消息==新的测试数据 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testDeferObservable() {
    Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
        @Override
        public ObservableSource<? extends String> call() throws Exception {
            return Observable.just(data);
        }
    });
    data = "新的测试数据";
    observable.subscribe(mObserver);
}

empty/never/error 示例

/**
 * 创建一个不发射任何数据直接通知完成的 Observable
 * 
 * 结果如下:
 * 
 * 完成 == 完成线程为:main
 */
private void testEmptyObservable() {
  Observable.empty().subscribe(mObserver);
}

/**
 * 创建一个不发射数据也不终止的Observable
 * 没有任何内容输出
 */
private void testNeverObservable() {
  Observable.never().subscribe(mObserver);
}

/**
 * 创建一个不发射数据直接以一个错误终止的Observable,需要一 个 Throwable 参数
 * 
 * 结果如下:
 * 
 * 错误:testErrorObservable == 错误线程为:main
 */
private void testErrorObservable() {
    Observable.error(new Throwable("testErrorObservable")).observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io()).subscribe(mObserver);
}

from 示例

/**
 * 将其它种类的对象和数据类型转换为Observable,对于 Iterable 和数组,产生的 Observable 会发射 Iterable 或数组的每一项数据
 *
 * 结果如下:
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 * 完成 == 完成线程为:main
 * 
 */
private void testFromObservable() {
    String[] array = {"1", "2", "3"};
    Observable.fromArray(array).subscribe(mObserver);
}

just 示例

/**
 * 将单个数据转换为发射那个数据的Observable
 *
 * 结果如下:
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 收到消息==3 == 消息线程为:main
 */
private void testJustObservable() {
    String[] array = {"1", "2", "3"};
    Disposable disposable = Observable.just(array).subscribe(new Consumer<String[]>() {
        @Override
        public void accept(String[] strings) throws Exception {
            for (String string : strings) {
                Log.d(TAG, "收到消息==" + string + " == 消息线程为:" + Thread.currentThread().getName());
            }
        }
    });
}

interval 示例

/**
 * 创建一个按固定时间间隔发射整数序列的Observable,默认在 computation    调度器上执行,操作UI需要线程切换
 * 
 * 结果如下:
 *
 * 收到消息==0 == 消息线程为:main
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * ......
 */
private void testIntervalObservable() {
    Observable.interval(1, 1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).
            subscribe(mObserver);
    // 按给定的初始值和数量,再按固定时间间隔发射整数序列
    Observable.intervalRange(10, 60, 0, 1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).
            subscribe(mObserver);
}

timer 示例

/**
 * 它在指定延迟之后发射一个零值,默认在 computation 调度器上执行,操作UI需要线程切换
 *
 * 结果如下(2秒后才有输出):
 *
 * 收到消息==0 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testTimerObservable() {
    Observable.timer(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(mObserver);
}

range/rangeLong 示例

/**
 * 发射一个范围内的有序整数序列,你可以指定范围的起始和长度
 * 
 * 结果如下:
 * 
 * 收到消息==5 == 消息线程为:main
 * 收到消息==6 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testRangeObservable() {
    Observable.range(5, 2).subscribe(mObserver);
    Observable.rangeLong(5, 2).subscribe(mObserver);
}

repeat 示例

/**
 * 重复地发射数据,如果不指定次数,默认无限,如果传0,则不发射任何数据,直接回调完成,如果传入负数,会报错
 * 如果传入的值大于0,则会再重复地发射 n-1 次,即如果传1,则跟正常发射的内容一样,再重复发射0次
 *
 * 结果如下:
 * 
 * 收到消息==1 == 消息线程为:main
 * 收到消息==2 == 消息线程为:main
 * 完成 == 完成线程为:main
 */
private void testRepeatObservable() {
    Observable.range(1, 2).repeat(1).subscribe(mObserver);
}