天天看點

OkHttp Dispatcher的排程過程分析

Dispatcher是負責對okhttp所有的請求進行排程管理的類。可以通過Dispatcher擷取,或者取消所有請求。這裡指的一個請求就是對應的Call,并不是指Request,下面出現的所有的請求都是指Call。這裡通過分析跟蹤okhttp發送請求的過程來分析Dispatcher是如何維護和排程我們發出的所有請求的。

Call其實就是對Request的封裝。

OkHttp請求方式

通過okhttp發送請求主要有兩種方式。

  1. 通過

    execute()

    調用,此時request會被馬上發出, 直到傳回response或者發生錯誤前會一直阻塞。可以了解為一個立即執行的同步請求。
  2. 通過

    enqueue()

    調用,此時request将會在未來的某個時間點被執行,具體由dispatcher進行排程,這種方式是異步傳回結果的。可以了解為會被盡快執行的一個異步請求。

第一種方式

通過

execute()

調用,一般是這樣的

通過OkHttpClient的代碼可以看出

newCall()

方法其實是new了一個RealCall,是以這裡直接檢視RealCall的

execute()

方法。

@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}
           

RealCall的

execute()

方法,這裡隻看下核心的代碼:

@Override public Response execute() throws IOException {
  synchronized (this) {
  //此處除去一些其他代碼
  //...
  try {
    //通知Dispatcher這個Call正在被執行,同時将此Call交給Dispatcher
    //Dispatcher可以對此Call進行管理
    client.dispatcher().executed(this);
    //請求的過程,注意這行代碼是阻塞的,直到傳回result!
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    //此時這個請求已經執行完畢了,通知Dispatcher,不要再維護這個Call了
    client.dispatcher().finished(this);
  }
}
           

首先注意這行代碼

client.dispatcher().executed(this);
           

它是調用的Dispatcher的

executed()

方法,注意看方法名是executed并不是execute。接下來去Dispatcher裡看下這個方法做了什麼。

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}
           

看注釋就明白了,這裡他隻是一個通知的作用,通知Dispatcher我這個call立即要被或者正在被執行,然後Dispatcher會把加入一個名為runningSyncCalls的雙端隊列中,這個隊列中存儲着所有的正在運作的同步請求。這樣Dispatcher就可以很友善的對所有的同步請求進行管理了。既然有添加,那麼也應該有删除,在請求執行完畢時調用了這行代碼:

client.dispatcher().finished(this);
           

通過字面意思了解他應該就是删除的操作,通知Dispatcher這個請求已經被執行完畢了。這裡暫時了解為調用finished方法就是将此call從runningSyncCalls中移除,後面會再讨論finished方法的細節。

因為同步請求是被馬上執行的,是以Dispatcher能對同步請求進行的排程也隻有cancel了。具體可以通過調用

Dispatcher.cancelAll()

方法進行取消。

是以真正執行請求的隻有這行代碼了。

Response result = getResponseWithInterceptorChain();
           

這個方法先不管他,就可以了解為這行代碼的執行就是請求從發出到完成的過程。在分析攔截器的實作原理的時候再來讨論。

第二種方式

client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});
           

通過上面我們已經知道這裡調用的也是RealCall的enqueue方法,我們直接來看代碼:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
  //判斷是否已經執行過了
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  //捕獲調用棧的資訊,用來分析連接配接洩露
  captureCallStackTrace();
  //封裝一個AsyncCall交給Dispatcher排程
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
           

通過上面的代碼可以看出調用

enqueue()

方法,其實是調用了Dispatcher的

enqueue()

方法,并且new了一個AsyncCall作為參數。AsyncCall為RealCall的一個内部類,下面繼續看AsyncCall類裡到底做了什麼。

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    //...
    /**
    *真正執行送出請求的地方,為了看起來清晰,精簡了部分代碼
    */
    @Override protected void execute() {
      try {
        //請求的過程,注意這裡也是阻塞的
        Response response = getResponseWithInterceptorChain();
        //先不管這個Interceptor是幹嘛的,下面的代碼可以了解為:
        //如果沒有被取消,并且沒有發生異常,回調onResponse方法。
        //如果發生了異常或者被取消,回調onFailure方法。
        if (retryAndFollowUpInterceptor.isCanceled()) {
          //此請求被取消了,回調onFailure
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          //此請求成功了,回調onResponse
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
          //發生了異常,回調onFailure
          responseCallback.onFailure(RealCall.this, e);
      } finally {
        //通知Dispatcher Call被執行完畢了
        client.dispatcher().finished(this);
      }
    }
  }
           

可以看到AsyncCall的

execute()

就是具體請求執行的地方,隻不過和上面的RealCall的

execute()

方法相比,多了回調的處理。retryAndFollowUpInterceptor其實是負責請求逾時的重試和重定向操作的,

retryAndFollowUpInterceptor.isCanceled()

就是用來判斷這個請求是否被取消了,這裡就不深入展開了。那麼AsyncCall的

execute()

方法是怎麼被執行的呢,繼續來看AsyncCall的父類NamedRunnable。

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //注意這裡調用了execute方法
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
           

可以看到NamedRunnable其實就是一個實作了Runnable接口的抽象類,并且在run方法中調用了

execute()

。也就是說AsyncCal其實就是一個Runnable,當這個Runnable被調用的時候

execute()

方法自然會被調用。看到這裡就很清晰了,再回過頭來看RealCall的

enqueue()

