使用 RxJava 实现 EventBus

第一版实现

第一版 RxBus 实现,完成了最基本的功能,也考虑了多线程的处理,但是没有考虑背压的情况。

/**
 * First-generation event bus built on a serialized {@code PublishSubject}.
 *
 * <p>Events are pushed with {@link #post(Object)} and observed through
 * {@link #toObservable(Class)} or {@link #toObservable()}. The single shared
 * instance is obtained with {@link #get()}.
 */
public class RxBus1 {

    /** Serialized subject through which every posted event flows. */
    private final Subject<Object> subject = PublishSubject.create().toSerialized();

    /** Not instantiable from outside; use {@link #get()}. */
    private RxBus1() {
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton {@code RxBus1}
     */
    public static RxBus1 get() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * Emits an event to the underlying subject.
     *
     * @param obj the event to publish
     */
    public void post(Object obj) {
        subject.onNext(obj);
    }

    /**
     * Returns a stream restricted to events of the given type.
     *
     * @param tClass the event class to filter on
     * @param <T>    the event type
     * @return an observable of the events that are instances of {@code tClass}
     */
    public <T> Observable<T> toObservable(Class<T> tClass) {
        return subject.ofType(tClass);
    }

    /**
     * Returns the unfiltered event stream.
     *
     * @return the underlying subject, typed as an {@code Observable<Object>}
     */
    public Observable<Object> toObservable() {
        return subject;
    }

    /** Initialization-on-demand holder for the singleton instance. */
    private static class SingletonHolder {
        private static final RxBus1 INSTANCE = new RxBus1();
    }
}

注册事件:

// Subscribe to TestEvent on the bus. TransformerUtil.io_main() is applied to the
// stream before subscribing (presumably it configures the threads used - confirm
// against TransformerUtil).
Disposable disposable = RxBus1.get()
        .toObservable(TestEvent.class)
        .compose(TransformerUtil.io_main())
        .subscribe(new Consumer<TestEvent>() {
            @Override
            public void accept(TestEvent event) throws Exception {
                // Display the content carried by the event.
                mContent.setText(event.content);
            }
        });

发送事件:

// Publish a TestEvent on the shared RxBus1 instance.
RxBus1.get().post(new TestEvent("发送消息了"));

第二版实现

第二版 RxBus 实现,完成了最基本的功能,考虑了多线程的处理,也考虑了背压的情况。但是没有对异常进行处理:一旦遇到了异常,订阅者就无法再收到事件。

/**
 * Second-generation event bus built on a serialized {@code PublishProcessor},
 * exposing the events as {@code Flowable} streams.
 *
 * <p>NOTE(review): a {@code PublishProcessor} is not known to buffer for slow
 * subscribers by itself; subscribers probably still need an
 * {@code onBackpressureXxx} operator - confirm against the RxJava documentation.
 */
public class RxBus2 {

    /** Serialized processor through which every posted event flows. */
    private final FlowableProcessor<Object> processor = PublishProcessor.create().toSerialized();

    /** Not instantiable from outside; use {@link #get()}. */
    private RxBus2() {
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton {@code RxBus2}
     */
    public static RxBus2 get() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * Emits an event to the underlying processor.
     *
     * @param obj the event to publish
     */
    public void post(Object obj) {
        processor.onNext(obj);
    }

    /**
     * Returns a stream restricted to events of the given type.
     *
     * @param tClass the event class to filter on
     * @param <T>    the event type
     * @return a flowable of the events that are instances of {@code tClass}
     */
    public <T> Flowable<T> toFlowable(Class<T> tClass) {
        return processor.ofType(tClass);
    }

    /**
     * Returns the unfiltered event stream.
     *
     * @return the underlying processor, typed as a {@code Flowable<Object>}
     */
    public Flowable<Object> toFlowable() {
        return processor;
    }

    /**
     * Reports whether the underlying processor currently has subscribers.
     *
     * @return {@code true} if at least one subscriber is attached
     */
    public boolean hasSubscribers() {
        return processor.hasSubscribers();
    }

    /** Initialization-on-demand holder for the singleton instance. */
    private static class SingletonHolder {
        private static final RxBus2 INSTANCE = new RxBus2();
    }
}

注册事件:

// Subscribe to TestEvent on the Flowable-based bus. TransformerUtil.io_main_flowable()
// is applied to the stream before subscribing (presumably it configures the threads
// used - confirm against TransformerUtil).
Disposable disposable2 = RxBus2.get()
        .toFlowable(TestEvent.class)
        .compose(TransformerUtil.io_main_flowable())
        .subscribe(new Consumer<TestEvent>() {
            @Override
            public void accept(TestEvent event) throws Exception {
                // Display the content carried by the event.
                mContent.setText(event.content);
            }
        });

发送事件:

// Publish a TestEvent on the shared RxBus2 instance.
RxBus2.get().post(new TestEvent("发送消息了"));

第三版实现

在第二版的基础上,增加了对异常的处理,这里用到了一个第三方库 https://github.com/JakeWharton/RxRelay 。RxRelay 中的各个 Relay 既是 Observable 类型,也是 Consumer 类型,它们是没有 onComplete 和 onError 的 Subject。这个版本没有背压处理,也没有粘性事件处理。

/**
 * Third-generation event bus built on a serialized {@code PublishRelay}.
 *
 * <p>Besides exposing the raw event stream, the bus offers {@code register}
 * overloads that subscribe a callback on a chosen {@code Scheduler} (or on the
 * Android main thread when none is given) and return the {@code Disposable}
 * needed to {@link #unregister(Disposable)} later.
 *
 * <p>All callback parameters are fully parameterized (no raw {@code Consumer}),
 * so a wrongly typed error or subscription callback is rejected at compile time
 * instead of failing with a {@code ClassCastException} at runtime.
 */
public class RxBus3 {

