RxJava背壓Flowable
1.背壓:在RxJava的異步場景中,被觀察者發送資料太快,以至于操作符和訂閱者來不及處理資料,造成buffer溢出。
2.Flowable:RxJava中專門用來支援背壓,預設隊列大小128,所有操作符強制支援背壓。
3.背壓政策:
public enum BackpressureStrategy{
MISSING, //通過create建立的Flowable,需要下遊指定背壓政策
ERROR, //放入Flowable異步緩存池的資料超限,将抛出MissingBackpressureException
BUFFER, //異步緩存池無大小限制,可無限添加資料,不會抛出異常,但會OOM
DROP, //如果Flowable異步緩存池滿了,将抛棄将要添加的資料
LATEST //如果Flowable異步緩存池滿了,将抛棄将要添加的資料,但強制添加最後一條資料
}
private void Flowable() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
對應的操作符:onBackpressureBuffer(); onBackpressureDrop; onBackpressureLatest();
private void Flowable() {
Flowable.interval(1000, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "accept: "+aLong);
}
});
}
4.Flowable所支援的建立操作符,用法同Observable建立操作符
private void Flowable() {
Flowable.amb();
Flowable.ambArray();
Flowable.create();
Flowable.combineLatest();
Flowable.combineLatestDelayError();
Flowable.concat();
Flowable.concatDelayError();
Flowable.concatArray();
Flowable.concatArrayDelayError();
Flowable.defer();
Flowable.fromArray();
Flowable.fromIterable();
Flowable.fromFuture();
Flowable.fromCallable();
Flowable.fromPublisher();
Flowable.generate();
Flowable.interval();
Flowable.intervalRange();
Flowable.just();
Flowable.merge();
Flowable.mergeDelayError();
Flowable.mergeArray();
Flowable.mergeArrayDelayError();
Flowable.never();
Flowable.range();
Flowable.rangeLong();
Flowable.sequenceEqual();
Flowable.switchOnNext();
Flowable.switchOnNextDelayError();
Flowable.timer();
Flowable.using();
Flowable.zip();
Flowable.zipArray();
Flowable.zipIterable();
}
5.操作符用法同Observable變換操作符
Flowable.interval(500,TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) {
return aLong + 1;
}
});