Dispatcher是負責對okhttp所有的請求進行排程管理的類。可以通過Dispatcher擷取,或者取消所有請求。這裡指的一個請求就是對應的Call,并不是指Request,下面出現的所有的請求都是指Call。這裡通過分析跟蹤okhttp發送請求的過程來分析Dispatcher是如何維護和排程我們發出的所有請求的。
Call其實就是對Request的封裝。
OkHttp請求方式
通過okhttp發送請求主要有兩種方式。
- 通過
調用,此時request會被馬上發出, 直到傳回response或者發生錯誤前會一直阻塞。可以了解為一個立即執行的同步請求。execute()
- 通過
調用,此時request将會在未來的某個時間點被執行,具體由dispatcher進行排程,這種方式是異步傳回結果的。可以了解為會被盡快執行的一個異步請求。enqueue()
第一種方式
通過
execute()
調用,一般是這樣的
通過OkHttpClient的代碼可以看出
newCall()
方法其實是new了一個RealCall,是以這裡直接檢視RealCall的
execute()
方法。
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall的
execute()
方法,這裡隻看下核心的代碼:
@Override public Response execute() throws IOException {
synchronized (this) {
//此處除去一些其他代碼
//...
try {
//通知Dispatcher這個Call正在被執行,同時将此Call交給Dispatcher
//Dispatcher可以對此Call進行管理
client.dispatcher().executed(this);
//請求的過程,注意這行代碼是阻塞的,直到傳回result!
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
//此時這個請求已經執行完畢了,通知Dispatcher,不要再維護這個Call了
client.dispatcher().finished(this);
}
}
首先注意這行代碼
client.dispatcher().executed(this);
它是調用的Dispatcher的
executed()
方法,注意看方法名是executed并不是execute。接下來去Dispatcher裡看下這個方法做了什麼。
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
看注釋就明白了,這裡他隻是一個通知的作用,通知Dispatcher我這個call立即要被或者正在被執行,然後Dispatcher會把加入一個名為runningSyncCalls的雙端隊列中,這個隊列中存儲着所有的正在運作的同步請求。這樣Dispatcher就可以很友善的對所有的同步請求進行管理了。既然有添加,那麼也應該有删除,在請求執行完畢時調用了這行代碼:
client.dispatcher().finished(this);
通過字面意思了解他應該就是删除的操作,通知Dispatcher這個請求已經被執行完畢了。這裡暫時了解為調用finished方法就是将此call從runningSyncCalls中移除,後面會再讨論finished方法的細節。
因為同步請求是被馬上執行的,是以Dispatcher能對同步請求進行的排程也隻有cancel了。具體可以通過調用
Dispatcher.cancelAll()
方法進行取消。
是以真正執行請求的隻有這行代碼了。
Response result = getResponseWithInterceptorChain();
這個方法先不管他,就可以了解為這行代碼的執行就是請求從發出到完成的過程。在分析攔截器的實作原理的時候再來讨論。
第二種方式
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
通過上面我們已經知道這裡調用的也是RealCall的enqueue方法,我們直接來看代碼:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//判斷是否已經執行過了
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//捕獲調用棧的資訊,用來分析連接配接洩露
captureCallStackTrace();
//封裝一個AsyncCall交給Dispatcher排程
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
通過上面的代碼可以看出調用
enqueue()
方法,其實是調用了Dispatcher的
enqueue()
方法,并且new了一個AsyncCall作為參數。AsyncCall為RealCall的一個内部類,下面繼續看AsyncCall類裡到底做了什麼。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
//...
/**
*真正執行送出請求的地方,為了看起來清晰,精簡了部分代碼
*/
@Override protected void execute() {
try {
//請求的過程,注意這裡也是阻塞的
Response response = getResponseWithInterceptorChain();
//先不管這個Interceptor是幹嘛的,下面的代碼可以了解為:
//如果沒有被取消,并且沒有發生異常,回調onResponse方法。
//如果發生了異常或者被取消,回調onFailure方法。
if (retryAndFollowUpInterceptor.isCanceled()) {
//此請求被取消了,回調onFailure
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//此請求成功了,回調onResponse
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
//發生了異常,回調onFailure
responseCallback.onFailure(RealCall.this, e);
} finally {
//通知Dispatcher Call被執行完畢了
client.dispatcher().finished(this);
}
}
}
可以看到AsyncCall的
execute()
就是具體請求執行的地方,隻不過和上面的RealCall的
execute()
方法相比,多了回調的處理。retryAndFollowUpInterceptor其實是負責請求逾時的重試和重定向操作的,
retryAndFollowUpInterceptor.isCanceled()
就是用來判斷這個請求是否被取消了,這裡就不深入展開了。那麼AsyncCall的
execute()
方法是怎麼被執行的呢,繼續來看AsyncCall的父類NamedRunnable。
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//注意這裡調用了execute方法
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
可以看到NamedRunnable其實就是一個實作了Runnable接口的抽象類,并且在run方法中調用了
execute()
。也就是說AsyncCal其實就是一個Runnable,當這個Runnable被調用的時候
execute()
方法自然會被調用。看到這裡就很清晰了,再回過頭來看RealCall的
enqueue()
中調用的這段代碼
//封裝一個AsyncCall交給Dispatcher排程
client.dispatcher().enqueue(new AsyncCall(responseCallback));
其實這裡的
new AsyncCall(responseCallback)
就是new了一個封裝的Runnable對象,這個Runnable的執行,就是整個請求的發起與回調的過程。好啦,這裡搞明白了其實調用
Dispatcher().enqueue()
方法傳遞過去的是一個Runnable對象,接下來就去Dispatcher中看下,他對這個Runnable做了什麼。
synchronized void enqueue(AsyncCall call) {
//判斷正在執行的異步請求數沒有達到門檻值,并且每一個Host的請求數也沒有達到門檻值
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//加入到正在執行隊列,并立即執行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//加入到等待隊列
readyAsyncCalls.add(call);
}
}
上面的代碼中又出現了兩個雙端隊列,runningAsyncCalls和readyAsyncCalls,加上上面出現的runningSyncCalls可以看到Dispatcher一共維護了3個請求隊列,分别是
- runningAsyncCalls,正在請求的異步隊列
- readyAsyncCalls,準備請求的異步隊列\等待請求的異步隊列
- runningSyncCalls,正在請求的同步隊列
還出現了一個方法
executorService()
,接下來看下這個方法是幹嘛的。
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(, Integer.MAX_VALUE, , TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出來這個方法就是以懶漢的方式建立最大容量為
Integer.MAX_VALUE
, 存活等待時間為60S的線程池(其實這裡的最大容量并沒什麼用,因為他的最大容量不會超過runningAsyncCalls的size,即設定的并發請求數的門檻值)。
executorService().execute(call)
就是把這個請求丢進去執行。那麼
enqueue()
方法執行的過程大概就是,首先判斷目前正在執行的異步請求總數是否已經達到的門檻值(預設為64),針對每個host的同時請求數量是否達到了門檻值(預設為5)。如果都沒有達到那麼将這個請求加入到runningAsyncCalls隊列中,馬上執行。
否則,會将這個請求加入到readyAsyncCalls中,準備執行。那麼readyAsyncCalls中的請求時何時被調用的呢?掐指一算,應該是在runningAsyncCalls中某些請求被執行完畢時,不滿足上面的兩個條件自然會被調用。是不是呢?接下來看上面一直忽略的Dispatcher的**三個**finished方法:
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
//異步請求結束時調用此方法
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
//同步請求結束時調用此方法
finished(runningSyncCalls, call, false);
}
/**
*将執行完畢的call從相應的隊列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//從相應的隊列中移除相應的call,如果不包含,抛異常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//是否需要提升Call的級别
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果沒有任何需要執行的請求,那麼執行idleCallBack
if (runningCallsCount == && idleCallback != null) {
idleCallback.run();
}
}
可以看出來不管是異步調用結束,還是同步調用結束,最終都是調用的這個被private修飾的finished方法,都會将完成的call從相應的隊列中移除。唯一不同的是調用時傳遞的promoteCalls參數不同,異步請求結束時傳入的是true,同步請求時結束傳入的是false。并且會根據這個flag來判斷是否執行
promoteCalls()
方法,接下來看
promoteCalls()
裡做了什麼。
/**
*提升call的優先級
*/
private void promoteCalls() {
//runningAsyncCalls已經滿了,不能再加了
if (runningAsyncCalls.size() >= maxRequests) return;
//沒有請求在readyAsyncCalls等着被執行
if (readyAsyncCalls.isEmpty()) return;
//周遊準備隊列裡的請求
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//判斷該請求的host是否小于每個host最大請求門檻值
if (runningCallsForHost(call) < maxRequestsPerHost) {
//将該請求從readyAsyncCalls移除,加入runningAsyncCalls并執行
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//如果runningAsyncCalls數量已經達到門檻值,終止周遊
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
可以看出
promoteCalls()
方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執行。是以上面的兩個finished方法調用方式的差別也就明晰了。同步調用結束因為并沒有涉及到runningAsyncCalls中的任何東西,對runningAsyncCalls沒任何影響,是以不需要調用promoteCalls。而異步的調用結束意味着runningAsyncCalls中會出現一個空位值,是以它會調用promoteCalls去嘗試從readyAsyncCalls中拉一個進來。
總結
好啦 到這裡整個dispatcher的排程分析算是完成了。總結起來其實他就是維護了三個隊列,三個隊列中包含了正在執行或者将要執行的所有請求。總結起來就是:
- 當發送一個異步請求時:如果runningAsyncCalls沒達到門檻值,那麼會将這個請求加入到runningAsyncCalls立即執行,否則會将這個請求加入到readyAsyncCalls中等待執行。當一個異步請求執行完畢時會試圖去執行readyAsyncCalls中的請求。
- 當發送一個同步請求時:該請求會直接加入到runningSyncCalls中,并且馬上開始執行,注意這個執行并不是由Dispatcher排程的。
- 所有異步執行的請求都會通過executorService線程池來執行,這是個懶漢方式建立的線程池。
整個大緻的流程
。
歡迎閱讀下一篇interceptor原了解析