天天看點

RxJava(七)背壓Flowable

RxJava背壓Flowable

1.背壓:在RxJava的異步場景中,被觀察者發送資料太快,以至于操作符和訂閱者來不及處理資料,造成buffer溢出。

2.Flowable:RxJava中專門用來支援背壓,預設隊列大小128,所有操作符強制支援背壓。

3.背壓政策:

public enum BackpressureStrategy{
        MISSING,    //通過create建立的Flowable,需要下遊指定背壓政策
        ERROR,      //放入Flowable異步緩存池的資料超限,将抛出MissingBackpressureException
        BUFFER,     //異步緩存池無大小限制,可無限添加資料,不會抛出異常,但會OOM
        DROP,       //如果Flowable異步緩存池滿了,将抛棄将要添加的資料
        LATEST      //如果Flowable異步緩存池滿了,将抛棄将要添加的資料,但強制添加最後一條資料
    }
           
private void Flowable() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) {
                for (int i = 0; i < 10; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        },BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });
    }
           

對應的操作符:onBackpressureBuffer(); onBackpressureDrop; onBackpressureLatest();

private void Flowable() {
        Flowable.interval(1000, TimeUnit.MILLISECONDS)
                .onBackpressureBuffer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "accept: "+aLong);
            }
        });
    }
           

4.Flowable所支援的建立操作符,用法同Observable建立操作符

private void Flowable() {
        Flowable.amb();
        Flowable.ambArray();
        Flowable.create();
        Flowable.combineLatest();
        Flowable.combineLatestDelayError();
        Flowable.concat();
        Flowable.concatDelayError();
        Flowable.concatArray();
        Flowable.concatArrayDelayError();
        Flowable.defer();
        Flowable.fromArray();
        Flowable.fromIterable();
        Flowable.fromFuture();
        Flowable.fromCallable();
        Flowable.fromPublisher();
        Flowable.generate();
        Flowable.interval();
        Flowable.intervalRange();
        Flowable.just();
        Flowable.merge();
        Flowable.mergeDelayError();
        Flowable.mergeArray();
        Flowable.mergeArrayDelayError();
        Flowable.never();
        Flowable.range();
        Flowable.rangeLong();
        Flowable.sequenceEqual();
        Flowable.switchOnNext();
        Flowable.switchOnNextDelayError();
        Flowable.timer();
        Flowable.using();
        Flowable.zip();
        Flowable.zipArray();
        Flowable.zipIterable();
    }
           

5.操作符用法同Observable變換操作符

Flowable.interval(500,TimeUnit.MILLISECONDS)
        .onBackpressureBuffer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .map(new Function<Long, Long>() {
            @Override
            public Long apply(Long aLong) {
                return aLong + 1;
            }
        });