    /** Serialized relay through which every posted event flows. */
    private final Relay<Object> mBus;

    /** Not instantiable from outside; use {@link #get()}. */
    private RxBus3() {
        mBus = PublishRelay.create().toSerialized();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton {@code RxBus3}
     */
    public static RxBus3 get() {
        return Holder.BUS;
    }

    /**
     * Hands an event to the underlying relay.
     *
     * @param obj the event to publish
     */
    public void post(Object obj) {
        mBus.accept(obj);
    }

    /**
     * Returns a stream restricted to events of the given type.
     *
     * @param tClass the event class to filter on
     * @param <T>    the event type
     * @return an observable of the events that are instances of {@code tClass}
     */
    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    /**
     * Returns the unfiltered event stream.
     *
     * @return the underlying relay, typed as an {@code Observable<Object>}
     */
    public Observable<Object> toObservable() {
        return mBus;
    }

    /**
     * Reports whether the underlying relay currently has observers.
     *
     * @return {@code true} if at least one observer is attached
     */
    public boolean hasSubscribers() {
        return mBus.hasObservers();
    }

    /**
     * Subscribes {@code onNext} to events of {@code eventType}, observed on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are delivered
     * @param onNext    callback invoked for every matching event
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Subscribes all four callbacks to events of {@code eventType}, observed on {@code scheduler}.
     *
     * @param eventType   the event class to listen for
     * @param scheduler   the scheduler on which callbacks are delivered
     * @param onNext      callback invoked for every matching event
     * @param onError     callback invoked with the error signalled to the subscription
     * @param onComplete  callback invoked on completion
     * @param onSubscribe callback invoked with the {@code Disposable} when subscribing
     * @param <T>         the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete,
                                   Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes {@code onNext}, {@code onError} and {@code onComplete} to events of
     * {@code eventType}, observed on {@code scheduler}.
     *
     * @param eventType  the event class to listen for
     * @param scheduler  the scheduler on which callbacks are delivered
     * @param onNext     callback invoked for every matching event
     * @param onError    callback invoked with the error signalled to the subscription
     * @param onComplete callback invoked on completion
     * @param <T>        the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Subscribes {@code onNext} and {@code onError} to events of {@code eventType},
     * observed on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are delivered
     * @param onNext    callback invoked for every matching event
     * @param onError   callback invoked with the error signalled to the subscription
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer)} with callbacks delivered
     * on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType the event class to listen for
     * @param onNext    callback invoked for every matching event
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer, Consumer, Action, Consumer)}
     * with callbacks delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType   the event class to listen for
     * @param onNext      callback invoked for every matching event
     * @param onError     callback invoked with the error signalled to the subscription
     * @param onComplete  callback invoked on completion
     * @param onSubscribe callback invoked with the {@code Disposable} when subscribing
     * @param <T>         the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete,
                                   Consumer<? super Disposable> onSubscribe) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer, Consumer, Action)} with
     * callbacks delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType  the event class to listen for
     * @param onNext     callback invoked for every matching event
     * @param onError    callback invoked with the error signalled to the subscription
     * @param onComplete callback invoked on completion
     * @param <T>        the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer, Consumer)} with callbacks
     * delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType the event class to listen for
     * @param onNext    callback invoked for every matching event
     * @param onError   callback invoked with the error signalled to the subscription
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError);
    }

    /**
     * Disposes a subscription previously returned by one of the {@code register} methods.
     * A {@code null} or already disposed handle is ignored.
     *
     * @param disposable the subscription handle, may be {@code null}
     */
    public void unregister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    /** Initialization-on-demand holder for the singleton instance. */
    private static class Holder {
        private static final RxBus3 BUS = new RxBus3();
    }
}

注册事件:

// Register for TestEvent with both an onNext and an onError callback.
Disposable disposable3 = RxBus3.get().register(
        TestEvent.class,
        new Consumer<TestEvent>() {
            @Override
            public void accept(TestEvent event) throws Exception {
                // Deliberate null dereference; it does not crash the app.
                String missing = null;
                System.out.println(missing.substring(0));
            }
        },
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable error) throws Exception {
                // The exception is delivered here instead, which helps reduce app crashes.
                mContent.setText(error.getMessage());
            }
        });

发送事件:

// Post the event from a freshly started thread; the message is prefixed with that thread's name.
new Thread(new Runnable() {
    @Override
    public void run() {
        String senderName = Thread.currentThread().getName();
        RxBus3.get().post(new TestEvent(senderName + "发送消息了3"));
    }
}).start();

第四版实现

在第三版的基础上,增加了粘性事件处理。

/**
 * Fourth-generation event bus: a serialized {@code PublishRelay} plus a map of
 * "sticky" events, i.e. the last event posted via {@link #postSticky(Object)}
 * for each concrete event class.
 *
 * <p>Sticky subscribers obtained through {@link #toObservableSticky(Class)} or the
 * {@code registerSticky} methods receive the stored event (if any) in addition to
 * the live stream.
 *
 * <p>All callback parameters are fully parameterized (no raw {@code Consumer}),
 * so a wrongly typed error or subscription callback is rejected at compile time
 * instead of failing with a {@code ClassCastException} at runtime.
 */
public class RxBus4 {

