第一版实现
第一版 RxBus 实现,完成了最基本的功能,也考虑了多线程的处理,但是没有考虑背压的情况。
/**
 * First-generation event bus: a process-wide singleton that forwards posted objects to
 * the observers subscribed at that moment.
 *
 * <p>The backing subject is wrapped with {@code toSerialized()} so that {@link #post(Object)}
 * can be invoked from more than one thread. The bus is {@code Observable}-based and applies
 * no backpressure strategy.
 */
public final class RxBus1 {
    /** Serialized publish subject carrying every posted event. */
    private final Subject<Object> mBus;

    private RxBus1() {
        mBus = PublishSubject.create().toSerialized();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton, created lazily when {@code Holder} is first loaded
     */
    public static RxBus1 get() {
        return Holder.BUS;
    }

    /**
     * Publishes an event to the bus.
     *
     * @param obj the event to publish; must not be {@code null}
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    public void post(Object obj) {
        // Reject null before it reaches the subject so a bad call cannot disturb the stream.
        if (obj == null) {
            throw new NullPointerException("obj == null");
        }
        mBus.onNext(obj);
    }

    /**
     * Returns a stream of the events that are instances of the given type.
     *
     * @param tClass the event class to filter on
     * @param <T> the event type
     * @return an observable emitting only events of type {@code T}
     */
    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    /**
     * Returns a stream of every event posted to the bus.
     *
     * @return an observable view of the bus
     */
    public Observable<Object> toObservable() {
        // hide() keeps callers from casting the result back to Subject and terminating the bus.
        return mBus.hide();
    }

    /** Initialization-on-demand holder for the singleton. */
    private static class Holder {
        private static final RxBus1 BUS = new RxBus1();
    }
}
注册事件:
// Subscribe to TestEvent on RxBus1 and display the content of each event received.
Disposable disposable = RxBus1.get()
        .toObservable(TestEvent.class)
        .compose(TransformerUtil.io_main())
        .subscribe(new Consumer<TestEvent>() {
            @Override
            public void accept(TestEvent event) throws Exception {
                mContent.setText(event.content);
            }
        });
发送事件:
RxBus1.get().post(new TestEvent("发送消息了"));
第二版实现
第二版 RxBus 实现,完成了最基本的功能,也考虑了多线程的处理,也考虑了背压的情况。但是,没有对异常进行处理,一旦遇到了异常,那么订阅者就无法再收到事件。
/**
 * Second-generation event bus: the same singleton design as the first version, but built
 * on {@code Flowable} types so that subscribers take part in Reactive Streams backpressure.
 *
 * <p>The backing processor is wrapped with {@code toSerialized()} so that
 * {@link #post(Object)} can be invoked from more than one thread.
 *
 * <p>NOTE(review): no {@code onBackpressureXxx} operator is applied here; presumably
 * callers choose a strategy themselves when a subscriber may be slow - confirm.
 */
public final class RxBus2 {
    /** Serialized publish processor carrying every posted event. */
    private final FlowableProcessor<Object> mBus;

