RxJava系列教程:
1. RxJava使用介紹 【視訊教程】
2. RxJava操作符
• Creating Observables(Observable的建立操作符) 【視訊教程】
• Transforming Observables(Observable的轉換操作符) 【視訊教程】
• Filtering Observables(Observable的過濾操作符) 【視訊教程】
• Combining Observables(Observable的組合操作符) 【視訊教程】
• Error Handling Operators(Observable的錯誤處理操作符) 【視訊教程】
• Observable Utility Operators(Observable的輔助性操作符) 【視訊教程】
• Conditional and Boolean Operators(Observable的條件和布爾操作符) 【視訊教程】
• Mathematical and Aggregate Operators(Observable數學運算及聚合操作符) 【視訊教程】
• 其他如observable.toList()、observable.connect()、observable.publish()等等; 【視訊教程】
3. RxJava Observer與Subcriber的關系 【視訊教程】
4. RxJava線程控制(Scheduler) 【視訊教程】
5. RxJava 并發之資料流發射太快如何辦(背壓(Backpressure)) 【視訊教程】
observeOn和subscribeOn都是對observable的一種操作,差別就是subscribeOn改變了observable本身産生事件的schedule以及發出事件後相關處理事件的程式所在的scheduler,而obseveron僅僅是改變了對發出事件後相關處理事件的程式所在的scheduler。
或許你會問,這有多大的差別嗎?的确是有的,比如說産生observable事件是一件費時可能會卡主線程的操作(比如說擷取網絡資料),那麼subscribeOn就是你的選擇,這樣可以避免卡住主線程。
兩者最主要的差别是影響的範圍不同,observeOn is more limited,但是卻是可以多次調用,多次改變不同的接受者所在的scheduler,在調用這個函數之後的observable造成影響。而subscribeOn則是一次性的,無論在什麼地方調用,總是從改變最原始的observable開始影響整個observable的處理。
subscribeOn()和observeOn()的差別
- subscribeOn()主要改變的是訂閱的線程,即call()執行的線程;
- observeOn()主要改變的是發送的線程,即onNext()執行的線程。
subscribeOn
我們先看一個例子。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String integer) {
System.out.println(integer);
}
});
運作如下:
a
b
我們看一下subscribeOn()中,都幹了什麼
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
很明顯,會走if之外的方法。
在這裡我們可以看到,我們又建立了一個Observable對象,但建立時傳入的參數為OperatorSubscribeOn(this,scheduler),我們看一下此對象以及其對應的構造方法
OperatorSubscribeOn代碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
可以看到,OperatorSubscribeOn實作Onsubscribe,并且由其構造方法可知,他儲存了上一個Observable對象,并儲存了Scheduler對象。
這裡做個總結。
把Observable.create()建立的稱之為Observable_1,OnSubscribe_1。把subscribeOn()建立的稱之為Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。
那麼,前兩步就是建立了兩個的observable,和OnSubscribe,并且OnSubscribe_2中儲存了Observable_1的應用,即source。
調用Observable_2.subscribe()方法會調用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。
下面分析下call()方法。
- inner.schedule()改變了線程,此時Action的call()在我們指定的線程中運作。
- Subscriber被包裝了一層。
- source.unsafeSubscribe(s);,注意source是Observable_1對象。
unsafeSubscribe方法代碼:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
代碼很多,關鍵代碼:
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
該方法即調用了OnSubscribe_1.call()方法。注意,此時的call()方法在我們指定的線程中運作。那麼就起到了改變線程的作用。
對于以上線程,我們可以總結,其有如下流程:
- Observable.create() : 建立了Observable_1和OnSubscribe_1;
- subscribeOn(): 建立Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時OperatorSubscribeOn儲存了Observable_1的引用。
- observable_2.subscribe(Observer):
- 調用OperatorSubscribeOn的call()。call()改變了線程的運作,并且調用了Observable_1.unsafeSubscribe(s);
-
Observable_1.unsafeSubscribe(s);,該方法的實作中調用了OnSubscribe_1的call()。
從這個可以了解,無論我們的subscribeOn()放在哪裡,他改變的是subscribe()的過程,而不是onNext()的過程。
那麼如果有多個subscribeOn(),那麼線程會怎樣執行呢。如果按照我們的邏輯,有以下程式
Observable.just("ss")
.subscribeOn(Schedulers.io()) // ----1---
.subscribeOn(Schedulers.newThread()) //----2----
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
那麼,我們根據之前的源碼分析其執行邏輯。
- Observable.just(“ss”),建立Observable_1,OnSubscribe_1
- Observable_1.subscribeOn(Schedulers.io()):建立observable_2,OperatorSubscribeOn_2并儲存Observable_1的引用。
- observable_2.subscribeOn(Schedulers.newThread()):建立Observable_3,OperatorSubscribeOn_3并儲存Observable_2的引用。
- Observable_3.subscribe():
- 調用OperatorSubscribeOn_3.call(),改變線程為Schedulers.newThread()。
- 調用OperatorSubscribeOn_2.call(),改變線程為Schedulers.io()。
-
調用OnSubscribe_1.call(),此時call()運作在Schedulers.io()。
根據以上邏輯分析,會按照1的線程進行執行。
subscribeOn如何工作,關鍵代碼其實就是一行代碼:
注意它所在的位置,是在worker的call裡面,說白了,就是把source.subscribe這一行調用放在指定的線程裡,那麼總結起來的結論就是:
subscribeOn的調用,改變了調用前序列所運作的線程。
observeOn
看一下observeOn()源碼:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
這裡引出了新的操作符lift
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
這裡不再介紹了,詳見:http://blog.csdn.net/jdsjlzx/article/details/51686152
在lift()中,有如下關鍵代碼:
OperatorObserveOn.call()核心代碼:
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
我們看到其傳回了ObserveOnSubscriber< T>,注意:此時隻調用了call()方法,但call()方法中并沒有改變線程的操作,此時為subscribe()過程。
我們直奔重點,因為,我們了解到其改變的是onNext()過程,那麼我們肯定要看一下ObserveOnSubscriber.onNext()找找在哪改變線程
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
這裡做了兩件事,首先把結果緩存到一個隊列裡,然後調用schedule啟動傳入的worker
我們這裡需要注意下:
在調用observeOn前的序列,把結果傳入到onNext就是它的工作,它并不關心後續的流程,是以工作就到這裡就結束了,剩下的交給ObserveOnSubscriber繼續。
onNext方法最後調用了schedule(),從方法名可以看到,其肯定是改變線程用的,并且該方法經過一番循環之後,調用了該類的call()方法。
protected void schedule() {
if (counter.getAndIncrement() == ) {
recursiveScheduler.schedule(this);
}
}
recursiveScheduler 就是之前我們傳入的Scheduler,我們一般會在observeOn傳入AndroidScheluders.mainThread()。
scheduler中調用的call()方法
// only execute this from schedule()
@Override
public void call() {
long missed = L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == L) {
break;
}
}
}
call()中有localChild.onNext(localOn.getValue(v));調用。
在Scheduler啟動後, 我們在Observable.subscribe(a)傳入的a就是這裡的child, 我們看到,在call中終于調用了它的onNext方法,把真正的結果傳了出去,但是在這裡,我們是工作在observeOn的線程上的。
總結起來的結論就是:
- observeOn 對調用之前的序列默不關心,也不會要求之前的序列運作在指定的線程上
- observeOn 對之前的序列産生的結果先緩存起來,然後再在指定的線程上,推送給最終的subscriber
observeOn改變的是onNext()調用。
subcribeOn和observeOn 對比分析
Observable
.map // 操作
.flatMap // 操作
.subscribeOn(io)
.map //操作
.flatMap //操作
.observeOn(main)
.map //操作
.flatMap //操作
.subscribeOn(io) //!!特别注意
.subscribe(handleData)
有如上邏輯,則我們對其運作進行分析。
首先,我們需要先明白其内部執行的邏輯。
在調用subscribe之後,邏輯開始運作。分别調用每一步OnSubscribe.call(),注意:自下往上。當運作到最上,即Observable.create()後,我們在其中調用了subscriber.onNext(),于是程式開始自上往下執行每一個對象的subscriber.onNext()方法。最終,直到subscribe()中的回調。
其次,從上面對subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。
那麼對于以上的邏輯,我們可以得出如下結論:
- 操作1,2,3,4在io線程中,因為在如果沒有observeOn()影響,他們的回調操作預設在訂閱的線程中。而我們的訂閱線程在subscribeOn(io)發生了改變。注意他們執行的先後順序。
- 操作5,6在main線程中運作。因為observeOn()改變了onNext().
- 特别注意那一個邏輯沒起到作用
再簡單點總結就是
- subscribeOn的調用切換之前的線程。
- observeOn的調用切換之後的線程。
- observeOn之後,不可再調用subscribeOn 切換線程
複雜情況
我們經常多次使用subscribeOn切換線程,那麼以後是否可以組合observeOn和subscribeOn達到自由切換的目的呢?
組合是可以的,但是他們的執行順序是有條件的,如果仔細分析的話,可以知道observeOn調用之後,再調用subscribeOn是無效的,原因是什麼?
因為subscribeOn改變的是subscribe這句調用所在的線程,大多數情況,産生内容和消費内容是在同一線程的,是以改變了産生内容所在的線程,就改變了消費内容所在的線程。
經過上面的闡述,我們知道,observeOn的工作原理是把消費結果先緩存,再切換到新線程上讓原始消費者消費,它和生産者是沒有一點關系的,就算subscribeOn調用了,也隻是改變observeOn這個消費者所在的線程,和OperatorObserveOn中存儲的原始消費者一點關系都沒有,它還是由observeOn控制。
@扔物線 大神給的總結:
- 下面提到的“操作”包括産生事件、用操作符操作事件以及最終的通過 subscriber 消費事件;
- 隻有第一subscribeOn() 起作用(是以多個 subscribeOn() 無意義);
- 這個 subscribeOn() 控制從流程開始的第一個操作,直到遇到第一個 observeOn();
- observeOn() 可以使用多次,每個 observeOn() 将導緻一次線程切換(),這次切換開始于這次 observeOn() 的下一個操作;
- 不論是 subscribeOn() 還是 observeOn(),每次線程切換如果不受到下一個 observeOn() 的幹預,線程将不再改變,不會自動切換到其他線程。
參考文章:
https://segmentfault.com/a/1190000004856071
http://blog.csdn.net/lisdye2/article/details/51113837
最後需要注意的是:
預設情況下, doOnSubscribe() 執行在 subscribe() 發生的線程;而如果在 doOnSubscribe() 之後有subscribeOn() 的話,它将執行在離它最近的 subscribeOn() 所指定的線程。
想要了解更多請參考部落格:RxJava中的doOnSubscribe預設執行線程分析
這個(學習總結)部落格花了3個小時多,看源碼真的很頭疼,希望以後能有所提高。