    /** Serialized relay through which every posted event flows. */
    private final Relay<Object> mBus;

    /** Last sticky event per concrete event class; accesses are also guarded by its monitor. */
    private final Map<Class<?>, Object> mStickyEventMap;

    /** Not instantiable from outside; use {@link #get()}. */
    private RxBus4() {
        mBus = PublishRelay.create().toSerialized();
        mStickyEventMap = new ConcurrentHashMap<>();
    }

    /**
     * Returns the shared bus instance.
     *
     * @return the singleton {@code RxBus4}
     */
    public static RxBus4 get() {
        return Holder.BUS;
    }

    /**
     * Hands an event to the underlying relay without storing it.
     *
     * @param obj the event to publish
     */
    public void post(Object obj) {
        mBus.accept(obj);
    }

    /**
     * Stores the event as the sticky event for its runtime class (replacing any
     * previous one) and then publishes it on the relay.
     *
     * @param event the event to store and publish
     */
    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        mBus.accept(event);
    }

    /**
     * Returns a stream restricted to live events of the given type.
     *
     * @param eventType the event class to filter on
     * @param <T>       the event type
     * @return an observable of the events that are instances of {@code eventType}
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return mBus.ofType(eventType);
    }

    /**
     * Returns the live stream for {@code eventType}, merged with the sticky event
     * stored under exactly that class, if one exists.
     *
     * <p>The sticky event is looked up when this method is called, not when the
     * returned observable is subscribed to.
     *
     * @param eventType the event class to filter on and to look up in the sticky map
     * @param <T>       the event type
     * @return the live stream, merged with the stored sticky event when present
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = mBus.ofType(eventType);
            Object event = mStickyEventMap.get(eventType);
            if (event == null) {
                return observable;
            }
            // Merge the live stream with a one-item source carrying the stored event.
            // Observable.just replaces a hand-written emitter that never completed.
            return observable.mergeWith(Observable.just(eventType.cast(event)));
        }
    }

    /**
     * Returns the unfiltered live event stream.
     *
     * @return the underlying relay, typed as an {@code Observable<Object>}
     */
    public Observable<Object> toObservable() {
        return mBus;
    }

    /**
     * Reports whether the underlying relay currently has observers.
     *
     * @return {@code true} if at least one observer is attached
     */
    public boolean hasSubscribers() {
        return mBus.hasObservers();
    }

