Rxjava 创建操作符
RxJava 的创建操作符主要包括如下内容:
create,通过调用观察者的方法从头创建一个 Observable 。
defer,直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个新的 Observable 。
empty,创建一个不发射任何数据直接通知完成的 Observable 。
never,创建一个不发射数据也不终止的 Observable 。
error,创建一个不发射数据直接以一个错误终止的 Observable ,需要一个 Throwable 参数。
from,将其它种类的对象和数据类型转换为 Observable ,对于 Iterable 和数组,产生的 Observable 会发射 Iterable 或数组的每一项数据。
just,将单个数据转换为发射那个数据的 Observable 。
interval,创建一个按给定时间间隔发射整数序列的 Observable,默认在 computation 调度器上执行,操作 UI 需要线程切换。
timer,它在指定延迟之后发射一个零值,默认在 computation 调度器上执行,操作 UI 需要线程切换。
range,发射一个范围内的有序整数序列,你可以指定范围的起始和长度。
Repeat,重复地发射数据,如果不指定次数,默认无限,如果传 0 ,则不发射任何数据,直接回调完成,如果传入负数,会报错。如果传入的值大于 0 ,则会再重复地发射 n-1 次,即如果传 1 ,则跟正常发射的内容一样,再重复发射 0 次。不是创建操作符。
示例代码
这里如果使用的是 Observer ,则默认为如下实现:
mObserver = new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG, "收到消息==" + o + " == 消息线程为:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "错误:" + e.getMessage() + " == 错误线程为:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "完成" + " == 完成线程为:" + Thread.currentThread().getName());
}
};
create 示例
/**
* 通过调用观察者的方法从头创建一个Observable
*
* 结果如下:
*
* 收到消息==测试数据 == 消息线程为:main
* 完成 == 完成线程为:main
*/
public void testCreateObservable() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
if (!emitter.isDisposed()) {
emitter.onNext(data);
emitter.onComplete();
// 这里发射的数据不再接收了
emitter.onNext("新测试数据");
emitter.onNext("新测试数据");
}
}
}).subscribe(mObserver);
}
defer 示例
private String data = "测试数据";
/**
* 直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
*
* 结果如下:
*
* 收到消息==新的测试数据 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testDeferObservable() {
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(data);
}
});
data = "新的测试数据";
observable.subscribe(mObserver);
}
empty/never/error 示例
/**
* 创建一个不发射任何数据直接通知完成的 Observable
*
* 结果如下:
*
* 完成 == 完成线程为:main
*/
private void testEmptyObservable() {
Observable.empty().subscribe(mObserver);
}
/**
* 创建一个不发射数据也不终止的Observable
* 没有任何内容输出
*/
private void testNeverObservable() {
Observable.never().subscribe(mObserver);
}
/**
* 创建一个不发射数据直接以一个错误终止的Observable,需要一 个 Throwable 参数
*
* 结果如下:
*
* 错误:testErrorObservable == 错误线程为:main
*/
private void testErrorObservable() {
Observable.error(new Throwable("testErrorObservable")).observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io()).subscribe(mObserver);
}
from 示例
/**
* 将其它种类的对象和数据类型转换为Observable,对于 Iterable 和数组,产生的 Observable 会发射 Iterable 或数组的每一项数据
*
* 结果如下:
*
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 收到消息==3 == 消息线程为:main
* 完成 == 完成线程为:main
*
*/
private void testFromObservable() {
String[] array = {"1", "2", "3"};
Observable.fromArray(array).subscribe(mObserver);
}
just 示例
/**
* 将单个数据转换为发射那个数据的Observable
*
* 结果如下:
*
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 收到消息==3 == 消息线程为:main
*/
private void testJustObservable() {
String[] array = {"1", "2", "3"};
Disposable disposable = Observable.just(array).subscribe(new Consumer<String[]>() {
@Override
public void accept(String[] strings) throws Exception {
for (String string : strings) {
Log.d(TAG, "收到消息==" + string + " == 消息线程为:" + Thread.currentThread().getName());
}
}
});
}
interval 示例
/**
* 创建一个按固定时间间隔发射整数序列的Observable,默认在 computation 调度器上执行,操作UI需要线程切换
*
* 结果如下:
*
* 收到消息==0 == 消息线程为:main
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* ......
*/
private void testIntervalObservable() {
Observable.interval(1, 1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).
subscribe(mObserver);
// 按给定的初始值和数量,再按固定时间间隔发射整数序列
Observable.intervalRange(10, 60, 0, 1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).
subscribe(mObserver);
}
timer 示例
/**
* 它在指定延迟之后发射一个零值,默认在 computation 调度器上执行,操作UI需要线程切换
*
* 结果如下(2秒后才有输出):
*
* 收到消息==0 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testTimerObservable() {
Observable.timer(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(mObserver);
}
range/rangeLong 示例
/**
* 发射一个范围内的有序整数序列,你可以指定范围的起始和长度
*
* 结果如下:
*
* 收到消息==5 == 消息线程为:main
* 收到消息==6 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testRangeObservable() {
Observable.range(5, 2).subscribe(mObserver);
Observable.rangeLong(5, 2).subscribe(mObserver);
}
repeat 示例
/**
* 重复地发射数据,如果不指定次数,默认无限,如果传0,则不发射任何数据,直接回调完成,如果传入负数,会报错
* 如果传入的值大于0,则会再重复地发射 n-1 次,即如果传1,则跟正常发射的内容一样,再重复发射0次
*
* 结果如下:
*
* 收到消息==1 == 消息线程为:main
* 收到消息==2 == 消息线程为:main
* 完成 == 完成线程为:main
*/
private void testRepeatObservable() {
Observable.range(1, 2).repeat(1).subscribe(mObserver);
}