天天看點

OkHttp 源碼解析(2) Dispatcher 工作機制

上一節說到,發送一個請求會到RealCall的execute()和enqueue(CallBack callBack) 方法中,首先看一下這倆個方法的實作。

@Override public Response execute() throws IOException {
  synchronized (this) {// 判斷這個Call有沒有執行過,每一個Call隻能執行一次
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  try {
    client.dispatcher().executed(this);// 最終交給client的任務排程器處理
    Response result = getResponseWithInterceptorChain();// 使用攔截器處理傳回結果
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    client.dispatcher().finished(this);
  }
}
           

再看看異步的方法

@Override 
public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));// 還是交給dispatcher處理
}
           

AsynCall 是RealCall的内部類,實作NameRunnable接口,NameRunable為Runnable的實作,在run()方法裡會執行execute()抽象方法,是以線上程池運作AsynCall時會調用AsynCall的execute()方法,下面看一下AsynCall的execute()方法

@Override 
protected void execute() {
  boolean signalledCallback = false;
  try {
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}
           

成功或者失敗會調用responseCallBack的onResponse()、onFailure()方法;

既然請求最終是Dispatcher處理,那麼就看一下Dispatcher

首先,Dispatcher維護三個陣列,等待運作的異步Calls陣列,正在執行的異步Calls陣列,同步的Calls陣列

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
           

Dispatcher的同步方法

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);// 将RealCall放入執行陣列中
}
           

Dispatcher的異步方法

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);// 将Call放入正在執行的異步陣列中
    executorService().execute(call);// 線程池中執行這個Call
  } else {
    readyAsyncCalls.add(call);// Call 添加到readyAsyncCalls 陣列中,等待條件滿足執行
  }
}
           

異步回調會調用promoteCall,從等待隊列加入運作隊列并開始執行

private void promoteCalls() {  
  if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.  
  if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.  

  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {  
    AsyncCall call = i.next();  

    if (runningCallsForHost(call) < maxRequestsPerHost) {  
      i.remove();  
      runningAsyncCalls.add(call);  
      executorService().execute(call);  
    }  

    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.  
  }  
}  
           

Dispatcher大概這些内容。

Dispatcher發送完請求後,接着就是通過強大的攔截器Intercepter對Response的一些處理

在擷取response時,可以看到都會調用

Response result = getResponseWithInterceptorChain();

Response getResponseWithInterceptorChain() throws IOException {  
  // Build a full stack of interceptors.  
  List<Interceptor> interceptors = new ArrayList<>();  
  interceptors.addAll(client.interceptors());  
  interceptors.add(retryAndFollowUpInterceptor);  
  interceptors.add(new BridgeInterceptor(client.cookieJar()));//call request 與network處理  
  interceptors.add(new CacheInterceptor(client.internalCache()));//緩存處理  
  interceptors.add(new ConnectInterceptor(client));//連接配接處理  
  if (!forWebSocket) {  
    interceptors.addAll(client.networkInterceptors());  
  }  
  interceptors.add(new CallServerInterceptor(forWebSocket));  

  Interceptor.Chain chain = new RealInterceptorChain(  
      interceptors, null, null, null, , originalRequest);  
  return chain.proceed(originalRequest);  
}  
           

攔截器的工作機制下一節,再做研究吧!

繼續閱讀