    private RxBus2() {
        mBus = PublishProcessor.create().toSerialized();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton, created lazily when {@code Holder} is first loaded
     */
    public static RxBus2 get() {
        return Holder.BUS;
    }

    /**
     * Publishes an event to the bus.
     *
     * @param obj the event to publish; must not be {@code null}
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    public void post(Object obj) {
        // Reject null before it reaches the processor so a bad call cannot disturb the stream.
        if (obj == null) {
            throw new NullPointerException("obj == null");
        }
        mBus.onNext(obj);
    }

    /**
     * Returns a stream of the events that are instances of the given type.
     *
     * @param tClass the event class to filter on
     * @param <T> the event type
     * @return a flowable emitting only events of type {@code T}
     */
    public <T> Flowable<T> toFlowable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    /**
     * Returns a stream of every event posted to the bus.
     *
     * @return a flowable view of the bus
     */
    public Flowable<Object> toFlowable() {
        // hide() keeps callers from casting the result back to a processor and terminating the bus.
        return mBus.hide();
    }

    /**
     * Reports whether anything is currently subscribed to the bus.
     *
     * @return {@code true} if the backing processor has at least one subscriber
     */
    public boolean hasSubscribers() {
        return mBus.hasSubscribers();
    }

    /** Initialization-on-demand holder for the singleton. */
    private static class Holder {
        private static final RxBus2 BUS = new RxBus2();
    }
}
注册事件:
// Subscribe to TestEvent on RxBus2 and display the content of each event received.
Disposable disposable2 = RxBus2.get()
        .toFlowable(TestEvent.class)
        .compose(TransformerUtil.io_main_flowable())
        .subscribe(new Consumer<TestEvent>() {
            @Override
            public void accept(TestEvent event) throws Exception {
                mContent.setText(event.content);
            }
        });
发送事件:
RxBus2.get().post(new TestEvent("发送消息了"));
第三版实现
在第二版的基础上,增加了对异常的处理,这里用到了一个第三方库 https://github.com/JakeWharton/RxRelay 。RxRelay 中的各个 Relay,既是 Observable 类型,也是 Consumer 类型,它们是没有 onComplete 和 onError 的 Subject。这个版本没有背压处理,也没有粘性事件处理。
/**
 * Third-generation event bus backed by a serialized {@code PublishRelay}.
 *
 * <p>Besides exposing the raw stream, the bus offers {@code register} helpers that subscribe
 * a callback on a chosen scheduler (the Android main thread by default) and return the
 * resulting {@link Disposable}, which can be handed back to {@link #unregister(Disposable)}.
 * The bus is {@code Observable}-based (no backpressure) and has no sticky events.
 */
public final class RxBus3 {
    /** Serialized relay carrying every posted event. */
    private final Relay<Object> mBus;

