天天看點

OkHttp3源碼詳解(六)Okhttp任務隊列工作原理

1 概述

1.1 引言

android

完成非阻塞式的異步請求的時候都是通過啟動子線程的方式來解決,子線程執行完任務的之後通過

handler

的方式來和主線程來完成通信。無限制的建立線程,會給系統帶來大量的開銷。如果在高并發的任務下,啟用個線程池,可以不斷的複用裡面不再使用和有效的管理線程的排程和數量的管理。就可以節省系統的成本,有效的提高執行效率。

1.2 線程池ThreadPoolExecutor

okhttp的線程池對象存在于Dispatcher類中。執行個體過程如下

1.  public  synchronized  ExecutorService executorService()  {
2.  if  (executorService ==  null)  {
3.  executorService =  ,  Integer.MAX_VALUE,  ,  TimeUnit.SECONDS,
4.  new  SynchronousQueue<Runnable>(),  Util.threadFactory("OkHttp Dispatcher",  false));
5.  }
6.  return executorService;
7.  }
           

1.2 Call對象

了解源碼或使用過

okhttp

的都知道。

okttp

的操作元是Call對象。異步的實作是

RealCall.AsyncCall

。而

AsyncCall

是實作的一個

Runnable

接口。

1.  final  class  AsyncCall  extends  NamedRunnable  {}
           

是以Call本質就是一個

Runable

線程操作元肯定是放進excutorService中直接啟動的。

2 線程池的複用和管理

2.1 圖解

為了完成排程和複用,定義了兩個隊列分别用作等待隊列和執行任務的隊列。這兩個隊列都是

Dispatcher

成員變量。Dispatcher是一個控制執行,控制所有Call的分發和任務的排程、通信、清理等操作。這裡隻介紹異步排程任務。

1.  /** Ready async calls in the order they'll be run. */
2.  private  final  Deque<AsyncCall> readyAsyncCalls =  new  ArrayDeque<>();

4.  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
5.  private  final  Deque<AsyncCall> runningAsyncCalls =  new  ArrayDeque<>();
           

《okhttp連接配接池複用機制》

文章中我們在緩存Connection連接配接的時候也是使用的Deque雙端隊列。這裡同樣的方式,可以友善在隊列頭添加元素,移除尾部的元素。

2.2 過程分析

Call

代用

equeue

方法的時候

1.  synchronized  void enqueue(AsyncCall call)  {
2.  if  (runningAsyncCalls.size()  < maxRequests && runningCallsForHost(call)  < maxRequestsPerHost)  {
3.  runningAsyncCalls.add(call);
4.  executorService().execute(call);
5.  }  else  {
6.  readyAsyncCalls.add(call);
7.  }
8.  }
           

方法中滿足執行隊列裡面不足最大線程數

maxRequests

并且Call對應的host數目不超過

maxRequestsPerHost

的時候直接把call對象直接推入到執行隊列裡,并啟動線程任務(

Call

本質是一個

Runnable

)。否則,目前線程數過多,就把他推入到等待隊列中。

Call

執行完肯定需要在

runningAsyncCalls

隊列中移除這個線程。那麼

readyAsyncCalls

隊列中的線程在什麼時候才會被執行呢。

追溯下

AsyncCall

線程的執行方法

1.  @Override
2.  protected  void execute()  {
3.  boolean signalledCallback =  false;
4.  try  {
5.  Response response = getResponseWithInterceptorChain(forWebSocket);
6.  if  (canceled)  {
7.  signalledCallback =  true;
8.  responseCallback.onFailure(RealCall.this,  new  IOException("Canceled"));
9.  }  else  {
10.  signalledCallback =  true;
11.  responseCallback.onResponse(RealCall.this, response);
12.  }
13.  }  catch  (IOException e)  {
14.  if  (signalledCallback)  {
15.  // Do not signal the callback twice!
16.  Platform.get().log(INFO,  "Callback failure for "  + toLoggableString(), e);
17.  }  else  {
18.  responseCallback.onFailure(RealCall.this, e);
19.  }
20.  }  finally  {
21.  client.dispatcher().finished(this);
22.  }
23.  }
24.  }
           

這裡做了核心request的動作,并把失敗和回複資料的結果通過

responseCallback

回調到Dispatcher。執行操作完畢了之後不管有無異常都會進入到

dispactcher

finished

方法。

1.  private  <T>  void finished(Deque<T> calls, T call,  boolean promoteCalls)  {
2.  int runningCallsCount;
3.  Runnable idleCallback;
4.  synchronized  (this)  {
5.  if  (!calls.remove(call))  throw  new  AssertionError("Call wasn't in-flight!");
6.  if  (promoteCalls) promoteCalls();
7.  runningCallsCount = runningCallsCount();
8.  idleCallback =  this.idleCallback;
9.  }

11.  && idleCallback !=  null)  {
12.  idleCallback.run();
13.  }
14.  }
           

在這裡call在runningAsyncCalls隊列中被移除了,重新計算了目前正在執行的線程數量。并且調用了

promoteCalls()

看來是來調整任務隊列的,跟進去看下

1.  private  void promoteCalls()  {
2.  if  (runningAsyncCalls.size()  >= maxRequests)  return;  // Already running max capacity.
3.  if  (readyAsyncCalls.isEmpty())  return;  // No ready calls to promote.

5.  for  (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();  )  {
6.  AsyncCall call = i.next();

8.  if  (runningCallsForHost(call)  < maxRequestsPerHost)  {
9.  i.remove();
10.  runningAsyncCalls.add(call);
11.  executorService().execute(call);
12.  }

14.  if  (runningAsyncCalls.size()  >= maxRequests)  return;  // Reached max capacity.
15.  }
16.  }
           

原來實在這裡對

readyAsyncCalls

進行排程的。最終會在

readyAsyncCalls

中通過

remove

操作把元素疊代取出并移除之後加入到runningAsyncCalls的執行隊列中執行操作。

ArrayDeque

是非線程安全的是以

finished

在調用

promoteCalls

的時候都在

synchronized

塊中執行的。執行等待隊列線程當然的前提是

runningAsyncCalls

線程數沒有超上線,而且等待隊列裡面有等待的任務。

以上完成了線程線程池的複用和線程的管理工作。

小結,Call在執行任務通過Dispatcher把單元任務優先推到執行隊列裡進行操作,如果操作完成再執行等待隊列的任務。

原文連結:

https://www.bbsmax.com/A/VGzlBLg85b/

繼續閱讀