作者——閑魚技術鲲鳴
RxJava是Java對于反應式程式設計的一個實作架構,是一個基于事件的、提供實作強大且優雅的異步調用程式的代碼庫。18年以來,由淘寶技術部發起的應用架構更新項目,希望通過反應式架構、全異步化的改造,提升系統整體性能和機器資源使用率,減少網絡延時,資源的重複使用,并為業務快速創新提供靈活的架構支撐。在閑魚的基礎鍊路諸如商品批量更新、訂單批量查詢等,都利用了RxJava的異步程式設計能力。
不過,RxJava是入門容易精通難,一不小心遍地坑。今天來一起看下RxJava的使用方式、基本原理、注意事項。
1.開始之前
讓我們先看下,使用RxJava之前,我們曾經寫過的回調代碼存在的痛點。
當我們的應用需要處理使用者事件、異步調用時,随着流式事件的複雜性和處理邏輯的複雜性的增加,代碼的實作難度将爆炸式增長。比如我們有時需要處理多個事件流的組合、處理事件流的異常或逾時、在事件流結束後做清理工作等,如果需要我們從零實作,勢必要小心翼翼地處理回調、監聽、并發等很多棘手問題。
還有一個被稱作“回調地獄”的問題,描述的是代碼的不可讀性。
Code 1.1
// 示例引自callbackhell.com
fs.readdir(source, function (err, files) {
if (err) {
console.log('Error finding files: ' + err)
} else {
files.forEach(function (filename, fileIndex) {
console.log(filename)
gm(source + filename).size(function (err, values) {
if (err) {
console.log('Error identifying file size: ' + err)
} else {
console.log(filename + ' : ' + values)
aspect = (values.width / values.height)
widths.forEach(function (width, widthIndex) {
height = Math.round(width / aspect)
console.log('resizing ' + filename + 'to ' + height + 'x' + height)
this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
if (err) console.log('Error writing file: ' + err)
})
}.bind(this))
}
})
})
}
})
以上js代碼有兩個明顯槽點: 1.由于傳入的層層回調方法,代碼結尾出現一大堆的 }) ; 2. 代碼書寫的順序與代碼執行的順序相反:後面出現回調函數會先于之前行的代碼先執行。
而如果使用了RxJava,我們處理回調、異常等将得心應手。
2.引入RxJava
假設現在要異步地獲得一個使用者清單,然後将結果進行處理,比如展示到ui或者寫到緩存,我們使用RxJava後代碼如下:
Code 2.1
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
List<UserDo> result = userService.getAllUser();
for (UserDo st : result) {emitter.onNext(st);}
}
});
Observable<String> map = observable.map(s -> s.toString());
// 建立訂閱關系
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o)/*更新到ui*/);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub2 = " + o)/*寫緩存*/,
e-> System.out.println("e = " + e)),
()->System.out.println("finish")));
userService.getAllUser()是一個普通的同步方法,但是我們把它包到了一個Observable中,當有結果傳回時,将user逐個發送至監聽者。第一個監聽者更新ui,第二個監聽者寫到緩存。并且當上遊發生異常時,進行列印;在事件流結束時,列印finish。
另外還可以很友善的配置上遊逾時時間、調用線程池、fallback結果等,是不是非常強大。
需要注意的是,RxJava代碼就像上面例子中看起來很容易上手,可讀性也很強,但是如果了解不充分,很容易出現意想不到的bug:初學者可能會認為,上面的代碼中,一個user清單傳回後,每個元素會被異步地發送給兩個下遊的觀察者,這兩個觀察者在各自的線程内列印結果。但事實卻不是這樣:userService.getAllUser()會被調用兩次(每當建立訂閱關系時方法getAllUser()都會被重新調用),而user清單被查詢出後,會同步的發送給兩個觀察者,觀察者也是同步地列印出每個元素。即sub1 = user1,sub1 = user2,sub1 = user3,sub2 = user1,sub2 = user2,sub2 = user3。
可見,如果沒有其他配置,RxJava預設是同步阻塞的!!!那麼,我們如何使用它的異步非阻塞能力呢,我們接着往下看。
Code 2.2
Observable
.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
Thread.sleep(1000); // imitate expensive computation
return "event";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.map(i->{
System.out.println(Thread.currentThread().getName() + "----observable map");
return i;
})
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));
System.out.println(Thread.currentThread().getName() + "----end");
Thread.sleep(2000); // <--- wait for the flow to finish. In RxJava the default Schedulers run on daemon threads
我們用Observable.fromCallable()代替code2.1中最底層的Observable.create方法,來建立了一個Observable(即被觀察者)。fromCallable方法建立的是一個lazy的Observable,隻有當有人監聽它時,傳入的代碼才被執行。(關于這一點,我們後面會講,這裡隻是為了展示有很多種建立Observable的方式)。
然後通過subscribeOn(Schedulers.io())指定了被觀察者執行的線程池。observeOn(Schedulers.single())指定了下遊觀察者(map方法實際也是一個觀察者)執行的線程池。map方法如同很多流式程式設計api一樣,将上遊的每個元素轉化成另一個元素。最後又通過observeOn(Schedulers.newThread())制定了目前下遊的觀察者,即最後的subscribe中傳入的觀察者(lambda方式)執行的線程池。
上面的代碼執行後,通過列印的線程名可以看出,被觀察者、map、觀察者均是不同的線程,并且,主線程最後的"end"會先執行,也就是實作了異步非阻塞。
3. 使用方式
本文不是RxJava的接口文檔,不會詳細介紹每個api,隻是簡單講下一些常見或者特殊api,進一步闡述RxJava的能力。
3.1 基本元件
RxJava的核心原理其實非常簡單。可類比觀察者模式。Observable是被觀察者,作為資料源産生資料。Observer是觀察者,消費上遊的資料源。
每個Observable可注冊多個Observer。但是預設情況下,每當有注冊發生時,Observable的生産方法subscribe都會被調用。如果想隻生産一次,可以調用Observable.cached方法。
被觀察者Observable還有多個變體,如Single、Flowable。Single代表隻産生一個元素的資料源。Flowable是支援背壓的資料源。通過背壓設計,下遊監聽者可以向上遊回報資訊,可以達到控制發送速率的功能。
Observable和Observer是通過裝飾器模式層層包裝達到進而串聯起來。轉換API如map等,會建立一個新的ObservableMap(基層自Observable),包裝原始的Observable作為source,而在真正執行時,先做轉換操作,再發給下遊的觀察者。
Scheduler是RxJava為多線程執行提供的支援類,它将可以将生産者或者消費者的執行邏輯包裝成一個Worker,送出到架構提供的公共線程池中,如Schedulers.io()、Schedulers.newThread()等。便于了解,可以将Schedulers類比做線程池,Worker類比做線程池中的線程。可以通過Observable.subscribeOn和Observable.observeOn分别制定被觀察者和觀察者執行的線程,來達到異步非阻塞。
RxJava核心架構圖如下:

3.2 轉換API
- map: 見Code 2.2,一對一轉換,如同很多流式程式設計api一樣,将上遊的每個元素轉化成另一個元素
- flatMap: 一對多轉換,将上遊的每個元素轉化成0到多個元素。類比Java8:Stream.flatMap内傳回的是stream,Observerable.flatMap内傳回的是Observerable。注意,本方法非常強大,很多api底層都是基于此方法。并且由于flatMap傳回的多個Observerable是互相獨立的,可以基于這個特點,實作并發。
3.3 組合API
- merge:将兩個事件流合并成一個時間流,合并後的事件流的順序,與上流兩個流中元素到來的時間順序一緻。
- zip: 逐個接收上遊多個流的每個元素,并且一對一的組合起來,轉換後發送給下遊。示例見code3.1
code 3.1
//第一個流每1秒輸出一個偶數
Observable<Long> even = Observable.interval(1000, TimeUnit.MILLISECONDS).map(i -> i * 2L);
//第二個流每3秒輸出一個奇數
Observable<Long> odd = Observable.interval(3000, TimeUnit.MILLISECONDS).map(i -> i * 2L + 1);
//zip也可以傳入多個流,這裡隻傳入了兩個
Observable.zip(even, odd, (e, o) -> e + "," + o).forEach(x -> {
System.out.println("observer = " + x);
});
/* 輸出如下,可以看到,當某個流有元素到來時,會等待其他所有流都有元素到達時,才會合并處理然後發給下遊
observer = 0,1
observer = 2,3
observer = 4,5
observer = 6,7
...
*/
代碼code 3.1看起來沒什麼問題,兩個流并發執行,最後用zip等待他們的結果。但是卻隐藏了一個很重要的問題:RxJava預設是同步、阻塞的!!當我們想去仿照上面的方式并發發送多個請求,最後用zip監聽所有結果時,很容易發先一個詭異的現象, code 3.2的代碼中,ob2的代碼總是在ob1執行之後才會執行,并不是我們預期的兩個請求并發執行。而列印出來的線程名也可以看到,兩個Single是在同一個線程中順序執行的!
code 3.2
// Single是隻傳回一個元素的Observable的實作類
Single<String> ob1 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 1");
TimeUnit.SECONDS.sleep(3);
return userService.queryById(1).getName();
});
Single<String> ob2 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 2");
TimeUnit.SECONDS.sleep(1);
return userService.queryById(1).getName();
});
String s = Single.zip(ob1, ob2,
(e, o) -> {System.out.println(e + "++++" + o);
那為什麼code 3.1的兩個流能夠并發執行呢?閱讀源碼可以發現zip的實作其實就是先訂閱第一個流,再訂閱第二個流,那麼預設當然是順序執行。但是通過Observable.interval建立的流,預設會被送出到 Schedulers.computation()提供的線程池中。關于線程池,本文後面會講解。
3.4 建立API
- create :最原始的create和subscribe,其他建立方法都基于此
code 3.3
// 傳回的子類是ObservableCreate
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("event");
emitter.onNext("event2");
emitter.onComplete();
}
});
// 訂閱observable
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " ,s = " + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onComplete");
}
});
- just : Observable.just("e1","e2"); 簡單的建立一個Observable,發出指定的n個元素。
- interval:code 3.1已給出示例,建立一個按一定間隔不斷産生元素的Observable,預設執行在Schedulers.comutation()提供的線程池中
- defer:産生一個延遲建立的Observable。 有點繞:Observable.create等建立出來的被觀察者雖然是延遲執行的,隻有有人訂閱的時候才會真正開始生成資料。但是建立Observable的方法卻是立即執行的。而 Observable.defer方法會在有人訂閱的時候才開始建立Observable。如代碼Code3.4
public String myFun() {
String now = new Date().toString();
System.out.println("myFun = " + now);
return now;
}
public void testDefer(){
// 該代碼會立即執行myFun()
Observable<String> ob1 = Observable.just(myFun());
// 該代碼會在産生訂閱時,才會調用myFun(), 可類比Java8的Supplier接口
Observable<String> ob2 = Observable.defer(() -> Observable.just(myFun()) );
}
- fromCallable :産生一個延遲建立的Observable,簡化的defer方法。Observable.fromCallable(() -> myFun()) 等同于Observable.defer(() -> Observable.just(myFun()) );
4.基本原理
RxJava的代碼,就是觀察者模式+裝飾器模式的展現。
4.1 Observable.create
見代碼code 3.3,create方法接收一個ObserverableOnSubscribe接口對象,我們定義了了發送元素的代碼,create方法傳回一個ObserverableCreate類型對象(繼承自Observerable抽象類)。跟進create方法原碼,直接傳回new出來的ObserverableCreate,它包裝了一個source對象,即傳入的ObserverableOnSubscribe。
code4.1
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//onAssembly預設直接傳回ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Create方法就這麼簡單,隻需要記住它傳回了一個包裝了source的Observerble。
4.2 Observerable.subscribe(observer)
看下code3.3中建立訂閱關系時(observalbe.subscribe)發生了什麼:
code4.2
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) {... } catch (Throwable e) {... }
}
Observable是一個抽象類,定義了subscribe這個final方法,最終會調用subscribeActual(observer);而subscribeActual是由子類實作的方法,自然我們需要看ObserverableCreate實作的該方法。
code4.3
//ObserverableCreate實作的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); //source是ObservableOnSubscribe,即我們寫的生産元素的代碼
} catch (Throwable ex) {...}
}
- 将觀察者observer包裝到一個CreateEmitter裡。
- 調用observer的onSubscribe方法,傳入這個emitter。
- 調用source(即生産代碼接口)的subscribe方法,傳入這個emitter。
第二步中,直接調用了我們寫的消費者的onSubscribe方法,很好了解,即建立訂閱關系的回調方法。
重點在第三步,source.subscribe(parent); 這個parent是包裝了observer的emitter。還記得source就是我們寫的發送事件的代碼。其中手動調用了emitter.onNext()來發送資料。那麼我們CreateEmitter.onNext()做了什麼
code4.4
public void onNext(T t) {
if (t == null) {...}
if (!isDisposed()) { observer.onNext(t); }
}
!isDisposed()判斷若訂閱關系還沒取消,則調用observer.onNext(t);這個observer就是我們寫的消費者,code 3.3中我們重寫了它的onNext方法來print接收到的元素。
以上就是RxJava最基本的原理,其實邏輯很簡單,就是在建立訂閱關系的時候,直接調用生産邏輯代碼,然後再生産邏輯的onNext中,調用了觀察者observer.onNext。時序圖如下。
顯然,最基本的原理,完全解耦了和異步回調、多線程的關系。
4.2 Observable.map
通過最簡答的map方法,看下轉換api做了什麼。
如Code2.1中,調用map方法,傳入一個轉換函數,可以一對一地将上遊的元素轉換成另一種類型的元素。
code4.5
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
code4.5是Observable定義的final的map方法,可見map方法将this(即原始的observer)和轉換函數mapper包裝到一個ObservableMap中(ObservableMap也繼承Observable),然後傳回這個ObservableMap(onAssembly預設什麼都不做)。
由于ObservableMap也是一個Observable,是以他的subscribe方法會在建立訂閱者時被層層調用到,subscribe是Observable定義的final方法,最終會調用到他實作的subscribeAcutal方法。
code4.6
//ObservableMap的subscribeActual
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
可以看到ObservableMap的subscribeActual中,将原始的觀察者t和變換函數function包裝到了一個新的觀察者MapObserver中,并将它訂閱到被觀察者source上。
我們知道,發送資料的時候,觀察者的onNext會被調用,是以看下MapObserver的onNext方法
code4.7
@Override
public void onNext(T t) {
if (done) {return; }
if (sourceMode != NONE) { actual.onNext(null);return;}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {...}
actual.onNext(v);
}
code4.7中可以看到mapper.apply(t)将變換函數mapper施加到每個元素t上,變換後得到v,最後調用actual.onNext(v)将v發送給下遊觀察者actual(actual為code4.6中建立MapObserver時傳入的t)。
總結一下例如map之類的變換api的原理:
- map方法傳回一個ObservableMap,包裝了原始的觀察者t和變換函數function
- ObservableMap繼承自AbstractObservableWithUpstream(它繼承自Observable)
- 訂閱發生時,observable的final方法subscribe()會調用實作類的subscribeActual
- ObservableMap.subscribeActual中建立MapObserver(包裝了原observer),訂閱到原Observable
- 發送資料onNext被調用時,先apply變換操作,再調用原observer的onNext,即傳給下遊觀察者
4.3 線程排程
代碼Code 2.2中給出了線程排程的示例。subscribeOn(Schedulers.io())指定了被觀察者執行的線程池。observeOn(Schedulers.single())指定了下遊觀察者執行的線程池。經過了上面的學習,很自然的能夠明白,原理還是通過裝飾器模式,将Observable和Observer層層包裝,丢到線程池裡執行。我們以observeOn()為例,見code4.8。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//observeOn(Scheduler) 傳回ObservableObserveOn(繼承自Observable)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// Observable的subscribe方法最終會調用到ObservableObserveOn.subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//建立一個ObserveOnObserver包裝了原觀察者、worker,把它訂閱到source(原observable)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
- observeOn(Scheduler) 傳回ObservableObserveOn
- ObservableObserveOn繼承自Observable
- 是以subscribe方法最終會調用到ObservableObserveOn重寫的subscribeActual方法
- subscribeActual傳回一個ObserveOnObserver(是一個Observer)包裝了真實的observer和worker
根據Observer的邏輯,發送資料時onNext方法會被調用,是以要看下ObserveOnObserver的onNext方法:
code4.9
public void onNext(T t) {
if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t);}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); //this是ObserveOnObserver,他同樣實作了Runable
}
}
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal(); //最終會調用actual.onNext(v) , 即調用被封裝的下遊觀察者,v是emmiter
}
}
- 最終生産者代碼中調用onNext時,會調用schedule方法
- schedule方法中,會送出自身(ObserveOnObserver)到線程池
- 而run方法會調用onNext(emmiter)
可見,RxJava線程排程的機制就是通過observeOn(Scheduler)将發送元素的代碼onNext(emmiter)送出到線程池裡執行。
5.使用注意
最後,給出幾個我們在開發中總結的注意事項,避免大家踩坑。
5.1 适用場景
并不是所有的IO操作、異步回調都需要使用RxJava來解決,比如如果我們隻是一兩個RPC服務的調用組合,或者每個請求都是獨立的處理邏輯,那麼引入RxJava并不會帶來多大的收益。下面給出幾個最佳的适用場景。
- 處理UI事件
- 異步響應和處理IO結果
- 事件或資料 是由無法控制的生産者推送過來的
- 組合接收到的事件
下面給一個閑魚商品批量補資料的使用場景:
背景:算法推薦了使用者的一些商品,目前隻有基礎資訊,需要調用多個業務接口,補充使用者和商品的附加業務資訊,如使用者頭像、商品視訊連接配接、商品首圖等。并且根據商品的類型不同,填充不同的垂直業務資訊。
難點:1. 多個接口存在前後依賴甚至交叉依賴;2. 每個接口都有可能逾時或者報錯,繼而影響後續邏輯;3.根據不同的依賴接口特點,需要單獨控制逾時和fallback。整個接口也需要設定整體的逾時和fallback。
方案:如果隻是多個接口獨立的異步查詢,那麼完全可以使用CompletableFuture。但基于它對組合、逾時、fallback支援不友好,并不适用于此場景。我們最終采用RxJava來實作。下面是大緻的代碼邏輯。代碼中的HsfInvoker是阿裡内部将普通HSF接口轉為Rx接口的工具類,預設運作到單獨的線程池中,是以能實作并發調用。
// 查找目前使用者的所有商品
Single<List<IdleItemDO>> userItemsFlow =
HSFInvoker.invoke(() -> idleItemReadService.queryUserItems(userId, userItemsQueryParameter))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> {
if (!res.isSuccess()) {
return emptyList;
}
return res.getResult();
})
.singleOrError();
//補充商品,依賴userItemsFlow
Single<List<FilledItemInfo>> fillInfoFlow =
userItemsFlow.flatMap(userItems -> {
if (userItems.isEmpty()) {
return Single.just(emptyList);
}
Single<List<FilledItemInfo>> extraInfo =
Flowable.fromIterable(userItems)
.flatMap(item -> {
//查找商品extendsDo
Flowable<Optional<ItemExtendsDO>> itemFlow =
HSFInvoker.invoke(() -> newItemReadService.query(item.getItemId(), new ItemQueryParameter()))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> Optional.ofNullable(res.getData()));
//視訊url
Single<String> injectFillVideoFlow =
HSFInvoker.invoke(() -> videoFillManager.getVideoUrl(item))
.timeout(100, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackUrl);
//填充首圖
Single<Map<Long, FrontCoverPageDO>> frontPageFlow =
itemFlow.flatMap(item -> {
...
return frontCoverPageManager.rxGetFrontCoverPageWithTpp(item.id);
})
.timeout(200, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackPage);
return Single.zip(itemFlow, injectFillVideoFlow, frontPageFlow, (a, b, c) -> fillInfo(item, a, b, c));
})
.toList(); //轉成商品List
return extraInfo;
});
//頭像資訊
Single<Avater> userAvaterFlow =
userAvaterFlow = userInfoManager.rxGetUserAvaters(userId).timeout(150, TimeUnit.MILLISECONDS).singleOrError().onErrorReturnItem(fallbackAvater);
//組合使用者頭像和商品資訊,一并傳回
return Single.zip(fillInfoFlow, userAvaterFlow,(info,avater) -> fillResult(info,avater))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturn(t -> errorResult)
.blockingGet(); //最後阻塞式的傳回
可以看到,通過引入RxJava,對于逾時控制、兜底政策、請求回調、結果組合都能更友善的支援。
5.2 Scheduler線程池
RxJava2 内置多個 Scheduler 的實作,但是我們建議使用Schedulers.from(executor)指定線程池,這樣可以避免使用架構提供的預設公共線程池,防止單個長尾任務block其他線程執行,或者建立了過多的線程導緻OOM。
5.3 CompletableFuture
當我們的邏輯比較簡單,隻想異步調用一兩個RPC服務的時,完全可以考慮使用Java8提供的CompletableFuture實作,它相較于Future是異步執行的,也可以實作簡單的組合邏輯。
5.4 并發
單個Observable始終是順序執行的,不允許并發地調用onNext()。
code5.1
Observable.create(emitter->{
new Thread(()->emitter.onNext("a1")).start();
new Thread(()->emitter.onNext("a2")).start();
})
但是,每個Observable可以獨立的并發執行。
code5.2
Observable ob1 = Observable.create(e->new Thread(()->e.onNext("a1")).start());
Observable ob2 = Observable.create(e->new Thread(()->e.onNext("a2")).start());
Observable ob3 = Observable.merge(ob1,ob2);
ob3中組合了ob1和ob2兩個流,每個流是獨立的。(這裡需要注意,這兩個流能并發執行,還有一個條件是他們的發送代碼運作在不同線程,就如果code3.1和code3.2中的示例一樣,雖然兩個流是獨立的,但是如果不送出到不同的線程中,還是順序執行的)。
5.5 背壓
在 RxJava 2.x 中,隻有 Flowable 類型支援背壓。當然,Observable 能解決的問題,對于 Flowable 也都能解決。但是,其為了支援背壓而新增的額外邏輯導緻 Flowable 運作性能要比 Observable 慢得多,是以,隻有在需要處理背壓場景時,才建議使用 Flowable。如果能夠确定上下遊在同一個線程中工作,或者上下遊工作在不同的線程中,而下遊處理資料的速度高于上遊發射資料的速度,則不會産生背壓問題,就沒有必要使用Flowable。關于Flowable的使用,由于篇幅原因,就不在本文闡述。
5.6 逾時
強烈建議設定異步調用的逾時時間,用timeout和onErrorReturn方法設定逾時的兜底邏輯,否則這個請求将一直占用一個Observable線程,當大量請求到來時,也會導緻OOM。
6.結語
目前,閑魚的多個業務場景都采用RxJava做異步化,大大降低了開發同學的異步開發成本。同時在多請求響應組合、并發處理都有很好的性能表現。自帶的逾時邏輯和兜底政策,在批量業務資料進行中能保證可靠性,是使用者流暢體驗的強力支撐。