    private RxBus3() {
        mBus = PublishRelay.create().toSerialized();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton, created lazily when {@code Holder} is first loaded
     */
    public static RxBus3 get() {
        return Holder.BUS;
    }

    /**
     * Publishes an event to the bus.
     *
     * @param obj the event to publish; must not be {@code null}
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    public void post(Object obj) {
        // Reject null before it reaches the relay so a bad call cannot disturb the stream.
        if (obj == null) {
            throw new NullPointerException("obj == null");
        }
        mBus.accept(obj);
    }

    /**
     * Returns a stream of the events that are instances of the given type.
     *
     * @param tClass the event class to filter on
     * @param <T> the event type
     * @return an observable emitting only events of type {@code T}
     */
    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    /**
     * Returns a stream of every event posted to the bus.
     *
     * @return an observable view of the bus
     */
    public Observable<Object> toObservable() {
        // hide() keeps callers from casting the result back to Relay and feeding it directly.
        return mBus.hide();
    }

    /**
     * Reports whether anything is currently subscribed to the bus.
     *
     * @return {@code true} if the backing relay has at least one observer
     */
    public boolean hasSubscribers() {
        return mBus.hasObservers();
    }

    /**
     * Subscribes {@code onNext} to events of {@code eventType}, delivered on {@code scheduler}.
     * No error callback is installed by this overload.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Subscribes the full set of callbacks to events of {@code eventType} on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param onSubscribe callback invoked with the {@link Disposable} when subscribed
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes next/error/complete callbacks to events of {@code eventType} on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Subscribes next/error callbacks to events of {@code eventType} on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    /**
     * Main-thread variant of {@link #register(Class, Scheduler, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Main-thread variant of
     * {@link #register(Class, Scheduler, Consumer, Consumer, Action, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param onSubscribe callback invoked with the {@link Disposable} when subscribed
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Main-thread variant of {@link #register(Class, Scheduler, Consumer, Consumer, Action)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete);
    }

    /**
     * Main-thread variant of {@link #register(Class, Scheduler, Consumer, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError);
    }

    /**
     * Disposes a subscription previously returned by one of the {@code register} methods.
     *
     * @param disposable the handle to dispose; {@code null} and already-disposed handles
     *     are ignored
     */
    public void unregister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    /** Initialization-on-demand holder for the singleton. */
    private static class Holder {
        private static final RxBus3 BUS = new RxBus3();
    }
}
注册事件:
// Event callback that fails on purpose to demonstrate the error path.
Consumer<TestEvent> failingHandler = new Consumer<TestEvent>() {
    @Override
    public void accept(TestEvent event) throws Exception {
        // Deliberate NullPointerException; it does not crash the app.
        String missing = null;
        System.out.println(missing.substring(0));
    }
};
// Error callback: the failure above is delivered here, which helps reduce app crashes.
// NOTE(review): RxJava 2 presumably disposes this subscription once the error is
// delivered, so later events would no longer arrive - confirm.
Consumer<Throwable> errorHandler = new Consumer<Throwable>() {
    @Override
    public void accept(Throwable error) throws Exception {
        mContent.setText(error.getMessage());
    }
};
Disposable disposable3 = RxBus3.get().register(TestEvent.class, failingHandler, errorHandler);
发送事件:
// Publish from a freshly started thread; the sending thread's name is prepended to the message.
Runnable sender = new Runnable() {
    @Override
    public void run() {
        RxBus3 bus3 = RxBus3.get();
        String text = Thread.currentThread().getName() + "发送消息了3";
        bus3.post(new TestEvent(text));
    }
};
new Thread(sender).start();
第四版实现
在第三版的基础上,增加了粘性事件处理。
/**
 * Fourth-generation event bus: a serialized {@code PublishRelay} plus sticky events.
 *
 * <p>A sticky event is remembered per concrete event class (only the most recent one is
 * kept) and is replayed to subscribers that register through the sticky APIs after it was
 * posted. Sticky events stay stored until removed with {@link #removeStickyEvent(Class)}
 * or {@link #removeAllStickyEvents()}.
 */
public final class RxBus4 {
    /** Serialized relay carrying every posted event. */
    private final Relay<Object> mBus;

    /** Latest sticky event per concrete event class. */
    private final Map<Class<?>, Object> mStickyEventMap;

