1 概述
1.1 引言
android
完成非阻塞式的異步請求的時候都是通過啟動子線程的方式來解決,子線程執行完任務的之後通過 handler
的方式來和主線程來完成通信。無限制的建立線程,會給系統帶來大量的開銷。如果在高并發的任務下,啟用個線程池,可以不斷的複用裡面不再使用和有效的管理線程的排程和數量的管理。就可以節省系統的成本,有效的提高執行效率。
1.2 線程池ThreadPoolExecutor
okhttp的線程池對象存在于Dispatcher類中。執行個體過程如下
1. public synchronized ExecutorService executorService() {
2. if (executorService == null) {
3. executorService = , Integer.MAX_VALUE, , TimeUnit.SECONDS,
4. new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
5. }
6. return executorService;
7. }
1.2 Call對象
了解源碼或使用過
okhttp
的都知道。
okttp
的操作元是Call對象。異步的實作是
RealCall.AsyncCall
。而
AsyncCall
是實作的一個
Runnable
接口。
1. final class AsyncCall extends NamedRunnable {}
是以Call本質就是一個
Runable
線程操作元肯定是放進excutorService中直接啟動的。
2 線程池的複用和管理
2.1 圖解
為了完成排程和複用,定義了兩個隊列分别用作等待隊列和執行任務的隊列。這兩個隊列都是
Dispatcher
成員變量。Dispatcher是一個控制執行,控制所有Call的分發和任務的排程、通信、清理等操作。這裡隻介紹異步排程任務。
1. /** Ready async calls in the order they'll be run. */
2. private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
4. /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
5. private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
在
《okhttp連接配接池複用機制》文章中我們在緩存Connection連接配接的時候也是使用的Deque雙端隊列。這裡同樣的方式,可以友善在隊列頭添加元素,移除尾部的元素。
2.2 過程分析
Call
代用
equeue
方法的時候
1. synchronized void enqueue(AsyncCall call) {
2. if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
3. runningAsyncCalls.add(call);
4. executorService().execute(call);
5. } else {
6. readyAsyncCalls.add(call);
7. }
8. }
方法中滿足執行隊列裡面不足最大線程數
maxRequests
并且Call對應的host數目不超過
maxRequestsPerHost
的時候直接把call對象直接推入到執行隊列裡,并啟動線程任務(
Call
本質是一個
Runnable
)。否則,目前線程數過多,就把他推入到等待隊列中。
Call
執行完肯定需要在
runningAsyncCalls
隊列中移除這個線程。那麼
readyAsyncCalls
隊列中的線程在什麼時候才會被執行呢。
追溯下
AsyncCall
線程的執行方法
1. @Override
2. protected void execute() {
3. boolean signalledCallback = false;
4. try {
5. Response response = getResponseWithInterceptorChain(forWebSocket);
6. if (canceled) {
7. signalledCallback = true;
8. responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
9. } else {
10. signalledCallback = true;
11. responseCallback.onResponse(RealCall.this, response);
12. }
13. } catch (IOException e) {
14. if (signalledCallback) {
15. // Do not signal the callback twice!
16. Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
17. } else {
18. responseCallback.onFailure(RealCall.this, e);
19. }
20. } finally {
21. client.dispatcher().finished(this);
22. }
23. }
24. }
這裡做了核心request的動作,并把失敗和回複資料的結果通過
responseCallback
回調到Dispatcher。執行操作完畢了之後不管有無異常都會進入到
dispactcher
的
finished
方法。
1. private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
2. int runningCallsCount;
3. Runnable idleCallback;
4. synchronized (this) {
5. if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
6. if (promoteCalls) promoteCalls();
7. runningCallsCount = runningCallsCount();
8. idleCallback = this.idleCallback;
9. }
11. && idleCallback != null) {
12. idleCallback.run();
13. }
14. }
在這裡call在runningAsyncCalls隊列中被移除了,重新計算了目前正在執行的線程數量。并且調用了
promoteCalls()
看來是來調整任務隊列的,跟進去看下
1. private void promoteCalls() {
2. if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
3. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
5. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
6. AsyncCall call = i.next();
8. if (runningCallsForHost(call) < maxRequestsPerHost) {
9. i.remove();
10. runningAsyncCalls.add(call);
11. executorService().execute(call);
12. }
14. if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
15. }
16. }
原來實在這裡對
readyAsyncCalls
進行排程的。最終會在
readyAsyncCalls
中通過
remove
操作把元素疊代取出并移除之後加入到runningAsyncCalls的執行隊列中執行操作。
ArrayDeque
是非線程安全的是以
finished
在調用
promoteCalls
的時候都在
synchronized
塊中執行的。執行等待隊列線程當然的前提是
runningAsyncCalls
線程數沒有超上線,而且等待隊列裡面有等待的任務。
以上完成了線程線程池的複用和線程的管理工作。
小結,Call在執行任務通過Dispatcher把單元任務優先推到執行隊列裡進行操作,如果操作完成再執行等待隊列的任務。
原文連結:
https://www.bbsmax.com/A/VGzlBLg85b/