    /**
     * Subscribes {@code onNext} to events of {@code eventType}, observed on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are delivered
     * @param onNext    callback invoked for every matching event
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Subscribes all four callbacks to events of {@code eventType}, observed on {@code scheduler}.
     *
     * @param eventType   the event class to listen for
     * @param scheduler   the scheduler on which callbacks are delivered
     * @param onNext      callback invoked for every matching event
     * @param onError     callback invoked with the error signalled to the subscription
     * @param onComplete  callback invoked on completion
     * @param onSubscribe callback invoked with the {@code Disposable} when subscribing
     * @param <T>         the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete,
                                   Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes {@code onNext}, {@code onError} and {@code onComplete} to events of
     * {@code eventType}, observed on {@code scheduler}.
     *
     * @param eventType  the event class to listen for
     * @param scheduler  the scheduler on which callbacks are delivered
     * @param onNext     callback invoked for every matching event
     * @param onError    callback invoked with the error signalled to the subscription
     * @param onComplete callback invoked on completion
     * @param <T>        the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Subscribes {@code onNext} and {@code onError} to events of {@code eventType},
     * observed on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are delivered
     * @param onNext    callback invoked for every matching event
     * @param onError   callback invoked with the error signalled to the subscription
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer)} with callbacks delivered
     * on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType the event class to listen for
     * @param onNext    callback invoked for every matching event
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer, Consumer, Action, Consumer)}
     * with callbacks delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType   the event class to listen for
     * @param onNext      callback invoked for every matching event
     * @param onError     callback invoked with the error signalled to the subscription
     * @param onComplete  callback invoked on completion
     * @param onSubscribe callback invoked with the {@code Disposable} when subscribing
     * @param <T>         the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete,
                                   Consumer<? super Disposable> onSubscribe) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer, Consumer, Action)} with
     * callbacks delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType  the event class to listen for
     * @param onNext     callback invoked for every matching event
     * @param onError    callback invoked with the error signalled to the subscription
     * @param onComplete callback invoked on completion
     * @param <T>        the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError, Action onComplete) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError, onComplete);
    }

    /**
     * Same as {@link #register(Class, Scheduler, Consumer, Consumer)} with callbacks
     * delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType the event class to listen for
     * @param onNext    callback invoked for every matching event
     * @param onError   callback invoked with the error signalled to the subscription
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<? super T> onNext,
                                   Consumer<? super Throwable> onError) {
        return register(eventType, AndroidSchedulers.mainThread(), onNext, onError);
    }

    /**
     * Subscribes {@code onNext} to the sticky stream of {@code eventType}, observed
     * on {@code scheduler}.
     *
     * @param eventType the event class to listen for
     * @param scheduler the scheduler on which callbacks are delivered
     * @param onNext    callback invoked for the stored sticky event and every live event
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable registerSticky(Class<T> eventType, Scheduler scheduler, Consumer<? super T> onNext) {
        return toObservableSticky(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Same as {@link #registerSticky(Class, Scheduler, Consumer)} with callbacks
     * delivered on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType the event class to listen for
     * @param onNext    callback invoked for the stored sticky event and every live event
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable registerSticky(Class<T> eventType, Consumer<? super T> onNext) {
        return registerSticky(eventType, AndroidSchedulers.mainThread(), onNext);
    }

    /**
     * Subscribes {@code onNext} and {@code onError} to the sticky stream of
     * {@code eventType}, observed on {@code AndroidSchedulers.mainThread()}.
     *
     * @param eventType the event class to listen for
     * @param onNext    callback invoked for the stored sticky event and every live event
     * @param onError   callback invoked with the error signalled to the subscription
     * @param <T>       the event type
     * @return the subscription handle
     */
    public <T> Disposable registerSticky(Class<T> eventType, Consumer<? super T> onNext,
                                         Consumer<? super Throwable> onError) {
        return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);
    }

    /**
     * Removes and returns the sticky event stored under {@code eventType}.
     *
     * @param eventType the event class whose sticky event should be removed
     * @param <T>       the event type
     * @return the removed event, or {@code null} if none was stored
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.remove(eventType));
        }
    }

    /** Discards every stored sticky event. */
    public void removeAllStickyEvents() {
        synchronized (mStickyEventMap) {
            mStickyEventMap.clear();
        }
    }

    /**
     * Disposes a subscription previously returned by one of the register methods.
     * A {@code null} or already disposed handle is ignored.
     *
     * @param disposable the subscription handle, may be {@code null}
     */
    public void unregister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    /** Initialization-on-demand holder for the singleton instance. */
    private static class Holder {
        private static final RxBus4 BUS = new RxBus4();
    }
}

注册事件:

// Register for TestEvent, including the sticky TestEvent already stored on the bus, if any.
Disposable disposable4 = RxBus4.get().registerSticky(
        TestEvent.class,
        new Consumer<TestEvent>() {
            @Override
            public void accept(TestEvent event) throws Exception {
                // Display the content carried by the event.
                mContent.setText(event.content);
            }
        });

发送事件:

// Store the TestEvent as the sticky event for its class and publish it on the bus.
RxBus4.get().postSticky(new TestEvent("这是粘性事件"));