    private RxBus4() {
        mBus = PublishRelay.create().toSerialized();
        // Each access below is a single map operation, which ConcurrentHashMap already
        // performs atomically, so no additional locking is used.
        mStickyEventMap = new ConcurrentHashMap<>();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton, created lazily when {@code Holder} is first loaded
     */
    public static RxBus4 get() {
        return Holder.BUS;
    }

    /**
     * Publishes an event to the bus.
     *
     * @param obj the event to publish; must not be {@code null}
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    public void post(Object obj) {
        // Reject null before it reaches the relay so a bad call cannot disturb the stream.
        if (obj == null) {
            throw new NullPointerException("obj == null");
        }
        mBus.accept(obj);
    }

    /**
     * Stores the event as the sticky event for its runtime class, then publishes it.
     *
     * @param event the event to store and publish; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public void postSticky(Object event) {
        if (event == null) {
            throw new NullPointerException("event == null");
        }
        mStickyEventMap.put(event.getClass(), event);
        mBus.accept(event);
    }

    /**
     * Returns a stream of the events that are instances of the given type.
     *
     * @param eventType the event class to filter on
     * @param <T> the event type
     * @return an observable emitting only events of type {@code T}
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return mBus.ofType(eventType);
    }

    /**
     * Returns a stream of events of the given type that additionally emits the sticky event
     * stored for exactly {@code eventType}, if one exists when this method is called.
     *
     * @param eventType the event class to filter on and to look up the sticky event by
     * @param <T> the event type
     * @return the live stream, merged with the stored sticky event when present
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        Observable<T> liveEvents = mBus.ofType(eventType);
        Object sticky = mStickyEventMap.get(eventType);
        if (sticky == null) {
            return liveEvents;
        }
        // Merge rather than prepend: the relay is subscribed first, then the sticky event is emitted.
        return liveEvents.mergeWith(Observable.just(eventType.cast(sticky)));
    }

    /**
     * Returns a stream of every event posted to the bus.
     *
     * @return an observable view of the bus
     */
    public Observable<Object> toObservable() {
        // hide() keeps callers from casting the result back to Relay and feeding it directly.
        return mBus.hide();
    }

    /**
     * Reports whether anything is currently subscribed to the bus.
     *
     * @return {@code true} if the backing relay has at least one observer
     */
    public boolean hasSubscribers() {
        return mBus.hasObservers();
    }

    /**
     * Subscribes {@code onNext} to events of {@code eventType}, delivered on {@code scheduler}.
     * No error callback is installed by this overload.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Subscribes the full set of callbacks to events of {@code eventType} on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param onSubscribe callback invoked with the {@link Disposable} when subscribed
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes next/error/complete callbacks to events of {@code eventType} on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Subscribes next/error callbacks to events of {@code eventType} on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    /**
     * Main-thread variant of {@link #register(Class, Scheduler, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Main-thread variant of
     * {@link #register(Class, Scheduler, Consumer, Consumer, Action, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param onSubscribe callback invoked with the {@link Disposable} when subscribed
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Main-thread variant of {@link #register(Class, Scheduler, Consumer, Consumer, Action)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param onComplete callback invoked on completion
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError, Action onComplete) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete);
    }

    /**
     * Main-thread variant of {@link #register(Class, Scheduler, Consumer, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for each matching event
     * @param onError callback invoked with the failure passed to the subscriber
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError);
    }

    /**
     * Subscribes {@code onNext} to the sticky stream of {@code eventType} on {@code scheduler}.
     * No error callback is installed by this overload.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are observed
     * @param onNext callback invoked for the stored sticky event (if any) and later events
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable registerSticky(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext) {
        return toObservableSticky(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Main-thread variant of {@link #registerSticky(Class, Scheduler, Consumer)}.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for the stored sticky event (if any) and later events
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable registerSticky(Class<T> eventType, Consumer<? super T> onNext) {
        return registerSticky(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Subscribes next/error callbacks to the sticky stream of {@code eventType} on the
     * Android main thread.
     *
     * @param eventType the event class to listen for
     * @param onNext callback invoked for the stored sticky event (if any) and later events
     * @param onError callback invoked with the failure passed to the subscriber
     * @param <T> the event type
     * @return the subscription handle
     */
    public <T> Disposable registerSticky(Class<T> eventType, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);
    }

    /**
     * Removes and returns the sticky event stored for {@code eventType}.
     *
     * @param eventType the class whose sticky event should be removed
     * @param <T> the event type
     * @return the removed event, or {@code null} if none was stored
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        return eventType.cast(mStickyEventMap.remove(eventType));
    }

    /** Discards every stored sticky event. */
    public void removeAllStickyEvents() {
        mStickyEventMap.clear();
    }

    /**
     * Disposes a subscription previously returned by one of the {@code register} methods.
     *
     * @param disposable the handle to dispose; {@code null} and already-disposed handles
     *     are ignored
     */
    public void unregister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    /** Initialization-on-demand holder for the singleton. */
    private static class Holder {
        private static final RxBus4 BUS = new RxBus4();
    }
}
注册事件:
// Register for TestEvent including any stored sticky event; show its content when it arrives.
Consumer<TestEvent> showContent = new Consumer<TestEvent>() {
    @Override
    public void accept(TestEvent event) throws Exception {
        mContent.setText(event.content);
    }
};
Disposable disposable4 = RxBus4.get().registerSticky(TestEvent.class, showContent);
发送事件:
RxBus4.get().postSticky(new TestEvent("这是粘性事件"));