-
-
-
- RxJava是什麼
- 好在哪?
- 準備知識
- 響應式程式設計
- 資料流
- 異步
- 小結
- 觀察者模式
- 響應式程式設計
- 解決問題的模型
- map解決的模型
- 建立操作
- 變換操作 f
- 通知操作
- flatMap解決模型
- map解決的模型
- RxJava中主要類介紹
- Observable
- Observer
- 主要類圖
- RxJava中的是如何實作兩個模型的?
- 建立可觀測序列Observable
- 建立變換後的可觀測序列
- 建立觀察者Observer
- 如何關聯被觀察者與觀察者,形成資料流。
- 模型一需求解決方案
- 需求
- 重貼一下上面的代碼
- 源碼分析
- 模型二解決方案
- 需求
- 代碼
- 源碼分析
- 線程控制
- 其他操作符
-
-
在看此文之前建議看下,https://www.jianshu.com/p/9ee9fa13eeef這篇文章,隻有圖,沒有字
RxJava是什麼
一個基于觀察者模式的異步任務架構
好在哪?
好在用
RxJava
做的異步請求更簡明更清晰
舉例
需求:在IO線程上執行三個網絡請求操作分别為
,
query(A)
,
query(B)
,且
query(C)
依賴于
query(B)
傳回的結果,同樣
query(A)
依賴于
query(C)
傳回的結果
query(B)
用android的異步架構得這麼寫(僞代碼):
Server server = ...;
server.makeRequest(new Query('A'), new Listener(){
onSuccess(boolean b){
if(b){
server.makeRequest(new new Query('B'), new Listener(){
onSuccess(boolean b){
if(b){
server.makeRequest(new Query('C'), new Listener(){
onSuccess(boolean b){
}
})
}
}
})
}
}
})
用Rxjava隻需要這麼寫(僞代碼)
Observable.just("A").flatMap((s) -> {
return makeRequest(new Query(s));
}).flatMap((aBoolean) -> {
if(aBoolean) return makeRequest(new Query("B"));
return null;
}).flatMap((aBoolean) -> {
if(aBoolean) return makeRequest(new Query("C"));
return null;
}).subscribeOn(Scheduals.io);
public static Observable<Boolean> makeRequest(Query query){
return Observable.just(query)
.map(new Function<Query, Boolean>() {
@Override
public Boolean apply(Query query) throws Exception {
//TODO
return true;
}
});
}
非常簡潔,避免了回調地獄,之後會通過分析源碼,去思考能夠避免回調地獄的原因
準備知識
響應式程式設計
響應式程式設計是一種通過異步和資料流來建構事物關系的程式設計模型
資料流
是兩個事物(在這裡我們了解為函數)間關系的橋梁,且隻有一個方向,即從上遊實體到下遊實體。舉個例子與
f(x1)
之間如何産生關系?
g(x2)
做為f的輸入,當
x1
生成後會通過資料(事件)流通知
f(x1)
執行,這裡的
g(x2)
就是上遊實體,
f(x1)
就是下遊實體。但如果有這樣的需求,三個獨立的函數
g(x2)
都完成後再通知
f(x1),f(x2),f(x3)
?應該怎樣去建構他們的關系?就是我們接下來要講用異步的方式去建構
g(x2)
異步
資料流不能完全建構出函數之間的關系。如資料流一節所說是互相獨立的,他們之間的關系是獨立的。這種獨立的關系就可以用異步來表示。是以解決上一節的問題便是讓
f(x1),f(x2),f(x3)
在各自的線程中執行,完成後再用資料流通知給
f(x1),f(x2),f(x3)
g(x)
小結
異步是為區分無關的事物,資料流是為了聯系起有關的事物。那麼如何實作資料流傳遞呢?就用到下面的觀察者模式
觀察者模式
觀察者模式面向的需求是:A對象對B對象的某種變化高度敏感,當B對象發生變化時,A對象需要瞬間做出反應,一般實作觀察者模式需要有觀察者即A對象,有被觀察者
Observer
即B對象,在實作的時候B對象需要持有A對象的引用,這樣當B對象發生變化時,B對象才能通過A對象的引用讓A對象做出反應,android中的典型實作便是監聽器事件,
Observable
是被觀察者,
View
是觀察者用
OnClickListener
,讓
setOnClickListener()
持有
View
的引用,當
OnClickListener
監聽到點選事件時便通知
View
進行處理。這樣子就簡單的實作了資料流從B->A的傳遞。
OnClickListener
解決問題的模型
RxJava
可以通過很多操作符(就是RxJava中的一些方法)解決許多問題模型, 盡然它是異步任務架構,我們就來看看它是怎麼處理異步任務問題模型的,隻解釋其中兩種比較典型的問題模型。
map解決的模型
由模型圖可知,首先我們需要建立可觀測序列,然後再用觀察者模式去通知它的下遊實體
map
操作(其實模型中的虛線基本上是由觀察者模式和異步實作的),在Map操作完成後形成了另一個可觀測序列,在用觀察者模式去通知這個序列依次輸出。這樣的模型可用來解決如下需求:子線程執行一個耗時任務,執行完成後傳回給主線程
通過模型圖可知,建立操作後需要通知變換操作,這個通知就用觀察者模式實作。而變換操作是獨立的而且在子線程,是以需要通過異步來實作,且變換操作執行完成後要通知給主線程的。是以也要使用觀察者模式
建立操作
如前文所述,建立操作可以看做是一個函數,由于
f(x)
要通知下遊的,是以這裡的
f(x)
是被觀察者,在RxJava裡用
f(x)
表示被觀察者去發起通知。在RxJava中
Observable
為just,假設這裡的輸入x為”A”,是以其建立操作為
f
.
Observable.just('A')
變換操作 f
同理這裡的變換操作為,需要運作在子線程,要用
map
實作。
Handler
通知操作
而子線程的通知操作也要用觀察者模式實作,其需要引用一個觀察者,這個觀察者需要自己定義,也就是說某個耗時的轉換操作在子線程運作完成後,要發送到你自己定義的主線程的觀察者中
flatMap解決模型
從模型圖我們可以看到,FlatMap裡面的資料有兩個特點:
- 資料被分成了n個
- 這n個資料也在可觀測的序列上
對應的 ,它能解決兩個基本需求:
- 原始的單個資料是集合類,比如
List<String>
,
FlatMap
可以把它們變成一個個
String
.
- 這每個String都在可觀測序列上,是以也能有通知操作的能力
是以對于第一點他能夠簡單周遊二維資料,舉個例子:需求是周遊所有學生選的課程
final List<Student> list = ...
Disposable disposable = Observable.fromIterable(list)
/*将學生的課程發送出去,從學生執行個體得到課程執行個體,再發射出去*/
.flatMap(new Function<Student, ObservableSource<Student.Course>>() {
@Override
public ObservableSource<Student.Course> apply(Student student) throws Exception {
//Log.d(TAG,"flatmap student name = "+student.name);
return Observable.fromIterable(student.getCourseList());
}
})
/*接受到學生的課程*/
.subscribe(new Consumer<Student.Course>() {
@Override
public void accept(Student.Course course) throws Exception {
System.out.printf("Consumer accept course = " + course.getName());
}
});
}
對于第二點,它能夠解決的是,網絡請求嵌套的問題。舉個例子(該例子引用自https://blog.csdn.net/jdsjlzx/article/details/51493552):需求是
queryA
和
queryB
. 并且queryB的運作依賴于queryA的結果
RxJava中主要類介紹
Observable
相當于觀察者模式中的被觀察者
Observer
相當于觀察者模式中的觀察者
主要類圖
RxJava中的是如何實作兩個模型的?
先以
map
為例
建立可觀測序列Observable
-
:just
Observable observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
建立變換後的可觀測序列
-
:map
Observable observable2 = observable.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//模拟網絡請求
Thread.sleep();
return ;
}
})
建立觀察者Observer
Consumer consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
};
如何關聯被觀察者與觀察者,形成資料流。
在RxJava中,subscribe()這裡既是訂閱,其預設狀态也發生了變化. 我們可以用鍊式調用把他們串起來
上述代碼實作了在主線程傳遞序列,但實際上可以了解為循環了上述序列,但這隻是一個同步的實作。而RxJava是一個異步架構,能夠很友善的進行線程切換,隻需要在合适的位置加上subscrieOn,observeOn即可,接着上面的例子
observable2
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribe(consumer);
該例子實作了
map
操作在子線程運作,然後切換回主線程通知觀察者執行. 下面深入具體的源碼去分析一下上面這個例子
模型一需求解決方案
需求
請求網絡,有結果傳回主線程
重貼一下上面的代碼
Observable.just(, , , , , , , , )
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//模拟網絡請求
Thread.sleep();
return ;
}
}) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
源碼分析
- 可觀測序列的建立操作
-
:時序圖為just(item)
方法解釋如下onAssembly
,就是new ObservableFromArray<T>(items)
這個被觀察者中儲存ObservableFromArray
這個數組items
-
:存儲了map()
(map的上遊實體)的引用,我們用ObservableFromArray
表示,與ofa_this
對象的引用我們用Function
表示fun1
-
:存儲了其上遊被觀察者subscribeOn()
的引用ObservableMap
和IO排程器map_this
-
:存儲了observeOn()
的上遊的引用ObservableSubscribeOn
和UI線程排程器sub_this
- 思考:為什麼目前對象要存儲之前對象所對應的
引用?Observable
因為後面需要用到這些引用去訂閱對應的觀察者Observer,如下圖
-
- 訂閱操作:
- 思考:上遊如何通知下遊呢?
分析模型一可知,因為中間有線程切換操作加資料轉換操作,是以資料流必須流經這兩個實體。是以如果想讓頂層通知到最後的底層的話,必須要經過中間層,讓資料流一層一層傳遞。又根據觀察者模式,需要下遊的observer訂閱上遊的observable,才能讓資料從上遊流向下遊。
- 源碼中的解決方案 上圖步驟1的源碼如下,2,3的原理同1:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //subscribe(observer)中的observer即是LambdaObserver LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } //ObservableSubscribeOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); //ObserveOnObserver存儲了LambdaObserver的執行個體ls,且在訂閱前操作中存儲的ObserveOnObserver訂閱被觀察者者source source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
- 思考:上遊如何通知下遊呢?
- 通知操作:RxJava中訂閱完成後,因為之前已經訂閱過,是以上遊可以調用onNext方法直接通知下遊 在mapObserver.onNext()操作中會執行如下代碼:
-
最後在執行LambdaObserver的onNext中執行消費者函數:@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { //回調map中定義的函數,mapper即上文存儲的函數 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //利用之前儲存的actual執行個體調用其下遊的onNext actual.onNext(v); }
@Override public void onNext(T t) { if (!isDisposed()) { try { //可以看到回調了觀察者内部定義的函數 onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); get().dispose(); onError(e); } } }
- 另一個結合reforit的例子,一圖以蔽之,其中
和bodyObservale
都是callExecuteObservale
中的被觀察者,省略了網絡請求的建立被觀察者創操作,代碼如下:reforit
對應的解釋圖如下:disposable = Network.getGankApi() .getBeauties(, page) .map(new Function<GankBeautyResult, List<Item>>() { @Override public List<Item> apply(GankBeautyResult gankBeautyResult) throws Exception { return null; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<Item>>() { @Override public void accept(@NonNull List<Item> items) throws Exception { swipeRefreshLayout.setRefreshing(false); pageTv.setText(getString(R.string.page_with_number, MapFragment.this.page)); adapter.setItems(items); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { swipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } });
模型二解決方案
需求假設有一個資料結構『學生』,要列印出每個學生所需要修的所有課程的名稱呢?
代碼
源碼分析final List<Student> list = ... Disposable disposable = Observable.fromIterable(list) /*将學生的課程發送出去,從學生執行個體得到課程執行個體,再發射出去*/ .flatMap(new Function<Student, ObservableSource<Student.Course>>() { @Override public ObservableSource<Student.Course> apply(Student student) throws Exception { //Log.d(TAG,"flatmap student name = "+student.name); return Observable.fromIterable(student.getCourseList()); } }) /*接受到學生的課程*/ .subscribe(new Consumer<Student.Course>() { @Override public void accept(Student.Course course) throws Exception { System.out.printf("Consumer accept course = " + course.getName()); } }); }
- 訂閱前的操作
- 訂閱操作
- 通知操作
-
通知在訂閱操作訂閱的觀察者fro_this
-MergeObserver
收到通知,執行訂閱前操作的裡面的MergeObserver
,代碼如下,其傳回的p是一個fun2
類型,這樣就實作了相當于把轉換後的資料放入了對應的可觀測序列,根據模型二可知,下一步就是要将每個可觀測序列中的菱形資料提出來在放入一個Observable
進行輸出。這個提出來的過程就是Observable
實作的。可以了解為做了一個subscribeInner
操作。這個過程邏輯有點複雜,因為涉及到并發控制,是以略過。map
@Override public void onNext(T t) { // safeguard against misbehaving sources if (done) { return; } ObservableSource<? extends U> p; try { //執行fun2函數 p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); onError(e); return; } if (maxConcurrency != Integer.MAX_VALUE) { synchronized (this) { if (wip == maxConcurrency) { sources.offer(p); return; } wip++; } } //内部訂閱 subscribeInner(p); }
-
線程控制
從模型一中我們可以總計如下:- subscribeOn()切換子線程是在訂閱過程中切換的
- observerOn()切換成主線程是在通知的過程中
- 是以上面兩個一個操縱訂閱過程的線程,一個操縱通知過程的線程,猜測可以産生出任何一種你想要的線程切換功能
- 舉個栗子了解一下:
Observable .map // 操作1 .flatMap // 操作2 .subscribeOn(io)//操作3 .map //操作4 .flatMap //操作5 .observeOn(main)//操作6 .map //操作7 .flatMap //操作8 .subscribeOn(io) //操作9 .subscribe(handleData)
我們可以發現,第一個操作9的線程切換沒有産生效果,是以總結如下:subscribeOn隻能調用一次,因為如果有多次,隻會有一次有效果,observeOn()可以多次調用實作了你想要的線程的多次切換。
其他操作符
當然還有其他操作符,留給後面繼續讨論,不過掌握了map和flatmap後,後面的有些操作符應該就不會特别難了解,具體的可見官網
- 另一個結合reforit的例子,一圖以蔽之,其中