中調用的這段代碼

//封裝一個AsyncCall交給Dispatcher排程
client.dispatcher().enqueue(new AsyncCall(responseCallback));
           

其實這裡的

new AsyncCall(responseCallback)

就是new了一個封裝的Runnable對象,這個Runnable的執行,就是整個請求的發起與回調的過程。好啦,這裡搞明白了其實調用

Dispatcher().enqueue()

方法傳遞過去的是一個Runnable對象,接下來就去Dispatcher中看下,他對這個Runnable做了什麼。

synchronized void enqueue(AsyncCall call) {
  //判斷正在執行的異步請求數沒有達到門檻值,并且每一個Host的請求數也沒有達到門檻值
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //加入到正在執行隊列,并立即執行
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    //加入到等待隊列
    readyAsyncCalls.add(call);
  }
}
           

上面的代碼中又出現了兩個雙端隊列,runningAsyncCalls和readyAsyncCalls,加上上面出現的runningSyncCalls可以看到Dispatcher一共維護了3個請求隊列,分别是

  1. runningAsyncCalls,正在請求的異步隊列
  2. readyAsyncCalls,準備請求的異步隊列\等待請求的異步隊列
  3. runningSyncCalls,正在請求的同步隊列

還出現了一個方法

executorService()

,接下來看下這個方法是幹嘛的。

private ExecutorService executorService;
public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(, Integer.MAX_VALUE, , TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}
           

可以看出來這個方法就是以懶漢的方式建立最大容量為

Integer.MAX_VALUE

, 存活等待時間為60S的線程池(其實這裡的最大容量并沒什麼用,因為他的最大容量不會超過runningAsyncCalls的size,即設定的并發請求數的門檻值)。

executorService().execute(call)

就是把這個請求丢進去執行。那麼

enqueue()

方法執行的過程大概就是,首先判斷目前正在執行的異步請求總數是否已經達到的門檻值(預設為64),針對每個host的同時請求數量是否達到了門檻值(預設為5)。如果都沒有達到那麼将這個請求加入到runningAsyncCalls隊列中,馬上執行。

否則,會将這個請求加入到readyAsyncCalls中,準備執行。那麼readyAsyncCalls中的請求時何時被調用的呢?掐指一算,應該是在runningAsyncCalls中某些請求被執行完畢時,不滿足上面的兩個條件自然會被調用。是不是呢?接下來看上面一直忽略的Dispatcher的**三個**finished方法:

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  //異步請求結束時調用此方法
  finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  //同步請求結束時調用此方法
  finished(runningSyncCalls, call, false);
}
/**
*将執行完畢的call從相應的隊列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    //從相應的隊列中移除相應的call,如果不包含,抛異常
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    //是否需要提升Call的級别
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }
    //如果沒有任何需要執行的請求,那麼執行idleCallBack
  if (runningCallsCount ==  && idleCallback != null) {
    idleCallback.run();
  }
}
           

可以看出來不管是異步調用結束,還是同步調用結束,最終都是調用的這個被private修飾的finished方法,都會将完成的call從相應的隊列中移除。唯一不同的是調用時傳遞的promoteCalls參數不同,異步請求結束時傳入的是true,同步請求時結束傳入的是false。并且會根據這個flag來判斷是否執行

promoteCalls()

方法,接下來看

promoteCalls()

裡做了什麼。

/**
*提升call的優先級
*/
private void promoteCalls() {
  //runningAsyncCalls已經滿了,不能再加了
  if (runningAsyncCalls.size() >= maxRequests) return; 
  //沒有請求在readyAsyncCalls等着被執行
  if (readyAsyncCalls.isEmpty()) return; 
  //周遊準備隊列裡的請求
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();
    //判斷該請求的host是否小于每個host最大請求門檻值
    if (runningCallsForHost(call) < maxRequestsPerHost) {
      //将該請求從readyAsyncCalls移除,加入runningAsyncCalls并執行
      i.remove();
      runningAsyncCalls.add(call);
      executorService().execute(call);
    }
    //如果runningAsyncCalls數量已經達到門檻值,終止周遊
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
}
           

可以看出

promoteCalls()

方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執行。是以上面的兩個finished方法調用方式的差別也就明晰了。同步調用結束因為并沒有涉及到runningAsyncCalls中的任何東西,對runningAsyncCalls沒任何影響,是以不需要調用promoteCalls。而異步的調用結束意味着runningAsyncCalls中會出現一個空位值,是以它會調用promoteCalls去嘗試從readyAsyncCalls中拉一個進來。

總結

好啦 到這裡整個dispatcher的排程分析算是完成了。總結起來其實他就是維護了三個隊列,三個隊列中包含了正在執行或者将要執行的所有請求。總結起來就是:

  1. 當發送一個異步請求時:如果runningAsyncCalls沒達到門檻值,那麼會将這個請求加入到runningAsyncCalls立即執行,否則會将這個請求加入到readyAsyncCalls中等待執行。當一個異步請求執行完畢時會試圖去執行readyAsyncCalls中的請求。
  2. 當發送一個同步請求時:該請求會直接加入到runningSyncCalls中,并且馬上開始執行,注意這個執行并不是由Dispatcher排程的。
  3. 所有異步執行的請求都會通過executorService線程池來執行,這是個懶漢方式建立的線程池。
整個大緻的流程
OkHttp Dispatcher的排程過程分析

歡迎閱讀下一篇interceptor原了解析