Rxjava的背压机制

Rxjava的背压机制

在 RxJava 中,会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压场景。

背压: 是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的 buffer 溢出的现象。

注意:必须要在异步的场景下才会出现,即被观察者和观察者处于不同的线程中。

Rxjava的背压策略

在 RxJava2.x 中,用 Flowable 来专门支持背压,默认队列为 128 ,并且要求所有的操作符强制支持背压

Flowable 共有 5 种背压策略:

public enum BackpressureStrategy {

    // 此策略表示,通过 Create方法创建的 Flowable没有指定背压策略,不会对通过 OnNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略
    MISSING,

    // 此策略表示,如果放入 Flowable的异步缓存池中的数据超限了,则会抛出 Missing Backpressure Exception异常
    ERROR,

    // 此策略表示, Flowable 的异步缓存池同 Observable 一样,没有固定大小,可以无限制添加数据,不会抛出 Missing Backpressure Exception异常,但会导致 OOM(Out Of Memory)
    BUFFER,

    // 此策略表示,如果 Flowable 的异步缓存池满了,则会丢掉将要放入缓存池的数据
    DROP,

    // 此策略表示,如果 Flowable 的异步缓存池满了,则会丢掉将要放入缓存池的数据,但是总能获取到最后最新的数据
    LATEST
}

没有背压处理的情况

/**
 * 演示没有背压处理的情况 ,会内存溢出
 */
private void testBackPressureExpection() {
    Disposable disposable = Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; ; i++) {
                emitter.onNext(i);
            }
        }
    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            TimeUnit.SECONDS.sleep(2);
            Log.d(TAG, "收到消息==" + integer + " == 消息线程为:" + Thread.currentThread().getName());
        }
    });
}

MissingBackpressureException 异常的情况

/**
 * 会报 MissingBackpressureException,默认缓存量为128
 */
private void testStrategyError() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

MISSING 策略的情况

/**
 * MISSING 指没有用任何策略,会报 MissingBackpressureException
 * 可以通过 onBackpressureDrop() 、onBackpressureBuffer() 等设置策略
 * 如果是系统的操作符,为了避免背压,也可以用 onBackpressureDrop() 、onBackpressureBuffer() 等设置策略
 */
private void testStrategyMissing() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.MISSING).onBackpressureBuffer().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

}

测试传入的策略

/**
 * Latest 当缓存满了会丢弃数据,但总能获取到最后最新的数据
 * Drop 当缓存满了会丢弃数据,获取的是丢弃部分数据后缓存的数据,不是最后最新的
 */
public void testStrategy(BackpressureStrategy strategy) {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 10000; i++) {
                emitter.onNext(i);
            }
        }
    }, strategy).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    request();
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

测试同步的 requested

/**
 * 测试同步的 requested ,默认为 0
 */
public void testSyncRequested() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "First requested = " + emitter.requested());
            boolean flag;
            for (int i = 0; ; i++) {
                flag = false;
                // 等于0就不发射数据了,即使策略是 Error ,也没有超出默认的缓存量
                while (emitter.requested() == 0) {
                    if (!flag) {
                        Log.d(TAG, "Oh no! I can't emit value!");
                        flag = true;
                    }
                }
                emitter.onNext(i);
                Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
            }
        }
    }, BackpressureStrategy.ERROR)
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    request();
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

测试异步的 requested

/**
 * 异步的情况下,requested 默认为128,即默认可以处理128个数据
 * 下游能够处理,即 requested !=0 时,才会发送数据
 * 如果等于0,会根据策略 drop 、latest 、buffer 、error 等来处理数据,等待下一次的 request
 * <p>
 * 注意:
 * 同步的时候,下游 request 多少,上游的 requested 就是多少
 * 异步的时候,下游请求的多少跟上游无关,上游的 requested 默认是128,每当取出96个数据时,requested 增加95
 */
public void testAsynRequested() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "First requested = " + emitter.requested());
            boolean flag;
            for (int i = 0; ; i++) {
                flag = false;
                // 等于0就不发射数据了,即使策略是 Error ,也没有超出默认的缓存量
                while (emitter.requested() == 0) {
                    if (emitter.isCancelled()) {
                        break;
                    }
                    if (!flag) {
                        Log.d(TAG, "Oh no! I can't emit value!");
                        flag = true;
                    }
                }
                emitter.onNext(i);
                Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
            }
        }
    }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}