package java.util.concurrent;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
import java.security.Permissions;
/**
* 一個ExecutorService運作ForkJoinTask 任務。
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* {@code ForkJoinPool}為非{@code ForkJoinTask}用戶端提供送出入口,同時也提供管理和監控操作。
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* {@code ForkJoinPool}與其他類型的{@link ExecutorService}的差別主要在于采用了<em>work-steal </em>:
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
* <em>work-stealing</em>:
* 池中的所有線程都試圖查找和執行送出給池和/或由其他活動任務建立的任務
* (如果不存在工作,則最終阻塞等待工作)。
* all threads in the pool attempt to find and
* execute tasks submitted to the pool and/or created by other active
* tasks (eventually blocking waiting for work if none exist). This
* 當大多數任務生成其他子任務時(就像大多數{@code ForkJoinTask}一樣),
* 以及許多小任務從外部用戶端送出到池時,這就支援了高效的處理。
* enables efficient processing when most tasks spawn other subtasks
* (as do most {@code ForkJoinTask}s), as well as when many small
* tasks are submitted to the pool from external clients. Especially
* 特别是在構造函數中将<em>asyncMode</em>設定為true時,{@code ForkJoinPool}
* 可能也适合用于從未加入的事件類型任務。
* when setting <em>asyncMode</em> to true in constructors, {@code
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*
* 靜态{@link #commonPool()}是可用的,并且适合于大多數應用程式。
* <p>A static {@link #commonPool()} is available and appropriate for
* most applications.
* 公共池由任何沒有顯式送出到指定池的ForkJoinTask使用。
* The common pool is used by any ForkJoinTask that
* is not explicitly submitted to a specified pool. Using the common
* 使用公共池通常會減少資源的使用(它的線程在不使用期間緩慢地回收,在随後使用時恢複)。
* pool normally reduces resource usage (its threads are slowly
* reclaimed during periods of non-use, and reinstated upon subsequent
* use).
*
* 對于需要單獨或自定義池的應用程式,可以使用給定的目标并行度級别構造{@code ForkJoinPool};
* 預設情況下,等于可用處理器的數量。
* <p>For applications that require separate or custom pools, a {@code
* ForkJoinPool} may be constructed with a given target parallelism
* level; by default, equal to the number of available processors. The
* 池嘗試通過動态添加、挂起或恢複内部工作線程來維護足夠的活動(或可用)線程,
* 即使有些任務因等待加入其他任務而停止。
* pool attempts to maintain enough active (or available) threads by
* dynamically adding, suspending, or resuming internal worker
* threads, even if some tasks are stalled waiting to join others.
* 但是,在面對阻塞的I/O或其他非托管同步時,不能保證這樣的調整。
* 嵌套的{@link ManagedBlocker}接口允許擴充所适應的同步類型。
* However, no such adjustments are guaranteed in the face of blocked
* I/O or other unmanaged synchronization. The nested {@link
* ManagedBlocker} interface enables extension of the kinds of
* synchronization accommodated.
*
* 除了執行和生命周期控制方法之外,這個類還提供狀态檢查方法(例如{@link #getStealCount}),
* 用于幫助開發、調優和監視fork/join應用程式。
* <p>In addition to execution and lifecycle control methods, this
* class provides status check methods (for example
* {@link #getStealCount}) that are intended to aid in developing,
* tuning, and monitoring fork/join applications. Also, method
* 而且,方法{@link #toString}以友善非正式監視的形式傳回池狀态訓示。
* {@link #toString} returns indications of pool state in a
* convenient form for informal monitoring.
*
* 與其他ExecutorServices一樣,下表總結了三種主要的任務執行方法。
* <p>As is the case with other ExecutorServices, there are three
* main task execution methods summarized in the following table.
* 它們主要用于目前池中尚未參與fork/join計算的用戶端。
* These are designed to be used primarily by clients not already
* engaged in fork/join computations in the current pool. The main
* 這些方法的主要形式接受{@code ForkJoinTask}的執行個體,但是重載的形式也
* 允許混合執行普通的{@code Runnable}或{@code Callable}。
* forms of these methods accept instances of {@code ForkJoinTask},
* but overloaded forms also allow mixed execution of plain {@code
* Runnable}- or {@code Callable}- based activities as well. However,
* 但是,已經在池中執行的任務通常應該使用表中列出的計算内表單,除非使用通常
* 不連接配接的異步事件樣式的任務,在這種情況下,方法的選擇幾乎沒有差別。
* tasks that are already executing in a pool should normally instead
* use the within-computation forms listed in the table unless using
* async event-style tasks that are not usually joined, in which case
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of task execution methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
* <td> <b>Arrange async execution</b></td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
* <td> <b>Await and obtain result</b></td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
* <td> <b>Arrange exec and obtain Future</b></td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* 公共池預設情況下是用預設參數構造的,但是可以通過設定三個
* {@linkplain System#getProperty System properties}來控制這些參數:
* <p>The common pool is by default constructed with default
* parameters, but these may be controlled by setting three
* {@linkplain System#getProperty system properties}:
* <ul>
* <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
* 并行級别,非負整數
* - the parallelism level, a non-negative integer
*
* <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
* ForkJoinWorkerThreadFactory 類名
* - the class name of a {@link ForkJoinWorkerThreadFactory}
*
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
* - the class name of a {@link UncaughtExceptionHandler}
* </ul>
*
* 如果一個SecurityManager存在且沒有指定工廠,則預設池使用一個工廠提供的線程不啟用Permissions 。
* If a {@link SecurityManager} is present and no factory is
* specified, then the default pool uses a factory supplying
* threads that have no {@link Permissions} enabled.
* 系統類加載器用于加載這些類。
* The system class loader is used to load these classes.
* 建立這些設定有任何錯誤,使用預設參數。
* Upon any error in establishing these settings, default parameters
* are used.
* 通過将parallelism屬性設定為0,和/或使用可能傳回{@code null}的工廠,
* 可以禁用或限制公共池中線程的使用。
* It is possible to disable or limit the use of threads in
* the common pool by setting the parallelism property to zero, and/or
* using a factory that may return {@code null}. However doing so may
* 但是這樣做可能會導緻未加入的任務永遠不會執行。
* cause unjoined tasks to never be executed.
*
* 實作注意事項 :此實作将運作的最大線程數限制為32767.嘗試建立大于最大數目
* 的池導緻IllegalArgumentException 。 ( (1 << 15) - 1)
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
* pools with greater than the maximum number result in
* {@code IllegalArgumentException}.
*
* 此實作僅在池關閉或内部資源耗盡時拒絕送出的任務(即抛出RejectedExecutionException )。
* <p>This implementation rejects submitted tasks (that is, by throwing
* {@link RejectedExecutionException}) only when the pool is shut down
* or internal resources have been exhausted.
*
* @since 1.7
* @author Doug Lea
*/
/**
* 1、外部送出任務時,如果線程在workQueues中對應的索引位置沒有workQueue,則會建立一個workQueue,
* 并放入workQueues中對應的索引位置。
*
* 2、注冊工作線程ForkJoinWorkerThread時,其構造方法會調用ForkJoinPool的registerWorker() 方法,
* 方面裡面會建立workQueue,并注冊到 workQueues中,最後傳回給 ForkJoinWorkerThread,然後線程
* 裡面也會儲存此 workQueue。線程建立時沒有初始化工作隊列,線程運作的時候才會調用growArray()方法
* 配置設定工作隊列ForkJoinTask<?>[]。
*
* 3、b = q.base, ((a.length - 1) & b 的作用是?
* 每取出一個元素base都會增加(擴容時也不會重置或者減少),每插入一個元素top也會增加,top - base 結果
* 為隊列中的任務個數,隻要 top - base 不大于隊列array的長度,那麼array中就還能存儲任務,base 一直增加,
* 任務多的時候就會超過a.length,但是任務也一直在被取出,是以隊列可以循環使用的,(a.length - 1) & b
* 計算任務儲存的位置。從隊列的中間索引位置開始,循環使用。
*
* 4、CountedCompleter 和 一般ForkJoinTask任務在等待任務完成上的不同,CountedCompleter隻能等待根任務完成,
* 而一般的ForkJoinTask可以等待子任務完成,是以join()在幫助任務完成時,一般的ForkJoinTask隻需要判斷其他
* 工作線程目前竊取的任務是否是等待完成的任務即可,而CountedCompleter 任務需要一級一級往上判斷是否是其子
* 任務,而不能通過判斷工作線程目前竊取的任務是否是目标任務。
*
* 5、join()方法進入等待之前,會進行補償,如果池中隻剩下一個活動的工作線程了,那麼會注冊新的工作線程去執行任務,
* 注冊成功後,線程才可以進入等待狀态,是以活動的線程數會大于 parallelism。
*
*/
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
/*
* 實作概述
* Implementation Overview
*
* 這個類和它的嵌套類提供了一組工作線程的主要功能和控制:
* This class and its nested classes provide the main
* functionality and control for a set of worker threads:
* 來自非fj(ForkJoin)線程的送出将進入送出隊列。
* Submissions from non-FJ threads enter into submission queues.
* Workers接受這些任務,并通常将它們分解為可能被其他workers竊取的子任務。
* Workers take these tasks and typically split them into subtasks
* that may be stolen by other workers.
*
* 優先級規則優先處理來自其自身隊列的任務(LIFO或FIFO,取決于模式),
* 然後随機FIFO處理其他隊列中的任務
* Preference rules give
* first priority to processing tasks from their own queues (LIFO
* or FIFO, depending on mode), then to randomized FIFO steals of
* tasks in other queues.
*
* 工作隊列
* WorkQueues
* ==========
*
* 大多數操作發生在工作竊取隊列中(在嵌套的類工作隊列中)。
* Most operations occur within work-stealing queues (in nested
* class WorkQueue).
* 這些特别形式的雙隊列隻能支援四種可能終端操作的三種,push,pop, and poll(又稱 偷),
* These are special forms of Deques that
* support only three of the four possible end-operations -- push,
* pop, and poll (aka steal), under the further constraints that
* 在進一步的限制下,push和pop隻能從擁有的線程中調用(或者,正如這裡所擴充的,在一個鎖中調用),
* 而poll可以從其他線程中調用。
* push and pop are called only from the owning thread (or, as
* extended here, under a lock), while poll may be called from
* other threads.
* 如果您不熟悉它們,那麼您可能想要閱讀Herlihy和Shavit的書《多處理器程式設計的藝術》
* (The Art of Multiprocessor programming),第16章對這些進行了更詳細的描述。
* (If you are unfamiliar with them, you probably
* want to read Herlihy and Shavit's book "The Art of
* Multiprocessor programming", chapter 16 describing these in
* more detail before proceeding.)
* 主要的工作竊取隊列設計與Chase和Lev的論文《"Dynamic Circular Work-Stealing Deque"
* 大緻相似,
* The main work-stealing queue
* design is roughly similar to those in the papers "Dynamic
* Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
* See also "Correct and Efficient Work-Stealing for Weak Memory
* Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
* (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
* analysis of memory ordering (atomic, volatile etc) issues.
* 主要的差別最終源于GC需求,即我們盡可能快地取消插槽,以便在生成大量
* 任務的程式中保持盡可能小的記憶體占用。
* The main differences ultimately stem from GC requirements that we
* null out taken slots as soon as we can, to maintain as small a
* footprint as possible even in programs generating huge numbers
* of tasks.
* 為了實作這一點,我們将CAS仲裁pop和poll (steal)從索引(“base”和“top”)轉移到槽本身。
* To accomplish this, we shift the CAS arbitrating pop
* vs poll (steal) from being on the indices ("base" and "top") to
* the slots themselves.
* 是以,成功的pop和poll主要需要使用CAS把槽從非空設定為空。
* So, both a successful pop and poll
* mainly entail a CAS of a slot from non-null to null. Because
* 因為我們依賴引用的CAS操作,我們不需要在 base 或 top 上标記位。
* we rely on CASes of references, we do not need tag bits on base
* 它們是在任何基于循環數組的隊列中使用的簡單int(例如ArrayDeque)。
* or top. They are simple ints as used in any circular
* array-based queue (see for example ArrayDeque).
* 對索引的更新仍然必須以一種確定 top == base 意味着隊列為空的方式排序,
* 否則可能會在push、pop或poll尚未完全送出時使隊列顯示為非空。
* Updates to the
* indices must still be ordered in a way that guarantees that top
* == base means the queue is empty, but otherwise may err on the
* side of possibly making the queue appear nonempty when a push,
* pop, or poll have not fully committed.
* 注意,這意味着poll操作(單獨考慮)不是沒有等待的。
* Note that this means
* that the poll operation, considered individually, is not
* 一個小偷不能成功地繼續,直到另一個正在進行的小偷(或者,如果之前是空的,push)完成。
* wait-free. One thief cannot successfully continue until another
* in-progress one (or, if previously empty, a push) completes.
* 但是,總的來說,我們至少確定了機率性的非阻塞性。
* However, in the aggregate, we ensure at least probabilistic
* non-blockingness.
* 如果一個嘗試竊取失敗,小偷總是選擇一個不同的随機目标受害者嘗試下一步。
* If an attempted steal fails, a thief always
* chooses a different random victim target to try next. So, in
* 是以,為了讓一個小偷繼續前進,它足以完成任何正在進行的poll或任何空隊列上的push。
* order for one thief to progress, it suffices for any
* in-progress poll or new push on any empty queue to
* 這就是為什麼我們通常使用pollAt方法及其變體,它們隻在表面的base索引上嘗試一次,
* 或者考慮其他操作,而不是使用poll方法。
* complete. (This is why we normally use method pollAt and its
* variants that try once at the apparent base index, else
* consider alternative actions, rather than method poll.)
*
* 這種方法還支援一種使用者模式,在這種模式中,本地任務處理采用FIFO而不是LIFO順序,
* 隻需使用poll而不是pop。
* This approach also enables support of a user mode in which local
* task processing is in FIFO, not LIFO order, simply by using
* poll rather than pop.
* 這在從不連接配接任務的消息傳遞架構中非常有用。
* This can be useful in message-passing
* frameworks in which tasks are never joined.
* 但是,這兩種模式都不考慮親和性、負載、緩存位置等,是以很少在給定的機器上提供可能的最佳性能,
* 但是通過對這些因素進行平均,可以提供良好的吞吐量。
* However neither
* mode considers affinities, loads, cache localities, etc, so
* rarely provide the best possible performance on a given
* machine, but portably provide good throughput by averaging over
* 此外,即使我們确實試圖利用這些資訊,我們通常也沒有利用這些資訊的基礎。
* these factors. (Further, even if we did try to use such
* information, we do not usually have a basis for exploiting it.
* 例如,一些任務集從緩存親和性中獲益,而另一些任務集則受到緩存污染影響的損害。
* For example, some sets of tasks profit from cache affinities,
* but others are harmed by cache pollution effects.)
*
* 工作隊列也以類似的方式用于送出給池的任務。我們不能将這些任務混合在用
* 于竊取工作的隊列中(這将影響lifo/fifo處理)。
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* for work-stealing (this would contaminate lifo/fifo
* processing).
* 相反,我們使用一種散列的形式,将送出隊列與送出線程随機關聯起來。
* Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocalRandom探測值用作選擇現有隊列的散列代碼,可以在與其他送出者
* 争用時随機重新定位。
* ThreadLocalRandom probe value serves as a hash code for
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters.
* 本質上,送出者就像工作者一樣,除了他們被限制在執行他們送出的本地任務
* (或者在CountedCompleters的情況下,其他具有相同根任務)。
* In essence, submitters act
* like workers except that they are restricted to executing local
* tasks that they submitted (or in the case of CountedCompleters,
* others with the same root task).
* 但是,由于大多數共享/外部隊列操作比内部隊列操作更昂貴,而且在穩定狀态下,
* 外部送出者将與從業人員争奪CPU,是以如果所有從業人員都處于活動狀态,
* ForkJoinTask.join和相關方法将禁止它們重複地幫助處理任務。
* However, because most
* shared/external queue operations are more expensive than
* internal, and because, at steady state, external submitters
* will compete for CPU with workers, ForkJoinTask.join and
* related methods disable them from repeatedly helping to process
* tasks if all workers are active.
* 在共享模式下插入任務需要一個鎖(主要是在調整大小的情況下進行保護),
* 但是我們隻使用一個簡單的自旋鎖(在字段qlock中使用位),因為送出者遇到
* 繁忙的隊列時會繼續嘗試或建立其他隊列
* Insertion of tasks in shared
* mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using bits in
* field qlock), because submitters encountering a busy queue move
* on to try or create other queues -- they block only when
* 它們隻在建立和注冊新隊列時阻塞。
* creating and registering new queues.
*
* 管理
* Management
* ==========
*
* 工作竊取的主要吞吐量優勢源于分散控制 -- workers通常從他們自己或彼此那裡接受任務。
* The main throughput advantages of work-stealing stem from
* decentralized control -- workers mostly take tasks from
* themselves or each other. We cannot negate this in the
* 我們不能在執行其他管理職責時否定這一點。
* implementation of other management responsibilities. The main
* 避免瓶頸的主要政策是将幾乎所有本質上的原子控制狀态打包成兩個volatile變量,
* 這兩個變量最常被讀取(而不是寫入)作為狀态和一緻性檢查。
* tactic for avoiding bottlenecks is packing nearly all
* essentially atomic control state into two volatile variables
* that are by far most often read (not written) as status and
* consistency checks.
*
* 字段“ctl”包含64位,包含原子地添加、停用、(在事件隊列上)排隊、
* 退出隊列,和/或重新激活workers 所需的所有資訊。
* Field "ctl" contains 64 bits holding all the information needed
* to atomically decide to add, inactivate, enqueue (on an event
* queue), dequeue, and/or re-activate workers.
* 為了啟用這種封裝,我們将最大并行度限制為(1<<15)-1(遠遠超出正常的操作範圍),
* 以允許ids、計counts及其negations(用于門檻值設定)适合16位字段。
* To enable this
* packing, we restrict maximum parallelism to (1<<15)-1 (which is
* far in excess of normal operating range) to allow ids, counts,
* and their negations (used for thresholding) to fit into 16bit
* fields.
*
* 字段“plock”是一種帶有飽和關閉位(類似于每個隊列的“qlocks”)的序列鎖,
* 主要保護對工作隊列數組的更新,以及啟用shutdown。
* Field "plock" is a form of sequence lock with a saturating
* shutdown bit (similarly for per-queue "qlocks"), mainly
* protecting updates to the workQueues array, as well as to
* enable shutdown.
* 當作為鎖使用時,它通常隻被短暫地持有,是以在短暫的旋轉之後幾乎總是可用,
* 但是我們在需要時使用基于螢幕的備份政策來阻塞。
* When used as a lock, it is normally only very
* briefly held, so is nearly always available after at most a
* brief spin, but we use a monitor-based backup strategy to
* block when needed.
*
* 記錄工作隊列。工作隊列記錄在“workQueues”數組中,該數組在首次使用時建立,
* 并在必要時擴充。
* Recording WorkQueues. WorkQueues are recorded in the
* "workQueues" array that is created upon first use and expanded
* if necessary.
* 在記錄新workers和移除終止worker時,對數組的更新通過一個鎖互相保護,
* 但是數組是可并發讀的,并且可以直接通路。
* Updates to the array while recording new workers
* and unrecording terminated ones are protected from each other
* by a lock but the array is otherwise concurrently readable, and
* accessed directly.
* 為了簡化基于索引的操作,數組大小總是2的幂,并且所有的讀取器必須容忍空槽。
* To simplify index-based operations, the
* array size is always a power of two, and all readers must
* tolerate null slots.
* 工作隊列的索引是奇數。共享(送出)隊列的索引是偶數的,最多64個槽,
* 即使數組需要擴充以添加更多的workers,也會限制增長。
* Worker queues are at odd indices. Shared
* (submission) queues are at even indices, up to a maximum of 64
* slots, to limit growth even if array needs to expand to add
* 以這種方式将它們組合在一起可以簡化和加速任務掃描。
* more workers. Grouping them together in this way simplifies and
* speeds up task scanning.
*
* 所有worker線程的建立都是按需的,由任務送出、終止workers的替換,
* 和/或阻塞工作的補償觸發。
* All worker thread creation is on-demand, triggered by task
* submissions, replacement of terminated workers, and/or
* compensation for blocked workers.
* 但是,所有其他支援代碼都被設定為與其他政策一起工作。
* However, all other support
* code is set up to work with other policies.
* 為了確定我們不持有worker引用(這會阻止GC),所有對工作隊列的通路都是
* 通過對工作隊列數組的索引進行的(這是一些混亂代碼結構的一個來源)。
* To ensure that we
* do not hold on to worker references that would prevent GC, ALL
* accesses to workQueues are via indices into the workQueues
* array (which is one source of some of the messy code
* constructions here).
* 實際上,workQueues數組是一種弱引用機制。是以,例如,ctl的等待隊列字段存儲索引,
* 而不是引用。
* In essence, the workQueues array serves as
* a weak reference mechanism. Thus for example the wait queue
* field of ctl stores indices, not references.
* 對相關方法(例如signalWork)中的工作隊列的通路必須同時進行索引檢查和空檢查IDs。
* Access to the
* workQueues in associated methods (for example signalWork) must
* both index-check and null-check the IDs.
* 所有這些通路都通過提前傳回來忽略壞的IDs,因為這隻與終止相關,
* 在這種情況下,放棄是可以的。
* All such accesses
* ignore bad IDs by returning out early from what they are doing,
* since this can only be associated with termination, in which
* case it is OK to give up.
* 工作隊列數組的所有用法還将檢查它是否為非空(即使以前是非空)。
* All uses of the workQueues array
* also check that it is non-null (even if previously
* 這允許在終止期間為空,這是目前不需要的,但仍然是基于資源撤銷的shutdown方案的一個選項。
* non-null). This allows nulling during termination, which is
* currently not necessary, but remains an option for
* resource-revocation-based shutdown schemes. It also helps
* 它還有助于減少異常陷阱代碼的JIT釋出,這往往會使某些方法中的控制流變得不必要地複雜。
* reduce JIT issuance of uncommon-trap code, which tends to
* unnecessarily complicate control flow in some methods.
*
* 事件隊列。與HPC的工作竊取架構不同,我們不能讓workers在無法立即找到任務
* 的情況下無限期地掃描任務,并且我們不能啟動/恢複workers,除非出現可用的任務。
* Event Queuing. Unlike HPC work-stealing frameworks, we cannot
* let workers spin indefinitely scanning for tasks when none can
* be found immediately, and we cannot start/resume workers unless
* there appear to be tasks available. On the other hand, we must
* 另一方面,在送出或生成新任務時,我們必須快速地促使它們采取行動。
* quickly prod them into action when new tasks are submitted or
* 在許多情況下,激活worker的啟動時間是總體性能的主要限制因素
* (在程式啟動時,JIT編譯和配置設定會加劇這種限制)。
* generated. In many usages, ramp-up time to activate workers is
* the main limiting factor in overall performance (this is
* compounded at program start-up by JIT compilation and
* 是以我們盡可能地簡化它。
* allocation). So we try to streamline this as much as possible.
* 當workers找不到工作時,我們将他們放入事件等待隊列中,然後讓他們park/unpark。
* We park/unpark workers after placing in an event wait queue
* when they cannot find work.
* 這個“queue”實際上是一個簡單的Treiber堆棧,以ctl的“id”字段為首,加上一個15位的
* 計數器值(它反映了一個worker被滅活的次數)來避免ABA影響(我們隻需要像worker線程
* 一樣多的版本号)。
* This "queue" is actually a simple
* Treiber stack, headed by the "id" field of ctl, plus a 15bit
* counter value (that reflects the number of times a worker has
* been inactivated) to avoid ABA effects (we need only as many
* version numbers as worker threads). Successors are held in
* Successors被WorkQueue.nextWait 字段儲存。
* field WorkQueue.nextWait.
* Queuing處理幾個固有的競争,主要是一個任務生産線程可能看不到(和signalling)
* 另一個線程放棄尋找工作,但還沒有進入等待隊列。
* Queuing deals with several intrinsic
* races, mainly that a task-producing thread can miss seeing (and
* signalling) another thread that gave up looking for work but
* has not yet entered the wait queue.
* 我們通過在新等待工作者被添加到等待隊列之前和之後都需要對所有workers進行
* 全面掃描(通過反複調用方法scan())來解決這個問題。
* We solve this by requiring
* a full sweep of all workers (via repeated calls to method
* scan()) both before and after a newly waiting worker is added
* to the wait queue.
* 因為排隊的workers實際上可能在重新掃描而不是等待,是以我們設定并清除工作隊列的“parker”字段,
* 以減少不必要的取消unpark的調用。
* Because enqueued workers may actually be
* rescanning rather than waiting, we set and clear the "parker"
* field of WorkQueues to reduce unnecessary calls to unpark.
* 這需要再次檢查,以避免錯過信号。
* (This requires a secondary recheck to avoid missed signals.)
* 請注意:關于Thread.interrupts在parking和其他阻塞周圍的不同尋常的約定:
* Note the unusual conventions about Thread.interrupts
* surrounding parking and other blocking:
* 因為中斷隻用于警示線程檢查終止,這是在阻塞時無論如何都進行檢查,我們在任何
* 調用park之前清除狀态(使用Thread.interrupted),是以park并不會立即傳回
* 由于狀态被設定通過一些其他不相關的使用者代碼中調用中斷。
* Because interrupts are
* used solely to alert threads to check termination, which is
* checked anyway upon blocking, we clear status (using
* Thread.interrupted) before any call to park, so that park does
* not immediately return due to status being set via some other
* unrelated call to interrupt in user code.
*
* 發信号。 隻有當出現至少有一個任務他們能夠找到并執行時,我們才會
* 建立或喚醒workers。
* Signalling. We create or wake up workers only when there
* appears to be at least one task they might be able to find and
* execute.
* 當一個送出被添加,或者另一個worker将一個任務添加到一個少于兩個任務
* 的隊列中時,它們就通知等待的worker(或者如果少于給定的并行度級别,
* 就觸發建立新任務——signalWork)。
* When a submission is added or another worker adds a
* task to a queue that has fewer than two tasks, they signal
* waiting workers (or trigger creation of new ones if fewer than
* the given parallelism level -- signalWork). These primary
* 當其他線程從隊列中删除一個任務并注意到隊列中還有其他任務時,
* 這些主信号将得到其他線程的支援。
* signals are buttressed by others whenever other threads remove
* a task from a queue and notice that there are other tasks there
* 是以,總體而言,池将會被過度通知。
* as well. So in general, pools will be over-signalled. On most
* 在大多數平台上,信号(unpark)開銷時間非常長,而且從向線程發出信号到
* 它實際取得進展之間的時間間隔非常長,是以值得盡可能多地消除關鍵路徑上的這些延遲。
* platforms, signalling (unpark) overhead time is noticeably
* long, and the time between signalling a thread and it actually
* making progress can be very noticeably long, so it is worth
* offloading these delays from critical paths as much as
* possible.
* 此外,隻要workers看到ctl的狀态發生變化,他們就會保持活躍,逐漸地向下旋轉。
* 類似的穩定性感覺技術也被用于阻塞之前的awaitJoin和helpComplete。
* Additionally, workers spin-down gradually, by staying
* alive so long as they see the ctl state changing. Similar
* 類似的穩定性感覺技術也被用于阻塞之前的awaitJoin和helpComplete。
* stability-sensing techniques are also used before blocking in
* awaitJoin and helpComplete.
*
* 削減workers. 在一段時間不使用後釋放資源,當池靜止時worker開始等待,并且如果
* 池在給定的時期保持靜止,worker将會逾時并且終止 -- 如果線程數大于并行度,則周期較短,
* 如果線程數減少,則周期較長。
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate if the pool has remained quiescent for a
* given period -- a short period if there are more threads than
* parallelism, longer as the number of threads decreases. This
* 這将慢慢傳播,最終在一段時間的不使用後終止所有的workers。
* will slowly propagate, eventually terminating all workers after
* periods of non-use.
*
* Shutdown 和 Termination. 調用shutdownNow會原子地設定plock位,然後(非原子地)設定
* 每個worker的qlock狀态,取消所有未處理的任務,并喚醒所有等待的worker。
* Shutdown and Termination. A call to shutdownNow atomically sets
* a plock bit and then (non-atomically) sets each worker's
* qlock status, cancels all unprocessed tasks, and wakes up
* all waiting workers.
* 檢測是否應該在非突然shutdown()調用後開始終止需要更多的工作并記帳。
* Detecting whether termination should
* commence after a non-abrupt shutdown() call requires more work
* 我們需要對平靜達成共識。(比如,沒有更多的工作)。
* and bookkeeping. We need consensus about quiescence (i.e., that
* there is no more work).
* 活動計數提供了一個主要的訓示,但非突然shutdown仍然需要重新檢查掃描的任何
* 不活動的但沒有排隊的workers。
* The active count provides a primary
* indication but non-abrupt shutdown still requires a rechecking
* scan for any workers that are inactive but not queued.
*
* 加入任務
* Joining Tasks
* =============
*
* 當一個worker正在等待加入被另一個worker竊取(或總是持有)的任務時,可以采取以下任何一種操作。
* Any of several actions may be taken when one worker is waiting
* to join a task stolen (or always held) by another. Because we
* 因為我們将許多任務多路複用到一個workers池中,是以我們不能讓它們阻塞(如Thread.join)。
* are multiplexing many tasks on to a pool of workers, we can't
* just let them block (as in Thread.join).
* 我們也不能隻是用另一個重新配置設定joiner的運作時堆棧,然後替換它,這将是一種“延續”的形式,
* 即使可能也不一定是一個好主意,因為我們有時需要一個未阻塞的任務和它的延續來進行。
* We also cannot just
* reassign the joiner's run-time stack with another and replace
* it later, which would be a form of "continuation", that even if
* possible is not necessarily a good idea since we sometimes need
* both an unblocked task and its continuation to progress.
* 相反,我們結合了兩種政策:
* Instead we combine two tactics:
*
* 幫助: 安排連接配接程式執行一些任務,這些任務在未發生偷取時将運作。
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
*
* 補償: 除非已經有足夠的活動線程,否則tryCompensate()方法可以
* 建立或重新激活一個備用線程,以補償阻塞的連接配接,直到它們解除阻塞為止。
* Compensating: Unless there are already enough live threads,
* method tryCompensate() may create or re-activate a spare
* thread to compensate for blocked joiners until they unblock.
*
* 第三種形式(在tryRemoveAndExec中實作)相當于幫助一個假設的補償器:
* A third form (implemented in tryRemoveAndExec) amounts to
* helping a hypothetical compensator:
* 如果我們可以很容易地判斷出補償器的一個可能動作是偷取并執行正在連接配接的任務,
* 那麼連接配接線程就可以直接這樣做,而不需要補償線程(盡管會以較大的運作時堆棧為代價,
* 但是權衡下通常是值得的)。
* If we can readily tell that
* a possible action of a compensator is to steal and execute the
* task being joined, the joining thread can do so directly,
* without the need for a compensation thread (although at the
* expense of larger run-time stacks, but the tradeoff is
* typically worthwhile).
*
* ManagedBlocker擴充API不能使用幫助,是以僅依賴于方法awaitBlocker中的補償。
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
*
* tryHelpStealer中的算法需要一種“線性”幫助:
* The algorithm in tryHelpStealer entails a form of "linear"
* 每個worker記錄(在currentSteal字段中)它從其他某個worker那裡竊取的最近的任務。
* helping: Each worker records (in field currentSteal) the most
* recent task it stole from some other worker. Plus, it records
* 另外,它記錄(在currentJoin字段中)目前正在積極加入的任務。
* (in field currentJoin) the task it is currently actively
* joining.
* tryHelpStealer方法使用這些标記試圖找到一個worker來幫助(比如:從它那裡偷回一個任務并執行),
* 進而加速主動加入任務的完成。
* Method tryHelpStealer uses these markers to try to
* find a worker to help (i.e., steal back a task from and execute
* it) that could hasten completion of the actively joined task.
* 本質上,如果要加入的任務未被盜用,則joiner執行在它自己本地deque的任務。
* In essence, the joiner executes a task that would be on its own
* local deque had the to-be-joined task not been stolen. This may
* 這可能被視為Wagner & Calder在1993年出版的《跨越:一種實作高效期貨的可移植技術》
* (SIGPLAN notice, http://portal.acm.org/cit.cfm? id=155354)一書中所采用的方法的保守變體。
* be seen as a conservative variant of the approach in Wagner &
* Calder "Leapfrogging: a portable technique for implementing
* efficient futures" SIGPLAN Notices, 1993
* (http://portal.acm.org/citation.cfm?id=155354).
* 它的不同之處在于:(1)我們隻在workers之間維護對偷竊的依賴連結,而不是使用每個任務的記賬。
* It differs in
* that: (1) We only maintain dependency links across workers upon
* steals, rather than use per-task bookkeeping.
* 這有時需要對workQueues數組進行線性掃描來定位偷竊者,但通常不需要,
* 因為偷竊者會留下在哪裡定位他們的提示(這可能會過時/錯誤)。
* This sometimes
* requires a linear scan of workQueues array to locate stealers,
* but often doesn't because stealers leave hints (that may become
* stale/wrong) of where to locate them.
* 這隻是一個提示,因為一個worker可能有多個竊取,而提示隻記錄其中一個(通常是最新的)。
* It is only a hint
* because a worker might have had multiple steals and the hint
* records only one of them (usually the most current). Hinting
* 提示将成本隔離到需要它的時候,而不是增加每個任務的開銷。
* isolates cost to when it is needed, rather than adding to
* (2)它是“淺”的,忽略嵌套和潛在的循環互偷。
* per-task overhead. (2) It is "shallow", ignoring nesting and
* potentially cyclic mutual steals.
* 這是有意的:字段currentJoin隻在主動連接配接時更新,這意味着我們在長生命期任務、
* GC停止等期間會錯過鍊中的連結(這是可以的,因為在這種情況下阻塞通常是一個好主意)。
* (3) It is intentionally
* racy: field currentJoin is updated only while actively joining,
* which means that we miss links in the chain during long-lived
* tasks, GC stalls etc (which is OK since blocking in such cases
* (4) 我們限制了試圖找工作的數量(請參閱MAX_HELP),并退回到挂起該工作者,
* 如果需要的話,用另一個工作者替換它。
* is usually a good idea). (4) We bound the number of attempts
* to find work (see MAX_HELP) and fall back to suspending the
* worker and if necessary replacing it with another.
*
* CountedCompleters的幫助操作要簡單得多:方法helpComplete可以擷取和執行任何
* 正在等待的具有相同根的任務。
* Helping actions for CountedCompleters are much simpler: Method
* helpComplete can take and execute any task with the same root
* as the task being waited on.
* 然而,這仍然需要周遊一些完成器鍊,是以,與使用未顯式連接配接的CountedCompleters相比,效率更低。
* However, this still entails some
* traversal of completer chains, so is less efficient than using
* CountedCompleters without explicit joins.
*
* 在任何給定的時間都不可能準确地保持線程的目标并行度。
* It is impossible to keep exactly the target parallelism number
* of threads running at any given time. Determining the
* 确定保守安全的幫助目标的存在,已經建立的備件的可用性,以及建立新備件
* 的明顯需要都是有活力的,是以我們依賴于每個的多次重試。
* existence of conservatively safe helping targets, the
* availability of already-created spares, and the apparent need
* to create new spares are all racy, so we rely on multiple
* retries of each.
* 在明顯缺乏幫助機會的情況下,補償很難控制在jvm上,GC和其他活動會阻止任務的進展,
* 進而導緻許多其他依賴任務的停滞,而我們無法确定它們是否需要補償。
* Compensation in the apparent absence of
* helping opportunities is challenging to control on JVMs, where
* GC and other activities can stall progress of tasks that in
* turn stall out many other dependent tasks, without us being
* able to determine whether they will ever require compensation.
* 即使在存在比核心更多的線程的情況下,竊取工作幾乎不會導緻性能下降,但是在這種情況下,
* 積極地添加新線程會帶來不必要的正回報控制循環的風險,其中更多的線程會導緻更多的相關停頓
* (以及暢通的線程的延遲進度) 到我們知道它們可用的地步)導緻更多情況需要更多線程,依此類推。
* Even though work-stealing otherwise encounters little
* degradation in the presence of more threads than cores,
* aggressively adding new threads in such cases entails risk of
* unwanted positive feedback control loops in which more threads
* cause more dependent stalls (as well as delayed progress of
* unblocked threads to the point that we know they are available)
* leading to more situations requiring more threads, and so
* on.
* 這方面的控制可以被看作是一個(分析上棘手的)遊戲,對手可能會選擇最壞的(對我們來說)
* 活動線程在任何時候停止。
* This aspect of control can be seen as an (analytically
* intractable) game with an opponent that may choose the worst
* (for us) active thread to stall at any time. We take several
* 我們采取幾種預防措施來限制損失(進而限制收益),主要是在tryCompensate和awaitJoin方法中。
* precautions to bound losses (and thus bound gains), mainly in
* methods tryCompensate and awaitJoin.
*
* 公共池
* Common Pool
* ===========
*
* 靜态公共池在靜态初始化之後始終存在。
* The static common pool always exists after static
* initialization.
* 由于不需要使用它(或任何其他建立的池),是以我們将初始構造開銷和占用空間最小化到大
* 約十二個字段的設定,而沒有嵌套配置設定。
* Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
* 大多數bootstrapping發生在方法fullExternalPush第一次送出到池的時候。
* allocation. Most bootstrapping occurs within method
* fullExternalPush during the first submission to the pool.
*
* 當外部線程送出到公共池時,它們可以執行子任務處理(參見externalHelpJoin和相關方法)。
* When external threads submit to the common pool, they can
* perform subtask processing (see externalHelpJoin and related
* methods).
* 這種調用方幫助政策使得将公共池并行度級别設定為比可用核心總數少一個(或多個),
* 甚至為純調用方運作設定為零是明智的。
* This caller-helps policy makes it sensible to set
* common pool parallelism level to one (or more) less than the
* total number of available cores, or even zero for pure
* caller-runs.
* 我們不需要記錄外部送出是否指向公共池——如果不是,externalHelpJoin将快速傳回
* (最多幫助通知一些公共池workers)。
* We do not need to record whether external
* submissions are to the common pool -- if not, externalHelpJoin
* returns quickly (at the most helping to signal some common pool
* workers).
* 否則,這些送出者将被阻塞,等待完成,是以,在不适用的情況下,額外的工作
* (通過大量分散的任務狀态檢查)相當于在ForkJoinTask.join中阻塞前的一種奇怪的有限自旋等待。
* These submitters would otherwise be blocked waiting
* for completion, so the extra effort (with liberally sprinkled
* task status checks) in inapplicable cases amounts to an odd
* form of limited spin-wait before blocking in ForkJoinTask.join.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
* after executing any top-level task (see WorkQueue.runTask). The
* associated mechanics (mainly in ForkJoinWorkerThread) may be
* JVM-dependent and must access particular Thread class fields to
* achieve this effect.
*
* Style notes
* ===========
*
* There is a lot of representation-level coupling among classes
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
* fields of WorkQueue maintain data structures managed by
* ForkJoinPool, so are directly accessed. There is little point
* trying to reduce this, since any associated future changes in
* representations will need to be accompanied by algorithmic
* changes anyway. Several methods intrinsically sprawl because
* they must accumulate sets of consistent reads of volatiles held
* in local variables. Methods signalWork() and scan() are the
* main bottlenecks, so are especially heavily
* micro-optimized/mangled. There are lots of inline assignments
* (of form "while ((local = field) != 0)") which are usually the
* simplest way to ensure the required read orderings (which are
* sometimes critical). This leads to a "C"-like style of listing
* declarations of these locals at the heads of methods or blocks.
* There are several occurrences of the unusual "do {} while
* (!cas...)" which is the simplest way to force an update of a
* CAS'ed variable. There are also other coding oddities (including
* several unnecessary-looking hoisted null checks) that help
* some methods perform reasonably even when interpreted (not
* compiled).
*
* The order of declarations in this file is:
* (1) Static utility functions
* (2) Nested (static) classes
* (3) Static fields
* (4) Fields, along with constants used when unpacking some of them
* (5) Internal control methods
* (6) Callbacks and other support for ForkJoinTask methods
* (7) Exported methods
* (8) Static block initializing statics in minimally dependent order
*/
// Static utilities
/**
* If there is a security manager, makes sure caller has
* permission to modify threads.
*/
private static void checkPermission() {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(modifyThreadPermission);
}
// Nested classes
/**
* Factory for creating new {@link ForkJoinWorkerThread}s.
* A {@code ForkJoinWorkerThreadFactory} must be defined and used
* for {@code ForkJoinWorkerThread} subclasses that extend base
* functionality or initialize threads with different contexts.
*/
public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @return the new worker thread
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
/**
* 預設ForkJoinWorkerThreadFactory實作; 建立一個新的ForkJoinWorkerThread。
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
/**
* 類,如果從WorkQueue.tryRemoveAndExec的内部隊列插槽中删除了用于替換本地連接配接目标的人工任務。
* Class for artificial tasks that are used to replace the target
* of local joins if they are removed from an interior queue slot
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to
* 我們不需要代理來實際做任何事情,除了有一個唯一的身份。
* actually do anything beyond having a unique identity.
*/
static final class EmptyTask extends ForkJoinTask<Void> {
private static final long serialVersionUID = -7721805057305804111L;
EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
public final Void getRawResult() { return null; }
public final void setRawResult(Void x) {}
public final boolean exec() { return true; }
}
/**
* 支援工作竊取和外部任務送出的隊列。主要原理和算法見上面。
* Queues supporting work-stealing as well as external task
* submission. See above for main rationale and algorithms.
* 實作嚴重依賴于“Unsafe”的内部特性和“volatile”的選擇性使用:
* Implementation relies heavily on "Unsafe" intrinsics
* and selective use of "volatile":
*
* 字段“base”是最小有效隊列槽的索引(mod array.length),如果非空,它總是下一個要偷(poll)的位置。
* Field "base" is the index (mod array.length) of the least valid
* queue slot, which is always the next position to steal (poll)
* from if nonempty. Reads and writes require volatile orderings
* 讀和寫需要volatile順序,但不需要CAS,因為隻有在CAS更新slot後才會執行更新操作。
* but not CAS, because updates are only performed after slot
* CASes.
*
* 字段“top”是下一個要push或pop的隊列槽的索引(mod array.length)。
* Field "top" is the index (mod array.length) of the next queue
* slot to push to or pop from.
* 它的寫隻被所有者線程push,或者在持有鎖下通過外部/共享push,
* 其他線程在讀取(volatile)base之後才通路它。
* It is written only by owner thread
* for push, or under lock for external/shared push, and accessed
* by other threads only after reading (volatile) base. Both top
* “top”和“base”都可以在溢出時包圍,但是(top - base)(或者更常見的-(base - top)
* 強制在top之前 volatile讀base)仍然可以估計大小。
* and base are allowed to wrap around on overflow, but (top -
* base) (or more commonly -(base - top) to force volatile read of
* base before top) still estimates size.
* 鎖(“qlock”)在終止時強制為-1,導緻所有進一步的鎖嘗試失敗。(注意:終止狀态不需要CAS,因為在池關閉時,
* 所有共享隊列将停止使用。)
* The lock ("qlock") is
* forced to -1 on termination, causing all further lock attempts
* to fail. (Note: we don't need CAS for termination state because
* upon pool shutdown, all shared-queues will stop being used
* anyway.)
* 幾乎所有的鎖體都是這樣設定的:鎖體中的異常是“不可能的”(子產品化的JVM錯誤,無論如何都會導緻失敗)。
* Nearly all lock bodies are set up so that exceptions
* within lock bodies are "impossible" (modulo JVM errors that
* would cause failure anyway.)
*
* 數組槽的寫和讀使用 Unsafe提供的 volatiles/atomics 競争。
* The array slots are read and written using the emulation of
* volatiles/atomics provided by Unsafe. Insertions must in
* 插入通常必須使用putOrderedObject作為釋放存儲的一種形式,以確定對任務
* 對象的所有寫操作在隊列中釋出之前都是有序的。
* general use putOrderedObject as a form of releasing store to
* ensure that all writes to the task object are ordered before
* its publication in the queue.
* 是以的移除都需要使用CAS設定為null。這個數組的長度總是2的幂。
* All removals entail a CAS to
* null. The array is always a power of two. To ensure safety of
* 為了確定Unsafe的數組操作的安全性,所有通路都通過2的幂掩碼執行顯式空檢查和隐式界限檢查。
* Unsafe array operations, all accesses perform explicit null
* checks and implicit bounds checks via power-of-two masking.
*
* 除了基本的隊列支援之外,該類還包含在其他地方描述的字段來控制執行。
* In addition to basic queuing support, this class contains
* fields described elsewhere to control execution. It turns out
* 事實證明,将它們包含在這個類而不是單獨的類中,在記憶體方面會更好。
* to work better memory-layout-wise to include them in this class
* rather than a separate class.
*
* 大多數平台上的性能對工作隊列及其數組的執行個體的位置非常敏感——我們絕對不希望
* 多個工作隊列執行個體或多個隊列數組共享緩存行。
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. (It would be best for queue objects
* (最好讓隊列對象和它們的數組共享,但是沒有任何東西可以幫助安排共享)。
* and their arrays to share, but there is nothing available to
* help arrange that). The @Contended annotation alerts JVMs to
* Contended 注釋警告jvm嘗試隔離執行個體。
* try to keep instances apart.
*/
@sun.misc.Contended
static final class WorkQueue {
/**
* 在初始化時竊取工作隊列數組的容量。
* Capacity of work-stealing queue array upon initialization.
* 必須是二的幂;至少4個,但是應該更大一些,以減少或消除隊列之間的緩存行共享。
* Must be a power of two; at least 4, but should be larger to
* reduce or eliminate cacheline sharing among queues.
* 目前,它要大得多,作為一個部分解決方案,因為jvm經常将數組放置在共享GC記帳
* (特别是cardmarks)的位置,這樣每次寫通路都會遇到嚴重的記憶體争用。
* Currently, it is much larger, as a partial workaround for
* the fact that JVMs often place arrays in locations that
* share GC bookkeeping (especially cardmarks) such that
* per-write accesses encounter serious memory contention.
*/
// 8192
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* 隊列數組的最大大小。必須是小于或等于1的2的幂<<(31 -數組元素的寬度),
* 以確定缺少對索引計算的總結,但是定義的值要比這個值小一點,以幫助使用者在
* 系統飽和之前捕獲失控的程式。
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
// 67108864
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// registerWorker() 方法初始化時等于 poolIndex 的值, 而poolIndex是short類型的
// 是以eventCount的低16位的值為:poolIndex
// 小于0 說明線程不是活動狀态 (或者準備進入等待狀态)
volatile int eventCount; // encoded inactivation count; < 0 if inactive
// nextWait雖然儲存的是ctl低32位的值,但是最高位是沒有用的,最高位是池的狀态
int nextWait; // encoded record of next event waiter
// 竊取數量
int nsteals; // number of steals
// 1、構造方法會把 hint初始化為: this.hint = seed (threadLocalRandomProbe)
// 用于提示該workQueue在 workQueues中的索引位置
// 2、join()方法調用tryHelpStealer()方法時,如果找到等待任務的偷竊者,記錄偷竊者的索引位置
int hint; // steal index hint 偷竊者索引提示
short poolIndex; // index of this queue in pool
// 模式: 0: lifo, > 0: fifo, < 0: shared
// shared -> 共享隊列
final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
// base 使用volatile 修飾,top沒有 (下一個poll 操作的索引位置)
volatile int base; // index of next slot for poll
// 下一個push 操作的索引位置
int top; // index of next slot for push
// WorkQueue建立時沒有進行初始化
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
// 這個工作隊列所屬的線程,如果是共享的則為null (可用于判斷隊列擁有者是否處于阻塞/等待狀态)
final ForkJoinWorkerThread owner; // owning thread or null if shared
// 在park期間 == owner,即此 workQueue的擁有者; 其他情況為null
volatile Thread parker; // == owner during call to park; else null
// awaitJoin方法等待完成的任務
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
// 目前正在執行的非本地任務
ForkJoinTask<?> currentSteal; // current non-local task being executed
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
int seed) {
this.pool = pool;
this.owner = owner;
this.mode = (short)mode;
// seed 傳入的是 threadLocalRandomProbe
this.hint = seed; // store initial seed for runWorker
// 将索引放在數組的中心(尚未配置設定)
// INITIAL_QUEUE_CAPACITY = 1 << 13
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
/**
* 傳回隊列中任務的大概數量
* Returns the approximate number of tasks in the queue.
*/
final int queueSize() {
// 非工作隊列擁有者調用必須先讀 base (因為base是 volatile修飾的,而 top 不是)
int n = base - top; // non-owner callers must read base first
// 忽略瞬态負值 (忽略base > 0 todo 什麼場景)
return (n >= 0) ? 0 : -n; // ignore transient negative
}
/**
* 通過檢查接近空隊列是否至少有一個無人認領的任務,可以比queueSize更準确地估計該隊列是否有任務。
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
*/
final boolean isEmpty() {
ForkJoinTask<?>[] a; int m, s;
// 先讀base, 再讀top
int n = base - (s = top);
return (n >= 0 || // n >= 0 說明沒有任務了,傳回 empty
(n == -1 && // 當 n = -1時,說明隊列中可能還有一個任務,但可能這個任務已經被
((a = array) == null || // 取走了,是以進一步判斷這個任務是否還存在
(m = a.length - 1) < 0 ||
U.getObject
(a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
}
/**
* 添加一個任務。僅由非共享隊列中的所有者調用。
* Pushes a task. Call only by owner in unshared queues. (The
* (共享隊列版本嵌入到externalPush方法中。)
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int s = top, n;
// 如果隊列被移除,則忽略
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1;
// s = top,把任務放入top索引位置。隻有線程擁有者才會調用這個方法,而且任務
// 取出top不會被更新,是以不會有線程安全問題
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
// top + 1。如果隊列中任務的個數小于等于2,那麼喚醒或者添加一個worker去執行任務,
// 否則池中活動的線程數量可能不夠 (保證線程池的活躍度)
if ((n = (top = s + 1) - base) <= 2)
(p = pool).signalWork(p.workQueues, this);
else if (n >= m)
// 如果隊列中任務的數量達到了 a.length - 1,那麼預先進行擴容
growArray();
}
}
/**
* 初始化或加倍數組的容量。由所有者或持有鎖調用。調整大小時,base可以移動,但top不行。
* (其他工作線程竊取任務base就變化了,但是top隻能由隊列擁有者/共享隊列擷取鎖後才能改變,
* 隊列擁有者在執行擴容,而共享隊列擴容前需要先擷取鎖,是以擴容時top不會改變)
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
*/
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;
// 如果數組不為null,那麼容量擴大一倍,否則使用預設容量:INITIAL_QUEUE_CAPACITY = 1 << 13
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
// 若容量超出MAXIMUM_QUEUE_CAPACITY = 1 << 26,抛出異常
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
// 使用新的容量建立數組,并 指派給array
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
// (oldMask = oldA.length - 1) >= 0 說明原來的數組不是空數組
// (t = top) - (b = base) > 0 說明原來的數組中有元素,需要複制元素
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
// 擴容需要一個一個地複制資料,使用CAS擷取資料,不能使用數組複制的方式。
// (使用數組複制方式會導緻資料複制過去後,其他線程還可以去原來的數組擷取資料,
// 導緻任務重複執行)
do {
ForkJoinTask<?> x;
// 計算索引 (b & oldMask) 的位址偏移量
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
// 計算索引 (b & mask) 的位址偏移量
int j = ((b & mask) << ASHIFT) + ABASE;
// 具有volatile讀的記憶體語義
x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null)) // 清除舊數組的元素
U.putObjectVolatile(a, j, x); // 立即可見
} while (++b != t); // ++b != t 說明還有元素
}
// 傳回新的數組
return a;
}
/**
* 按後進先出(LIFO)順序擷取下一個任務(如果有的話)
* Takes next task, if one exists, in LIFO order. Call only
* 僅由非共享隊列中的所有者調用。
* by owner in unshared queues.
*/
// LIFO: 後進先出 (僅由隊列所有者線程調用這個方法)
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
// 判斷隊列不為null,且長度大于0
if ((a = array) != null && (m = a.length - 1) >= 0) {
// (s = top - 1) - base >= 0 說明隊列中有元素
for (int s; (s = top - 1) - base >= 0;) {
// 計算最後一個元素的位址偏移量
long j = ((m & s) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
// 因為其他工作線程隻會從base位置擷取任務,如果最後一個任務為null,
// 說明最後一個任務也被其他線程竊取了,隊列為空了,是以不用重試,傳回null
break;
// 使用CAS擷取任務
if (U.compareAndSwapObject(a, j, t, null)) {
// top = top - 1
top = s;
return t;
}
}
}
return null;
}
/**
* 如果b是隊列的base,并且可以無争用地認領任務,則按FIFO順序擷取任務。
* Takes a task in FIFO order if b is base of queue and a task
* can be claimed without contention. Specialized versions
* 專門的版本出現在ForkJoinPool方法scan和tryHelpStealer。
* appear in ForkJoinPool methods scan and tryHelpStealer.
*/
final ForkJoinTask<?> pollAt(int b) {
ForkJoinTask<?> t; ForkJoinTask<?>[] a;
if ((a = array) != null) {
// 計算索引b的位址偏移量
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
// b 必須等于 base才可以擷取任務
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
base == b && U.compareAndSwapObject(a, j, t, null)) {
// base ++
U.putOrderedInt(this, QBASE, b + 1);
return t;
}
}
return null;
}
/**
* 按FIFO(先進先出) 順序擷取下一個任務(如果存在的話)。
* Takes next task, if one exists, in FIFO order.
*/
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
// 判斷隊列中有元素
while ((b = base) - top < 0 && (a = array) != null) {
// 計算base索引位置的位址偏移量
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 先讀取任務,再通過CAS擷取任務
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (t != null) {
// 使用CAS擷取任務
if (U.compareAndSwapObject(a, j, t, null)) {
// base ++
U.putOrderedInt(this, QBASE, b + 1);
// 傳回任務
return t;
}
// 若任務擷取失敗重試,直到隊列為空
}
// ---- 任務已被其它線程擷取了 ----
// base == b說明 base還沒進行更新,此時如果隊列中還有任務,那麼我們放棄線程的執行
// 權,等待base更新
else if (base == b) {
// b + 1 = top,說明隊列中已經沒有任務了(base位置的任務已被其它線程取走了)
if (b + 1 == top)
break;
// 放棄線程執行權,等待 base更新
Thread.yield(); // wait for lagging update (very rare) 等待滞後更新(非常罕見)
}
}
return null;
}
/**
* 按模式指定的順序擷取下一個任務(如果存在)。會從隊列中删除這個任務
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
// 0: lifo, > 0: fifo, < 0: shared
return mode == 0 ? pop() : poll();
}
/**
* 如果存在的下一個任務,則按模式指定的順序傳回該任務。 (不會删除)
* Returns next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> peek() {
ForkJoinTask<?>[] a = array; int m;
// 判斷隊列是否為空
if (a == null || (m = a.length - 1) < 0)
return null;
// 0: lifo, > 0: fifo, < 0: shared
// 0:後進先出, 非0 先進先出
int i = mode == 0 ? top - 1 : base;
// 計算索引i的位址偏移量
int j = ((i & m) << ASHIFT) + ABASE;
return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
}
/**
* 僅當給定任務位于目前top時,才彈出該任務。
* Pops the given task only if it is at the current top.
* 共享版本的僅能通過FJP.tryExternalUnpush 使用。
* (A shared version is available only via FJP.tryExternalUnpush)
*/
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
// top == base 說明隊列中沒有任務
if ((a = array) != null && (s = top) != base &&
// 如果索引 --s 的元素是t,則擷取 并且 設定該索引位置的元素為null
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
// top = top - 1
top = s;
return true;
}
return false;
}
/**
* 删除并取消所有已知任務,忽略任何異常
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void cancelAll() {
// 取消目前等待完成的任務
ForkJoinTask.cancelIgnoringExceptions(currentJoin);
// 取消目前竊取的任務
ForkJoinTask.cancelIgnoringExceptions(currentSteal);
// 取消隊列中的所有任務
// poll() 按FIFO(先進先出) 順序擷取下一個任務(如果存在的話)。
for (ForkJoinTask<?> t; (t = poll()) != null; )
ForkJoinTask.cancelIgnoringExceptions(t);
}
// Specialized execution methods
/**
* 輪詢并運作任務,直到隊列為空
* Polls and runs tasks until empty.
*/
// FIFO 模式
final void pollAndExecAll() {
// poll()擷取并删除第一個元素,如果隊列為空傳回null (實作FIFO)
for (ForkJoinTask<?> t; (t = poll()) != null;)
t.doExec(); // 執行任務
}
/**
* 執行頂級任務和執行後剩餘的任何本地任務。
* Executes a top-level task and any local tasks remaining
* after execution.
*/
final void runTask(ForkJoinTask<?> task) {
// currentSteal = task
if ((currentSteal = task) != null) {
ForkJoinWorkerThread thread;
// 執行任務
task.doExec();
ForkJoinTask<?>[] a = array;
// 0: lifo, > 0: fifo, < 0: shared
int md = mode;
// nsteals + 1
++nsteals;
currentSteal = null;
// 預設情況下使用的是LIFO_QUEUE模式,即md = 0
// FIFO
if (md != 0) {
// 運作此方法的workQueue一定不是 shared模式的,是以如果md != 0,
// 那麼 md 一定大于0,即 FIFO 模式
pollAndExecAll();
}
// LIFO,預設情況下使用的是這個模式,因為後面的任務是由前面的任務分割的,任務比較小,
// 較大的任務讓其他線程去執行,這樣任務分攤比較均勻
else if (a != null) {
int s, m = a.length - 1;
ForkJoinTask<?> t;
// (s = top - 1) - base >= 0 說明隊列中還有任務
while ((s = top - 1) - base >= 0 &&
// 擷取索引s (top - 1)的元素,并設定該索引位置的元素為null
(t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
// top = top -1
top = s;
// 執行任務
t.doExec();
}
}
if ((thread = owner) != null) // no need to do in finally clause
// 預設為空,由InnocuousForkJoinWorkerThread 實作
thread.afterTopLevelExec();
}
}
/**
* 如果給定的任務存在,從隊列中移除并執行,或任何其他已取消的任務。
* If present, removes from queue and executes the given task,
* or any other cancelled task. Returns (true) on any CAS
* 在任何CAS或一緻性檢查失敗時傳回(true),以便調用者可以重試。
* or consistency check failure so caller can retry.
*
* 如果沒有取得進展傳回false,否則傳回true
* @return false if no progress can be made, else true
*/
// 周遊整個workQueue隊列去查找任務,找到則執行這個任務
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
boolean stat;
ForkJoinTask<?>[] a; int m, s, b, n;
// (n = (s = top) - (b = base)) > 0) 說明隊列中有任務
if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
(n = (s = top) - (b = base)) > 0) {
boolean removed = false, empty = true;
stat = true;
// 從s(top) 開始周遊到 b(base)
for (ForkJoinTask<?> t;;) { // traverse from s to b
// 計算--s 索引位置的位址偏移量
long j = ((--s & m) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObject(a, j);
// t == null 說明任務被其他線程取出了,有可能是目标任務,是以退出,
// 傳回true,外部會判斷目标任務是否已經完成。(一緻性檢查失敗)
if (t == null) // inconsistent length 長度不一緻
// 此時 stat = true,是以會傳回true,傳回true後,如果判斷任務沒有完成會
// 繼續調用這個方法
break;
else if (t == task) { // 找到task
// 如果task 是隊列的最後一個任務,直接取出這個任務
if (s + 1 == top) { // pop
if (!U.compareAndSwapObject(a, j, task, null))
// 如果CAS失敗,break,此時 stat = true,是以方法會傳回true,外部方法
// 會判斷任務是否已經完成了
break;
// CAS成功,top = top -1
top = s;
// removed = true 表示任務已經移除
removed = true;
}
// task不是隊列的最後一個任務
// 一緻性檢查,如果 base != b,說明有線程取出了base索引所在的任務,
// 對于這種情況傳回true,判斷任務是否完成(因為有可能取出的是目标這個任務),
// 而且這種情況下有可能不止一個任務被取出,是以不重新檢查任務是否還在隊列中,
// 這樣可以快速響應join()
else if (base == b) // replace with proxy 替換為代理
// 使用EmptyTask 來替換這個任務
removed = U.compareAndSwapObject(a, j, task,
new EmptyTask());
break;
}
// t.status >= 0 說明隊列中除了目标任務外還有未完成的任務
else if (t.status >= 0)
empty = false;
// 執行到這裡說明t.status < 0, s + 1 == top 說明是隊列的最後一個任務
else if (s + 1 == top) { // pop and throw away
// 任務已經取消,并且這個任務是隊列的最後一個任務,
// 那麼直接扔掉這個任務 (取消的任務在隊列的最後一個任務才會被直接扔掉)
if (U.compareAndSwapObject(a, j, t, null))
// top = top - 1
top = s;
break;
}
// --n == 0 說明周遊完了
if (--n == 0) {
// base == b 說明沒有任務被取出,這種情況下才可以确定任務不在此workQueue中,
// 否則需要傳回true,判斷任務是否已經完成,未完成進行重試(繼續檢查任務是否完成)
if (!empty && base == b)
stat = false;
break;
}
}
// ------ for循環 end ----------------
if (removed)
// 如果成功取出這個任務,則執行這個任務
task.doExec();
}
else
// 隊列為空,傳回false,沒必要再輪詢這個隊列
stat = false;
// 隊列為空 或者 隊列不為空并且base沒有改變 時傳回false
return stat;
}
/**
* 嘗試輪詢并執行給定的任務或任何其子任務
* Tries to poll for and execute the given task or any other
* task in its CountedCompleter computation.
*/
// 判斷隊列中base索引位置的任務是否是root任務或者其子任務,如果是,取出并執行。
// 如果base索引位置的任務已經被其他線程擷取,或者base索引位置的任務是root任務
// 或者其子任務,傳回true; 否則傳回false
final boolean pollAndExecCC(CountedCompleter<?> root) {
ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
// (b = base) - top < 0 說明隊列中有任務
if ((b = base) - top < 0 && (a = array) != null) {
// 計算隊列base 索引位置的偏移量 (因為這個方法是由其他線程調用的,是以從base位置擷取任務)
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 如果base位置的任務為null,說明存在競争,傳回true,進行重試
if ((o = U.getObjectVolatile(a, j)) == null)
return true; // retry
if (o instanceof CountedCompleter) {
for (t = (CountedCompleter<?>)o, r = t;;) {
// 找到目标任務root,取出base索引位置的任務,并執行
if (r == root) {
// base == b 判斷base索引位置的任務沒有被其他線程擷取
if (base == b &&
U.compareAndSwapObject(a, j, t, null)) {
// CAS成功取出任務,修改 base = base + 1
U.putOrderedInt(this, QBASE, b + 1);
// 執行任務
t.doExec();
}
// 傳回true,進行重試,繼續判斷base位置的任務是否是目标任務root的子任務
return true;
}
else if ((r = r.completer) == null)
// base索引位置的任務不是 root 任務的子任務
break; // not part of root computation 不是root 任務計算的一部分
}
}
}
// 傳回false不進行重試
return false;
}
/**
* 嘗試彈出并執行給定的任務或者其葉子任務 (隻判斷隊列的最後一個任務)
* Tries to pop and execute the given task or any other task
* in its CountedCompleter computation.
*/
final boolean externalPopAndExecCC(CountedCompleter<?> root) {
ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
// 判斷隊列不為空
if (base - (s = top) < 0 && (a = array) != null) {
// 計算最後一個元素的位址偏移量
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
for (t = (CountedCompleter<?>)o, r = t;;) {
// 找到目标任務 (根任務)
if (r == root) {
// 使用CAS擷取隊列的鎖
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
// 一緻性檢查,并使用CAS修改最後一個元素為null (擷取任務的執行權)
if (top == s && array == a &&
U.compareAndSwapObject(a, j, t, null)) {
// 修改top = top - 1
top = s - 1;
// 釋放鎖
qlock = 0;
// 執行任務
t.doExec();
}
else
// 一緻性檢查失敗,或者擷取任務的執行權失敗,釋放鎖
qlock = 0;
}
// 傳回true,說明有進展
return true;
}
// r指向父節點任務
else if ((r = r.completer) == null)
break;
}
}
}
// 沒有進展傳回false
return false;
}
/**
* Internal version
*/
// 判斷工作隊列的最後一個任務是否是任務root,或者其子任務,如果是的話,取出這個任務并執行,
// 然後傳回true,否則傳回false。
// ExecCC -> ExecCountedCompleter
final boolean internalPopAndExecCC(CountedCompleter<?> root) {
ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
// base - (s = top) < 0 說明隊列中有元素
if (base - (s = top) < 0 && (a = array) != null) {
// 計算隊列中最後一個元素的索引位置
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
for (t = (CountedCompleter<?>)o, r = t;;) {
// 找到root 任務
if (r == root) {
// 使用CAS取出任務,并執行它 (注意:這裡取出的是任務t,而不是任務r)
if (U.compareAndSwapObject(a, j, t, null)) {
// CAS成功,top = top - 1,不會存線上程安全問題,因為CAS成功了才能修改top,
// 而且隻有工作線程本身才可以從top取出任務,其他工作線程隻能從base取出任務
top = s - 1;
// doExec()裡面會判斷任務是否已經執行,沒有執行過才會執行
t.doExec();
}
// 找到任務并執行,傳回true
return true;
}
// r
// / \
// b c
// / \ / \
// d e f g
// 比如root 是上圖中的任務r(即工作線程要等待任務r的完成),d是此工作線程中隊列的最後
// 一個任務,如果d的類型是 CountedCompleter,那麼一級一級往上判斷d的父節點任務是否
// 是root任務,如果是則從隊列中取出這個任務,并執行(任務還在隊列中,是以一定還沒有執行過)。
// 任務r的完成需要任務d先完成,是以有必要去執行任務d。
// r = r.completer,判斷
else if ((r = r.completer) == null)
// 到達根任務了,沒有任務t不是目标任務root的子任務,傳回false
break;
}
}
}
return false;
}
/**
* 如果隊列擁有者未阻塞,傳回true
* Returns true if owned and not known to be blocked.
*/
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
// eventCount 小于0 說明線程不是活動狀态(或者準備進入等待狀态)
return (eventCount >= 0 &&
(wt = owner) != null &&
(s = wt.getState()) != Thread.State.BLOCKED && // 判斷線程不是阻塞或者等待狀态
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long QBASE;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
QBASE = U.objectFieldOffset
(k.getDeclaredField("base"));
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
// 報告給定數組類的存儲配置設定中第一個元素的偏移量。
ABASE = U.arrayBaseOffset(ak);
// 擷取數組元素的位址偏移量
int scale = U.arrayIndexScale(ak);
// 數組元素的位址偏移量必須是2的幂
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// Integer.numberOfLeadingZeros(scale) -> 判斷最高位的1的左邊的0的個數
// ASHIFT -> 表示 scale是 1向左移動 ASHIFT 位後得到的數 (即 scale = 1 << ASHIFT)
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
}
// static fields (initialized in static initializer below)
/**
* Creates a new ForkJoinWorkerThread. This factory is used unless
* overridden in ForkJoinPool constructors.
*/
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
/**
* Permission required for callers of methods that may start or
* kill threads.
*/
private static final RuntimePermission modifyThreadPermission;
/**
* Common (static) pool. Non-null for public use unless a static
* construction exception, but internal usages null-check on use
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
static final ForkJoinPool common;
/**
* Common pool parallelism. To allow simpler use and management
* when common pool threads are disabled, we allow the underlying
* common.parallelism field to be zero, but in that case still report
* parallelism as 1 to reflect resulting caller-runs mechanics.
*/
static final int commonParallelism;
/**
* Sequence number for creating workerNamePrefix.
*/
private static int poolNumberSequence;
/**
* Returns the next sequence number. We don't expect this to
* ever contend, so use simple builtin sync.
*/
// 靜态同步方法
private static final synchronized int nextPoolId() {
return ++poolNumberSequence;
}
// static constants
/**
* 初始逾時值(以納秒為機關),用于觸發暫停以等待新工作的線程。
* Initial timeout value (in nanoseconds) for the thread
* triggering quiescence to park waiting for new work. On timeout,
* 在逾時時,線程将嘗試減少workers的數量。
* the thread will instead try to shrink the number of
* 該值應該足夠大,以避免過度積極收縮在大多數瞬态停機(長GCs等)。
* workers. The value should be large enough to avoid overly
* aggressive shrinkage during most transient stalls (long GCs
* etc).
*/
// 2 秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
/**
* 當線程數超過并行度級别時的逾時值 (0.2秒)
* Timeout value when there are more threads than parallelism level
*/
private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;
/**
* 對空閑逾時的容忍,以應對計時器不足
* Tolerance for idle timeouts, to cope with timer undershoots
*/
private static final long TIMEOUT_SLOP = 2000000L;
/**
* The maximum stolen->joining link depth allowed in method
* tryHelpStealer. Must be a power of two. Depths for legitimate
* chains are unbounded, but we use a fixed constant to avoid
* (otherwise unchecked) cycles and to bound staleness of
* traversal parameters at the expense of sometimes blocking when
* we could be helping.
*/
private static final int MAX_HELP = 64;
/**
* seed 生成器增量。
* Increment for seed generators. See class ThreadLocal for
* explanation.
*/
private static final int SEED_INCREMENT = 0x9e3779b9;
/*
* 控制變量的位和掩碼
* Bits and masks for control variables
*
* 字段ctl是一個long 類型:
* Field ctl is a long packed with:
*
* 活動運作workers的數量減去目标并行度(16 位) (ActiveCount)
* AC: Number of active running workers minus target parallelism (16 bits)
*
* 總workers數減去目标并行度(16位) (TotalCount)
* TC: Number of total workers minus target parallelism (16 bits)
*
* 如果池終止,則為true (1位) (ST: status)
* ST: true if pool is terminating (1 bit)
*
* 頂部等待線程的等待計數(15位). ID記錄的線程的等待次數,避免ABA問題
* EC: the wait count of top waiting thread (15 bits)
*
* waiters的Treiber頂棧的 poolIndex(16位)
* ID: poolIndex of top of Treiber stack of waiters (16 bits)
*
* 友善時,我們可以提取計數的上32位和隊列狀态的下32位,u = (int)(ctl >>> 32)和e = (int)ctl)
* (ctl 的高32位是計數,低32位是隊列狀态)
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* ec字段從不單獨通路,但總是與id和st一起通路。
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st.
* 目标并行度和字段位置的計數偏移使得通過字段的符号測試執行最常見的檢查成為可能:
* The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* 當ac為負時,沒有足夠的活動的workers,當tc為負時,沒有足夠的總workers,當e為負時,池終止。
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, and when e is
* negative, the pool is terminating. To deal with these possibly
* 為了處理這些可能出現的負數字段,我們使用強制轉換成“short” 和/或 符号移位以保持符号性。
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*
* 當一個線程進入隊列(未激活)時,它的eventCount字段被設定為負值,這是判斷worker
* 是否被阻止執行任務的唯一方法,即使它必須繼續掃描任務以避免隊列争用。
* When a thread is queued (inactivated), its eventCount field is
* set negative, which is the only way to tell if a worker is
* prevented from executing tasks, even though it must continue to
* scan for them to avoid queuing races. Note however that
* 但是請注意,eventCount更新延遲釋出,是以使用時需要謹慎。
* eventCount updates lag releases so usage requires care.
*
* plock 字段是int 類型的
* Field plock is an int packed with:
* true 如果可以shutdown(1位)
* SHUTDOWN: true if shutdown is enabled (1 bit)
* 系列化鎖,鎖定時PL_LOCK位被設定 (30位)
* SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
* 可能有線程正在等待這個鎖時被設定(1位)
* SIGNAL: set when threads may be waiting on the lock (1 bit)
*
* 序列号支援簡單的一緻性檢查:可以通過比較讀取前後的plock來檢查workQueues數組上的隻讀操作是否過時。
* The sequence number enables simple consistency checks:
* Staleness of read-only operations on the workQueues array can
* be checked by comparing plock before vs after the reads.
*/
// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;
// bounds (1 << 16) - 1 -> SMASK = 1111 1111 1111 1111
private static final int SMASK = 0xffff; // short bits
// MAX_CAP = 0111 1111 1111 1111
private static final int MAX_CAP = 0x7fff; // max #workers - 1
// 1111 1111 1111 1110
private static final int EVENMASK = 0xfffe; // even short bits
// 0x007e (126) -> 1111110 (最大64個偶數槽)
// 111111 = 63,加上 0000000 (0槽位), 總共64個偶數槽
private static final int SQMASK = 0x007e; // max 64 (even) slots
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;
// masks ST_SHIFT = 31 -> STOP_BIT = 1L << 31
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
// SMASK = 0xffff -> (1 << 15) - 1; AC_SHIFT = 48
// AC_MASK 高16位為: 1111 1111 1111 1111,低48位為0
// 1111 1111 1111 1111 000000000000000000000000000000000000000000000000
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
// TC_MASK 高32位為:0000 0000 0000 0000 1111 1111 1111 1111,低32位為0
// 0000 0000 0000 0000 1111 1111 1111 1111 00000000000000000000000000000000
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;
// TC_SHIFT = 32 TC_UNIT = 1L << 32
// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
// AC_SHIFT = 48; -> AC_UNIT = 1L << 48
private static final long AC_UNIT = 1L << AC_SHIFT;
// masks and units for dealing with u = (int)(ctl >>> 32)
// AC_SHIFT = 48, --> UAC_SHIFT = 16
private static final int UAC_SHIFT = AC_SHIFT - 32;
// TC_SHIFT = 32 -> UTC_SHIFT = 0
private static final int UTC_SHIFT = TC_SHIFT - 32;
// UAC_SHIFT = 16, SMASK = (1 << 16) - 1, UAC_MASK = 1111 1111 1111 1111 0000 0000 0000 0000
private static final int UAC_MASK = SMASK << UAC_SHIFT;
// SMASK = (1 << 16) - 1, UTC_SHIFT = 0 -> (1 << 16) - 1
private static final int UTC_MASK = SMASK << UTC_SHIFT;
// UAC_SHIFT = 16 -> UAC_UNIT = 1 << 16
private static final int UAC_UNIT = 1 << UAC_SHIFT;
// UTC_SHIFT = 0 -> UTC_UNIT = 1
private static final int UTC_UNIT = 1 << UTC_SHIFT;
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
// EC_SHIFT = 16, E_SEQ = 1 << 16
private static final int E_SEQ = 1 << EC_SHIFT;
// plock bits
private static final int SHUTDOWN = 1 << 31;
// 0010
private static final int PL_LOCK = 2;
private static final int PL_SIGNAL = 1;
private static final int PL_SPINS = 1 << 8;
// access mode for WorkQueue
static final int LIFO_QUEUE = 0; // asyncMode = false 使用這個模式,預設為false
static final int FIFO_QUEUE = 1; // asyncMode = true 使用這個模式
static final int SHARED_QUEUE = -1;
// Instance fields
volatile long stealCount; // collects worker counts
// 主要池控制
// 從高到低:
// 16位:AC: 活動運作workers的數量減去parallelism
// 16位:TC: 總workers數減去parallelism
// 1位: ST: 表示池是否已經終止 (1終止)
// 15位:EC: 頂部等待線程的等待計數。ID記錄的線程的等待次數,避免ABA問題
// 16位: ID: waiters的Treiber頂棧的 poolIndex (低32就是取等待線程頂棧eventCount的值)
volatile long ctl; // main pool control
// 最高位:表示池是否已經關閉,若已關閉,則值為1,即 plock < 0
// 最低位:表示有線程在等待擷取鎖,需要喚醒
// 中間30位:最低位(plock的右邊第二位)表示鎖是否被擷取,1表示已被擷取,其他位表示鎖的系列号
// 每擷取一次鎖,plock的值加2,釋放鎖plock的值再加2
volatile int plock; // shutdown status and seqLock
volatile int indexSeed; // worker/submitter index seed
final short parallelism; // parallelism level
final short mode; // LIFO/FIFO
// 工作隊列的索引是奇數,共享(送出)隊列的索引是偶數的,最多64個槽 (偶數槽最多64個)
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
// 構造方法初始化
final String workerNamePrefix; // to create worker name string
/**
* 擷取plock鎖以保護工作者數組和相關更新。
* Acquires the plock lock to protect worker array and related
* 隻有在plock上的初始CAS失敗時才調用此方法。
* updates. This method is called only if an initial CAS on plock
* fails.
* 這在正常情況下起自旋鎖的作用,但在(很少)需要時,又會回到内置螢幕來阻塞。
* 對于高度争用的鎖來說,這是一個糟糕的主意,但是作為純自旋鎖的一個更保守的替代方案,
* 這是可行的。
* This acts as a spinlock for normal cases, but falls back
* to builtin monitor to block when (rarely) needed. This would be
* a terrible idea for a highly contended lock, but works fine as
* a more conservative alternative to a pure spinlock.
*/
// 擷取鎖,并傳回擷取鎖後 plock的值
private int acquirePlock() {
// PL_SPINS = 1 << 8 (256)
int spins = PL_SPINS, ps, nps;
for (;;) {
// PL_LOCK = 2 -> 0010,若plock 右邊第二位為0,并使用CAS修改右邊第二位為1成功
if (((ps = plock) & PL_LOCK) == 0 &&
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
// 傳回修改 plock的值
return nps;
else if (spins >= 0) {
// 随機生成threadLocalRandomSecondarySeed,并傳回
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
// 線程的 threadLocalRandomSecondarySeed >= 0, spins才會遞減
--spins;
}
// 超出自旋次數, PL_SIGNAL = 1, 使用CAS把 plock的右邊第一位修改為1 (表示線程需要喚醒)
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
synchronized (this) {
// plock 右邊第一位為1 才能進入等待狀态,即判斷擷取鎖後上一個擷取 plock的線程還沒執行喚醒操作
if ((plock & PL_SIGNAL) != 0) {
try {
// wait() 等價于 this.wait(),是以螢幕是 this
// wait() 是在 for循環裡面,可以避免虛假喚醒
wait();
} catch (InterruptedException ie) {
try {
// 線程被中斷,重新中斷線程
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {
}
}
}
else
// todo 回頭再看一下,是不是調用notifyAll之前先設定了狀态?
// 是的,擷取plock的線程先釋放鎖,再去競争this鎖,然後執行 notifyAll(),而不是先競争this鎖,
// 然後再釋放plock鎖,喚醒等待線程,使用了快速釋放鎖的方式,而其他線程可以幫忙先執行喚醒操作。
// 喚醒在this等待的所有線程。
// 因為擷取plock鎖的線程釋放鎖後,先設定了 plock的值,表示已經釋放鎖,然後再擷取 this螢幕,
// 喚醒所有線程,是以雖然plock鎖已經釋放了,但是此時可能還沒喚醒等待的線程,擷取鎖執行喚醒操作,
// 而不是等待上一個擷取plock鎖的線程競争到this鎖後才執行喚醒操作,減少線程的等待時間
notifyAll();
}
}
}
}
/**
* 解鎖并向任何等待鎖的線程發出信号。
* Unlocks and signals any thread waiting for plock. Called only
* when CAS of seq value for unlock fails.
*/
// 1、對plock進行指派,釋放鎖 2、喚醒所有等待的線程
private void releasePlock(int ps) {
// 直接指派而不是使用CAS的原因是:
// -> 擷取鎖,然後釋放鎖失敗才會調用這個方法,而CAS失敗隻能是最高位/最低位變了,
// 最高位表示池是否關閉,而池進行關閉需要先擷取鎖,是以不可能;最低位變了表示有
// 線程已經進入等待狀态,需要喚醒,我們在指派後,調用notifyAll()即可
plock = ps;
// 先釋放鎖,再進行喚醒
synchronized (this) { notifyAll(); }
}
/**
* 如果并行度低于目标水準,則嘗試建立并啟動一個worker。在故障時調整計數等。
* Tries to create and start one worker if fewer than target
* parallelism level exist. Adjusts counts etc on failure.
*/
private void tryAddWorker() {
long c; int u, e;
// 擷取ctl高32位的值 (ctl的 33-48位 和 49-64的初始值都是 -parallelism)
// (u = (int)((c = ctl) >>> 32)) < 0 說明:活動運作workers的數量小于 parallelism
// SHORT_SIGN = 1 << 15, u & SHORT_SIGN) != 0 說明:總workers數小于parallelism
// ctl 第32位的值為:ST(status),即池的狀态,1表示池已終止; (int) c >= 0說明池還沒終止。
while ((u = (int)((c = ctl) >>> 32)) < 0 &&
(u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
// UTC_UNIT = 1, UTC_MASK = (1 << 16) -1
// ((u + UTC_UNIT) & UTC_MASK) 執行結果為: u + 1,然後保留低16位的值
// UAC_UNIT = 1 << 16, UAC_MASK = 1111 1111 1111 1111 0000 0000 0000 0000
// ((u + UAC_UNIT) & UAC_MASK) 執行結果為:u的高16位 + 1, 然後保留高16位
// e = (int)c ,即ctl 低32位的值
// 整個式子執行結果為:ctl的高16位的值加1,次高16的值加1,低32位的值保持不變
// 也就是 AC + 1, TC + 1。 AC: 活動運作workers的數量 - parallelism ; TC: 總workers數 - parallelism
long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
// 使用CAS修改ctl的值為nc
if (U.compareAndSwapLong(this, CTL, c, nc)) {
ForkJoinWorkerThreadFactory fac;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 使用ForkJoin線程池工廠建立一個ForkJoinWorkerThread 線程
if ((fac = factory) != null &&
(wt = fac.newThread(this)) != null) {
// 啟動線程
wt.start();
break;
}
} catch (Throwable rex) {
ex = rex;
}
// 線程建立或者啟動失敗,登出 wt
deregisterWorker(wt, ex);
break;
}
}
}
// 注冊和登出workers
// Registering and deregistering workers
/**
* 從ForkJoinWorkerThread調用,建立并記錄它的WorkQueue.
* Callback from ForkJoinWorkerThread to establish and record its
* WorkQueue.
* 為了避免由于在workQueues數組前填充條目而導緻的掃描偏差,我們将該數組視為一個簡單的二幂哈希表,
* 使用每個線程種子作為哈希,根據需要進行擴充。
* To avoid scanning bias due to packing entries in
* front of the workQueues array, we treat the array as a simple
* power-of-two hash table using per-thread seed as hash,
* expanding as needed.
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
// 設定線程為守護線程
wt.setDaemon(true);
if ((handler = ueh) != null)
// 設定 Thread的uncaughtExceptionHandler 字段值
wt.setUncaughtExceptionHandler(handler);
// 使用CAS更新 indexSeed,在原來的值上增加 SEED_INCREMENT,直到成功為止,
// 并且更新後的 indexSeed 不能為0,如果為0,繼續增加
do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
s += SEED_INCREMENT) ||
s == 0); // skip 0
// 建立工作隊列
WorkQueue w = new WorkQueue(this, wt, mode, s);
// PL_LOCK = 2 -> 0010 , 如果 PL_LOCK 的第二位為1 (plock 池的鎖)
// || PL_LOCK的第二位為0,但使用CAS把第二位修改為1失敗
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 若鎖已被擷取,或者擷取鎖失敗,調用acquirePlock()方法擷取鎖,并傳回擷取鎖後 plock的值
ps = acquirePlock();
// SHUTDOWN = 1 << 31
// ps & SHUTDOWN -> 擷取ps 第32位的值
// & ~SHUTDOWN 的作用是把第32位的值設定為0,避免溢出而導緻第32位的值為1
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
try {
// 如果池已經關閉,則跳過
if ((ws = workQueues) != null) { // skip if shutting down
int n = ws.length, m = n - 1;
// x = indexSeed, | 1的作用是,保證 r的值為奇數
int r = (s << 1) | 1; // use odd-numbered indices 使用奇數索引
// r = r & m, 若該索引位置上已經有其他元素了,表示索引位置沖突了
if (ws[r &= m] != null) { // collision 沖突
int probes = 0; // step by approx half size
// EVENMASK = 0xfffe -> 1111 1111 1111 1110
// (n >>> 1) & EVENMASK 保證是個偶數
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[r = (r + step) & m] != null) {
if (++probes >= n) {
// 沖突次數達到了n次 (不包含前面那次),對 workQueues進行擴容,容量增加一倍
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
// 把workQueue在 workQueues數組中的索引位置儲存到 workQueue中
w.poolIndex = (short)r;
// volatile 寫順序
w.eventCount = r; // volatile write orders
// 把w 放到workQueues的索引 r位置
ws[r] = w;
}
} finally {
// 使用CAS修改 plock的值,釋放鎖
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、對plock進行指派,釋放鎖 2、喚醒所有等待的線程
releasePlock(nps);
}
// 設定線程名稱, w.poolIndex >>> 1 是因為都儲存在奇數索引位置上
wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
return w;
}
/**
* 終止worker的最後回調,以及構造或啟動worker失敗時進行調用。
* Final callback from terminating worker, as well as upon failure
* to construct or start a worker. Removes record of worker from
* 從數組中删除worker記錄,并調整計數。如果池正在關閉,則嘗試完成終止。
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
* @param wt the worker thread, or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
// (w = wt.workQueue) != null 說明該線程的 workQueue已經被注冊到 workQueues中了
if (wt != null && (w = wt.workQueue) != null) {
int ps;
// 設定 w.qlock = -1,表示該workQueue已經終止
w.qlock = -1; // ensure set 確定有設定 ( awaitWork()方法等待逾時退出也會設定 )
// 該workQueue竊取的任務數量彙總到ForkJoinPool的執行個體字段stealCount 中
U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
// PL_LOCK = 0010, ((ps = plock) & PL_LOCK) != 0 說明plock鎖已經被擷取了
// 或者 使用CAS 擷取 plock鎖失敗,則調用acquirePlock()方法擷取鎖,并傳回擷取鎖後 plock的值
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 擷取 plock鎖
ps = acquirePlock();
// SHUTDOWN = 1 << 31, ps & SHUTDOWN 擷取 ps第32位的值,該位表示池是否終止,而不是鎖的系列号
// (ps + PL_LOCK) & ~SHUTDOWN 強制把運算後第32位的值置為0,因為加完後值有可能溢出而
// 導緻第32位的值變成1,是以 需要分兩步進行計算
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
try {
int idx = w.poolIndex;
WorkQueue[] ws = workQueues;
// 從 workQueues中移除要删除的 queue
if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
ws[idx] = null;
} finally {
// 雖然已經擷取鎖了,但是最低位是有可能變的,是以需要使用CAS
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、對plock進行指派,釋放鎖 2、喚醒所有等待的線程
releasePlock(nps);
}
}
// 上面隻是從 workQueues中移除 workQueue,還沒更新計數
// 調整ctl 計數
long c; // adjust ctl counts
// AC_UNIT = 1L << 48, AC_MASK 高16位為: 1111 1111 1111 1111,低48位為0
// TC_UNIT = 1L << 32, TC_MASK 高32位為:0000 0000 0000 0000 1111 1111 1111 1111,低32位為0
// 線程活動計數、線程總數分别減1
// c & ~(AC_MASK|TC_MASK) 擷取c 低32位的值,高32位的值取0
// 調整ctl計數,直到成功為止。 計算結果為:線程活動數量、線程總數量-1,低32位保持不變
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// tryTerminate() 嘗試終止池,因為有可能被移除的線程的存在而導緻池不能終止(活動線程數大于0),
// 如果正在終止/已經終止,傳回true,否則傳回false。 如果傳回true,沒必要進行以下操作,
// 因為tryTerminate() 方法裡面會進行操作
if (!tryTerminate(false, false) && w != null && w.array != null) {
// 取消剩餘的任務
w.cancelAll(); // cancel remaining tasks
WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
// (u = (int)((c = ctl) >>> 32)) < 0 說明池中活動的線程數量未達到parallelism
// (e = (int)c) >= 0 說明池未終止,> 0 說明有等待的線程
while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
// e > 0 說明有等待的線程,進行喚醒
if (e > 0) { // activate or create replacement
// SMASK = 1111 1111 1111 1111
if ((ws = workQueues) == null ||
(i = e & SMASK) >= ws.length || // 等待線程的索引超出了ws的索引
(v = ws[i]) == null)
break; // 退出循環
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// UAC_UNIT = 1 << 16; (u + UAC_UNIT) << 32) 活動線程數量加1
long nc = (((long)(v.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
// v 為等待線程的頂棧,INT_SIGN = 1 << 31。因為線程在進入等待之前都需要把eventCount設定為負數
// 是以 v.eventCount == (e | INT_SIGN) 時線程才可能處于等待狀态。
// 判斷eventCount值是否改變,減少不必要的CAS操作,因為如果這裡不相等,下面的CAS肯定失敗
if (v.eventCount != (e | INT_SIGN))
break;
// 不會存在ABA問題,比如: A線程是要喚醒的線程,而喚醒之前A.eventCount會 + 1,
// 然後A線程又進入了等待狀态,雖然 ctl記錄的等待線程的頂棧都是A線程,但是ctl的
// 第17位到第31位值已經不一樣了。是以不會有ABA問題。
if (U.compareAndSwapLong(this, CTL, c, nc)) {
// E_SEQ = 1 << 16,等待次數 + 1,并把最高位設定回0
v.eventCount = (e + E_SEQ) & E_MASK;
// 喚醒線程
if ((p = v.parker) != null)
U.unpark(p);
break; // 退出循環,隻喚醒一個線程
}
}
else {
// 池中沒有等待的線程,但是線程總數未達到 parallelism,是以添加新的線程 (前提是池還未進行終止)
if ((short)u < 0)
tryAddWorker();
break;
}
}
}
// 線程退出時,幫助清理過期引用
if (ex == null) // help clean refs on way out
// 輪詢過期的異常記錄引用并删除它們
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
// 重新抛出異常
ForkJoinTask.rethrow(ex);
}
// Submissions
/**
* 除非關閉,否則将給定的任務添加到送出者目前隊列索引(子產品化送出範圍)的送出隊列中。
* Unless shutting down, adds the given task to a submission queue
* at submitter's current queue index (modulo submission
* 該方法隻直接處理最常見的路徑。其他的都轉接到fullExternalPush。
* range). Only the most common path is directly handled in this
* method. All others are relayed to fullExternalPush.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
// 擷取線程的threadLocalRandomProbe
int r = ThreadLocalRandom.getProbe();
int ps = plock;
WorkQueue[] ws = workQueues;
// 1、ps > 0 判斷說明池未終止
// 2、ws != null && (m = (ws.length - 1)) >= 0 判斷 ws不為空
// 3、SQMASK = 0x007e -> 1111110,強制把最後一位 置為0,即偶數。非FrokJoinWorkerThread線程
// 送出的任務必須送出到 ws的偶數索引位置;
// 4、r != 0 說明 threadLocalRandomProbe 已經初始化完成,否則需要初始化
// 5、使用CAS擷取WorkQueue q 的鎖
if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock 使用CAS擷取q的鎖
// 1、workQueue放入workQueues之前沒有對array 進行初始化,往workQueue添加任務時,發現array
// 沒有初始化/容量不夠時,才會對array進行初始化/擴容。是以當 q != null時,q.array可能為null
// 2、(am = a.length - 1) > (n = (s = q.top) - q.base), 判斷array是否還有剩餘空間。 am -> arrayMask
// 這裡可以先讀 top (非volatile),而不用先讀 base (volatile) 的原因是:已經先擷取了WorkQueue q 的鎖,
// 是以已經保證了可見性。 對于queueSize()這類的隻讀操作,沒有寫操作,不需要擷取鎖,是以需要先讀base,
// 保證可見性
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
// s = q.top。 計算索引 top的位址偏移量
int j = ((am & s) << ASHIFT) + ABASE;
// 把任務放入top 位置(top所在的索引位置是沒有元素的).
// 使用 putOrderedObject() 保證重新整理到主存中的順序。因為當top != base時表示隊列中
// 一定有元素,如果 top先于 task重新整理到主存,那麼top != base時,擷取的任務可能是空的
U.putOrderedObject(a, j, task);
// top + 1
q.top = s + 1; // push on to deque
// 釋放鎖。釋放鎖不需要使用CAS
q.qlock = 0;
// n <= 1 說明workQueues中在送出此任務之前,隊列中可能沒有任務,工作線程可能是以
// 沒有掃描到任務處于等待狀态。或者 池中的工作線程由于長時間沒有掃描到任務而退出了,
// 導緻池中可能沒有工作線程了。是以需要 通知/注冊 工作線程,保證線程的活躍度
if (n <= 1)
// 參數傳入了 workQueues, workQueue
signalWork(ws, q);
return;
}
// 釋放鎖
q.qlock = 0;
}
fullExternalPush(task);
}
/**
* 完整版的externalPush。這個方法在第一次向池送出第一個任務時調用,是以必須執行二次初始化。
* Full version of externalPush. This method is called, among
* other times, upon the first submission of the first task to the
* pool, so must perform secondary initialization. It also
* 它還通過查找ThreadLocal來檢測外部線程的首次送出,如果索引隊列為空或争用,
* 則建立一個新的共享隊列。
* detects first submission by an external thread by looking up
* its ThreadLocal, and creates a new shared queue if the one at
* index if empty or contended. The plock lock body must be
* plock鎖體必須是無異常的(是以沒有try/finally),是以我們樂觀地在鎖
* 外配置設定新隊列,如果不需要(很少)就丢棄它們。
* exception-free (so no try/finally) so we optimistically
* allocate new queues outside the lock and throw them away if
* (very rarely) not needed.
*
* 當plock為0時發生二次初始化,建立工作隊列數組并将plock設定為一個有效值。
* Secondary initialization occurs when plock is zero, to create
* workQueue array and set plock to a valid value. This lock body
* 這個鎖體也必須是無異常的。因為plock seq值最終可能會包圍在0附近,
* 是以如果工作隊列存在,這個方法就不會有害地失敗初始化,同時仍在推進plock。
* must also be exception-free. Because the plock seq value can
* eventually wrap around zero, this method harmlessly fails to
* reinitialize if workQueues exists, while still advancing plock.
*/
private void fullExternalPush(ForkJoinTask<?> task) {
int r;
// 如果線程的threadLocalRandomProbe 還沒初始化,則進行初始化
if ((r = ThreadLocalRandom.getProbe()) == 0) {
// 初始化 threadLocalRandomSeed 和 threadLocalRandomProbe
ThreadLocalRandom.localInit();
// 擷取threadLocalRandomProbe
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int ps, m, k;
boolean move = false;
// plock < 0 說明池已經關閉,拒絕任務。池關閉時首先會設定 plock = -1,等池滿足了終止條件
// 才會 ctl的停止位為1.
if ((ps = plock) < 0)
throw new RejectedExecutionException();
// ps == 0, plock = 0将發生二次初始化,二次初始化隻是将plock設定為一個非0的有效值,
// 不會修改workQueues
// 判斷workQueues是否已經進行初始化,如果還沒初始化,則進行初始化
else if (ps == 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) { // initialize workQueues
// 初始化workQueues
// parallelism <= MAX_CAP, MAX_CAP = 0111 1111 1111 1111
// 是以 workQueues的最大長度不會超過 1 << 15
int p = parallelism; // find power of two table size
// 確定至少兩個槽位
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
// 擷取最小的大于n的2的幂,并乘以2 (parallelism小于 1 << 15,是以不會溢出)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
// 建立WorkQueue數組,長度為n。當plock = 0時, ws可能非空,是以 nws 可能為null
WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
new WorkQueue[n] : null);
// PL_LOCK = 2 -> 0010 , 如果 PL_LOCK 的第二位為1
// || PL_LOCK的第二位為0,但使用CAS把第二位修改為1失敗
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 調用acquirePlock()方法擷取鎖,并傳回擷取鎖後 plock的值
ps = acquirePlock();
// 擷取鎖後需要再次判斷workQueues 是否已經被初始化。如果 nws = null,不會對workQueues進行指派
if (((ws = workQueues) == null || ws.length == 0) && nws != null)
// 初始化 workQueues
workQueues = nws;
// SHUTDOWN = 1 << 31
// ps & SHUTDOWN -> 擷取ps 第32位的值
// & ~SHUTDOWN 的作用是把第32位的值設定為0,避免溢出而導緻第32位的值為1
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
// 每擷取一次鎖,plock的值加2,釋放鎖plock的值再加2
// (雖然已經擷取鎖了,但是最高位和最低位是有可能變的,是以需要使用CAS)
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、對plock進行指派 2、喚醒所有等待的線程
releasePlock(nps);
}
// 判斷線程在 WorkQueues數組中對應的索引位置是否有workQueue
// r -> 線程的 threadLocalRandomProbe; m = ws.length - 1
// SQMASK = 0x007e -> 1111110,強制把最後一位 置為0,即偶數
// k = r & m & SQMASK 計算線程在WorkQueues數組中對應的索引位置
else if ((q = ws[k = r & m & SQMASK]) != null) {
// qlock 0表示沒有鎖定,1表示鎖定 (plock 池的鎖, qlock 工作隊列的鎖)
// 如果 qlock鎖沒有被擷取,且使用CAS擷取鎖成功
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
// 建立WorkQueue時不會對 array進行初始化,是以 q.array可能為null
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false;
// 鎖定版本的 push
try { // locked version of push
// top - base = 隊列中元素的個數。如果 top + 1 - base = length,說明插入元素後
// 隊列就沒有剩餘容量了,是以進行擴容。
// q.growArray() 對q.array進行初始化 / 擴容 (容量擴大一倍)
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) { // must presize
// a.length 一定是2的幂, s = q.top
// 計算索引s的位址偏移量 ( s = q.top )
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
// top + 1
q.top = s + 1;
// submitted = true 表示任務已送出
submitted = true;
}
} finally {
// 釋放鎖
q.qlock = 0; // unlock
}
if (submitted) {
// 任務送出,喚醒等待的線程或者注冊新的worker
signalWork(ws, q);
return;
}
}
// 如果鎖已經被其他線程擷取了,那麼設定move = true,move = true時會修改
// 線程的 threadLocalRandomProbe 值,即修改線程在workQueues中的映射位置
move = true; // move on failure
}
// 如果 workQueues已經初始化,且線程在workQueues 數組中對應的索引位置沒有元素
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
// 建立WorkQueue, 模式為:SHARED_QUEUE, seed 為線程的 threadLocalRandomProbe
// 參數為: ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, nt seed
q = new WorkQueue(this, null, SHARED_QUEUE, r);
// k = r & m & SQMASK q在workQueues中對應的索引位置
// 設定workQueue在 workQueues中的索引位置
q.poolIndex = (short)k;
// 擷取plock 擷取
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 擷取鎖,并傳回擷取鎖後 plock的值
ps = acquirePlock();
// 擷取鎖後再次判斷 workQueues的索引k位置是否有元素
if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
// 索引k的元素指派為q
ws[k] = q;
// 計算釋放鎖後plock的值,計算後的值最低位一定是0,而不管最低位是否已經變成1了,
// 如果已經變成1了CAS會失敗,然後調用releasePlock()方法,釋放鎖,喚醒所有的線程
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、對plock進行指派,釋放鎖 2、喚醒所有等待的線程
releasePlock(nps);
}
// workQueues已經初始化,線程對應的索引位置沒有元素,且 plock已經被其他線程擷取了
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
// Maintaining ctl counts
/**
* 活動線程數 + 1,主要在阻塞傳回後調用
* Increments active count; mainly called upon return from blocking.
*/
final void incrementActiveCount() {
long c;
// AC_MASK 高16位為: 1111 1111 1111 1111,低48位為0
// AC_UNIT = 1L << 48, 計算結果為:低 48位不變,高16位加 1
// 活動線程數 + 1,直到成功
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((c & ~AC_MASK) |
((c & AC_MASK) + AC_UNIT))));
}
/**
* 如果激活的worker太少,則嘗試建立或激活worker。
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
* 如果非空,這個隊列包含待處理的任務
* @param q if non-null, the queue holding tasks to be processed
*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {
for (;;) {
long c; int e, u, i; WorkQueue w; Thread p;
// 擷取ctl高32位的值 (ctl的 33-48位 和 49-64的初始值都是 -parallelism)
// u >= 0 說明池中活動的工作線程已經達到parallelism,是以沒有必要喚醒,也沒有必要
// 注冊新的工作線程,直接退出循環,傳回。
if ((u = (int)((c = ctl) >>> 32)) >= 0)
break;
// (e = (int)c) <= 0 -> e = 0 說明池中沒有等待的線程, e < 0 說明池已終止,tryAddWorker()方法裡
// 會判斷池是否已經終止,終止将不進行任何操作
if ((e = (int)c) <= 0) {
// (short)u 表示ctl的 33-48位的值,即 總workers數減去目标并行度 (-parallelism)
// u < 0說明 worker數量還沒達到 parallelism的值,是以注冊新的worker
if ((short)u < 0)
// 添加worker
tryAddWorker();
// 隊列中沒有等待的線程,在這裡傳回
break;
}
// e = (int)c , SMASK = 1111 1111 1111 1111。
// i -> waiters的Treiber頂棧的 poolIndex (等待線程頂棧的在workQueues中的索引)
// w = ws[i]) == null 說明沒有等待線程,不需要進行通知,是以直接break,然後傳回。
if (ws == null || ws.length <= (i = e & SMASK) ||
(w = ws[i]) == null)
break;
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// w 為等待線程的頂棧
// UAC_UNIT = 1 << 16,因為要喚醒一個線程,是以 活動線程數量 + 1
// 等待線程的棧頂指向下一個等待者,活動線程數 + 1
long nc = (((long)(w.nextWait & E_MASK)) |
((long)(u + UAC_UNIT)) << 32);
// w 為等待線程的頂棧,E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// E_SEQ = 1 << 16 等待次數 + 1,并且設定回正數
int ne = (e + E_SEQ) & E_MASK;
// w 為等待線程的頂棧,INT_SIGN = 1 << 31。因為線程在進入等待之前都需要把eventCount設定為負數
// 是以 w.eventCount == (e | INT_SIGN) 時線程才可能處于等待狀态
// CAS操作不會存在ABA問題,比如: A線程是要喚醒的線程,而喚醒之前A.eventCount會 + 1,
// 然後A線程又進入了等待狀态,雖然 ctl記錄的等待線程的頂棧都是A線程,但是ctl的
// 第17位到第31位值已經不一樣了。是以不會有ABA問題。
if (w.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, c, nc)) {
// 修改被喚醒線程的 eventCount,即等待次數+1,并且設定回正數。線程的等待次數在被喚醒前
// 才進行 + 1操作,而不是在進入等待之前進行 + 1
w.eventCount = ne;
// 喚醒等待的線程
if ((p = w.parker) != null)
U.unpark(p);
break;
}
// q.base >= q.top 說明隊列中已經沒有任務了,是以不需要再進行喚醒操作。
// 這裡先讀 base,再讀 top,因為 base是 volatile修飾的,而top 不是
if (q != null && q.base >= q.top)
break;
}
}
// Scanning for tasks
/**
* 通過 ForkJoinWorkerThread.run 調用
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void runWorker(WorkQueue w) {
// 配置設定隊列
w.growArray(); // allocate queue
// scan(w, r) 傳回0則繼續掃描,否則結束循環
for (int r = w.hint; scan(w, r) == 0; ) {
// 僞随機數
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
/**
* 掃描,如果發現,運作一個任務,否則可能使worker失活。
* Scans for and, if found, runs one task, else possibly
* inactivates the worker.
* 此方法對volatile狀态的單次讀取進行操作,并設計為連續地重新調用,
* 部分原因是它在檢測到不一緻、争用或狀态更改時傳回,這表明重新調用可能成功。
* This method operates on single reads of
* volatile state and is designed to be re-invoked continuously,
* in part because it returns upon detecting inconsistencies,
* contention, or state changes that indicate possible success on
* re-invocation.
*
* 該掃描從一個随機索引開始,在多個隊列中搜尋任務,每個至少檢查兩次。
* The scan searches for tasks across queues starting at a random
* index, checking each at least twice. The scan terminates upon
* 掃描在找到非空隊列或完成掃描時終止。
* either finding a non-empty queue, or completing the sweep. If
* 如果worker未被滅活,它将從該隊列擷取并運作一個任務。
* the worker is not inactivated, it takes and runs a task from
* 否則,如果不激活,它試圖通過信号來激活自己或其他worker。
* this queue. Otherwise, if not activated, it tries to activate
* itself or some other worker by signalling. On failure to find a
* 如果未能找到任務,則傳回(用于重試)在空掃描期間可能更改了池狀态,或者如果激活則嘗試停用,
* 否則可能阻塞或通過方法awaitWork終止。
* task, returns (for retry) if pool state may have changed during
* an empty scan, or tries to inactivate if active, else possibly
* blocks or terminates via method awaitWork.
*
* @param w the worker (via its WorkQueue)
* @param r a random seed
* @return worker qlock status if would have waited, else 0
*/
private final int scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
// 用于一緻性檢查
long c = ctl; // for consistency check
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
// m = ws.length - 1, j = m + m + 1 的原因是 保證workQueues,可以周遊兩次,因為是先
// 周遊元素,然後j 再減 1,是以起始位置可以周遊3次 (j = m 就可以周遊一次ws的所有元素)。
// eventCount -> registerWorker() 方法初始化時等于 poolIndex 的值
for (int j = m + m + 1, ec = w.eventCount;;) {
WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
// ws[(r - j) & m] 當r為threadLocalRandomProbe時(第一次調scan()時,之後傳入的是一個随機數),
// 從 w所在索引位置的下一個索引開始掃描 (因為一開始w的隊列中并沒有任務可以執行)
// 先讀q.base,再讀q.top,因為 base是volatile修飾的,而 top不是。
// (b = q.base) - q.top < 0 說明隊列中有任務
if ((q = ws[(r - j) & m]) != null &&
(b = q.base) - q.top < 0 && (a = q.array) != null) {
// 計算索引b的位址偏移量, b = q.base, ((a.length - 1) & b 的作用是:
// 每取出一個元素base都會增加,每插入一個元素top也會增加,top - base 結果
// 為隊列中的任務個數,隻要 top - base 不大于隊列array的長度,那麼array中
// 就還能存儲任務,base 一直增加,任務多的時候就會超過a.length,但是任務也一直
// 在被取出,是以隊列可以循環使用的,(a.length - 1) & b 計算任務儲存的位置
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 擷取索引b的元素,并指派給t
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null) {
// 如果scan()方法沒有掃描到任務,且 ctl == c,那麼會設定 ec = -ec(負數),
// 然後傳回0,表示繼續執行scan()方法,而繼續執行scan()方法時掃描到了任務,是以 ec < 0
if (ec < 0)
// 如果ctl上有記錄等待的線程,則喚醒一個線程 / 重新激活自己
// 線程在進入真正等待之前就把目前線程記錄到ctl的等待線程頂棧上了,所有如果線程在下次掃描時掃描到了任務,
// 如果頂棧記錄的還是目前線程,那麼可以進行自我激活,也可以被其他線程激活
helpRelease(c, ws, w, q, b);
// q.base == b 判斷base索引位置的任務還沒有被其他線程擷取
// 如果q.base沒有變化,使用CAS設定索引b 的元素為null
else if (q.base == b &&
U.compareAndSwapObject(a, i, t, null)) {
// 設定q.base = b + 1 (即 base + 1)
U.putOrderedInt(q, QBASE, b + 1);
// (b + 1) - q.top < 0 說明隊列中還有任務,喚醒其他worker,進行任務處理
if ((b + 1) - q.top < 0)
// 如果激活的worker太少,則嘗試建立或激活worker
signalWork(ws, q);
// 執行頂級任務和執行後剩餘的任何本地任務
w.runTask(t);
}
}
// 結束循環,直接傳回0,傳回0意味着從新調用scan()方法
break;
}
// ------------- workQueue 中沒有任務 -----------
// j = j - 1,如果 j < 0說明workQueues已經周遊過兩次,但是沒有找到可以執行的任務
else if (--j < 0) {
// ec = w.eventCount, c = ctl, e = (int)c) 隻取低32位
// w.eventCount < 0 或者 池已經終止,則調用awaitWork() 方法
if ((ec | (e = (int)c)) < 0) // inactive or terminating
// scan()方法隻有兩個地方有return,這裡一個,另一個直接傳回0,意味着從新調用scan()方法
return awaitWork(w, c, ec);
else if (ctl == c) { // try to inactivate and enqueue 嘗試失活和排隊
// AC_UNIT = 1L << 48, c - AC_UNIT -> 活動運作workers的數量減1
// AC_MASK: 1111 1111 1111 1111 000000000000000000000000000000000000000000000000
// TC_MASK: 0000 0000 0000 0000 111111111111111100000000000000000000000000000000
// AC_MASK|TC_MASK = 1111 1111 1111 1111 1111 1111 1111 1111 00000000000000000000000000000000
// 執行結果為:ctl的高16位 -1,低32的值取 ec的值, 而 ec的初始值為: poolIndex 的值,是以
// ctl 的低16位為:waiters的Treiber頂棧的 poolIndex(16位)
long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
// e = (int)c, 記錄原來等待的線程
w.nextWait = e;
// INT_SIGN = 1 << 31 , ec | INT_SIGN 即把 ec 的第32位設定為1, ec為負數,
// 表示 線程不是活動狀态
w.eventCount = ec | INT_SIGN;
// 使用CAS修改ctl的值。 線程在進入真正等待之前就把目前線程記錄到ctl的等待線程頂棧上了,
// 所有如果線程在下次掃描時掃描到了任務,如果頂棧記錄的還是目前線程,那麼可以進行自我激活
if (!U.compareAndSwapLong(this, CTL, c, nc))
// CAS失敗,回退 w.eventCount 的值
w.eventCount = ec; // back out
}
break;
}
}
}
return 0;
}
/**
* 繼續scan(),可能阻塞或終止worker w。
* A continuation of scan(), possibly blocking or terminating
* 如果自上次調用以來池狀态明顯改變,則傳回而不阻塞。
* worker w. Returns without blocking if pool state has apparently
* changed since last invocation.
* 另外,如果停用w導緻池變為靜态,則檢查池是否終止,并且,隻要這不是唯一的worker,
* 就在給定的持續時間等待事件。
* Also, if inactivating w has
* caused the pool to become quiescent, checks for pool
* termination, and, so long as this is not the only worker, waits
* for event for up to a given duration. On timeout, if ctl has
* 在逾時時,如果ctl沒有更改,則終止該worker,這将依次喚醒另一個worker,進而可能重複此過程。
* not changed, terminates the worker, which will in turn wake up
* another worker to possibly repeat this process.
*
* @param w the calling worker
* @param c the ctl value on entry to scan
* @param ec the worker's eventCount on entry to scan
*/
// w的 eventCount < 0, 或者池已經終止會調用這個方法
// 線程進入等待的流程: 1、沒有掃描到任務,修改 eventCount為負數,并記錄ctl的等待線程的頂棧為該線程;
// 2、如果 w.nsteals 不等于0 ,那麼該workQueue竊取的任務數量彙總到ForkJoinPool的執行個體字段stealCount 中
// 3、w.nsteals = 0,那麼判斷進行逾時等待還是無限等待,如果是無限等待的,線程被喚醒會繼續去掃描任務,
// 如果是逾時等待的,超過時間後線程會終止退出。
private final int awaitWork(WorkQueue w, long c, int ec) {
int stat, ns; long parkTime, deadline;
// w.qlock -> 1: locked, -1: terminate; else 0
// (stat = w.qlock) >= 0 判斷 w是否已經終止了, 如果 stat = -1,則傳回 -1,表示終止,工作線程需要退出
// w.eventCount == ec 如果 w已被被其他線程激活了,則結果為false
// 因為中斷隻用于警示線程檢查終止,這是在阻塞時無論如何都進行檢查,我們在任何
// 調用park之前清除狀态(使用Thread.interrupted),是以park并不會立即傳回
// 由于狀态被設定通過一些其他不相關的使用者代碼中調用中斷。
if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
!Thread.interrupted()) {
// 擷取 ctl 低32位的值
int e = (int)c;
// 擷取ctl 高32位的值
int u = (int)(c >>> 32);
// UAC_SHIFT = 16, 計算活動的worker 數量
int d = (u >> UAC_SHIFT) + parallelism; // active count
// e < 0 說明池已經終止
// 如果停用w導緻池變為靜态,則檢查池是否終止,tryTerminate() 如果正在終止/已經終止,傳回true
if (e < 0 || (d <= 0 && tryTerminate(false, false)))
// 池已終止,設定 w的 qlock = -1
stat = w.qlock = -1; // pool is terminating
// 擷取該WorkQueue 的竊取任務數量
else if ((ns = w.nsteals) != 0) { // collect steals and retry
w.nsteals = 0;
// 該workQueue竊取的任務數量彙總到ForkJoinPool的執行個體字段stealCount 中
U.getAndAddLong(this, STEALCOUNT, (long)ns);
}
else {
// d 是活動worker的數量 (在調用此方法之前,d 已經減1了), d > 0說明不是最後一個等待的線程,
// 可以進行無限等待; 如果池中沒有活躍的線程,或者 w是等待線程的頂棧,那麼會進行逾時等待,
// 逾時等待在逾時時 會使用CAS更新ctl的值,如果CAS成功,w會終止退出。
// e : ctl 低32位的值, INT_SIGN = 1 << 31
// ec != (e | INT_SIGN) : 說明w不是等待線程的頂棧 (其他線程也要進入等待狀态,頂棧就會變成另一個線程)
// pc 是線程喚醒後修改 ctl的值。 但是pc = 0L的情況下不會更新 ctl的值
long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
((long)(w.nextWait & E_MASK)) | // ctl to restore
// u: ctl 高32 位的值; UAC_UNIT = 1 << 16。因為線程從等待狀态到
// 喚醒後,活動worker數量 + 1, pc是線程喚醒後 ctl的值
// << 優先級比 | 高
((long)(u + UAC_UNIT)) << 32);
// 池中沒有活躍的線程,或者 w是等待線程的頂棧,那麼會進行逾時等待
if (pc != 0L) { // timed wait if last waiter
// TC_SHIFT = 32; dc = parallelism - 線程總數
int dc = -(short)(c >>> TC_SHIFT);
// dc < 0 說明線程總數超出 parallelism
// FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L, 當線程數超過并行度級别時的逾時值, 0.2秒。如果線程數量
// 太多了,隻會等待很短的時間。
// IDLE_TIMEOUT = 2000L * 1000L * 1000L; 2秒
parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
(dc + 1) * IDLE_TIMEOUT); // dc = parallelism - 線程總數,線程數量越少等待時間越長
// TIMEOUT_SLOP = 2 000 000L = 2毫秒
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
// U.park(false, parkTime) parkTime = 0L,線程會無限等待
parkTime = deadline = 0L;
// 再次判斷狀态是否發生改變
if (w.eventCount == ec && ctl == c) {
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
// 把等待的線程儲存到 parker
w.parker = wt; // emulate LockSupport.park
// park之前再次檢查狀态是否發生改變
if (w.eventCount == ec && ctl == c)
U.park(false, parkTime); // must recheck before park
// 注意:如果線程被虛假喚醒也沒事,就相當于 (ns = w.nsteals) != 0 條件不滿足時傳回了
// 設定并清除工作隊列的“parker”字段,以減少不必要的取消unpark的調用。
w.parker = null;
// 清除線程的 parkBlocker 字段
U.putObject(wt, PARKBLOCKER, null);
// parkTime != 0L 說明是逾時等待。如果池中沒有活躍的線程,或者 w是等待線程的頂棧,
// 那麼會進行逾時等待
// deadline - System.nanoTime() <= 0L 說明已逾時
// CAS操作: 在逾時時,如果ctl沒有更改,并更新ctl成功,則終止該worker,
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, pc))
// 池縮小,在逾時時,如果ctl沒有更改,則終止該worker,
// 這将依次喚醒另一個worker,進而可能重複此過程。
// w.qlock = -1,說明此workQueue已終止
stat = w.qlock = -1; // shrink pool
}
}
}
return stat;
}
/**
* 可能釋放(通知)一個worker。隻有當處于非活動狀态的worker發現一個非空隊列時才從scan()調用。
* Possibly releases (signals) a worker. Called only from scan()
* when a worker with apparently inactive status finds a non-empty
* 這需要重新驗證調用者的所有關聯狀态。
* queue. This requires revalidating all of the associated state
* from caller.
*/
// helpRelease(c, ws, w, q, b) -> c: ctl; ws: workQueues; w: 調用該方法的workQueue;
// q: 掃描到具有非空隊列的workQueue; b : q.base,任務的索引位置
private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
WorkQueue q, int b) {
WorkQueue v; int e, i; Thread p;
// (e = (int)c) > 0 說明池還沒終止,并且有線程在等待 (0沒有線程在等待,負數池終止)
// SMASK = 1111 1111 1111 1111, i = e & SMASK -> waiters的Treiber頂棧的 poolIndex
// v = ws[i] waiters的頂棧workQueue
if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
ws != null && ws.length > (i = e & SMASK) &&
(v = ws[i]) != null && ctl == c) {
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// v.nextWait & E_MASK把nextWait的最高位置為0
// UAC_UNIT = 1 << 16, (c >>> 32) + UAC_UNIT 活動worker 數量加1
// ctl 是通過CAS進行設定的,是以如果ctl的池狀态位變成1了也沒事,CAS會失敗
long nc = (((long)(v.nextWait & E_MASK)) |
((long)((int)(c >>> 32) + UAC_UNIT)) << 32); // nc -> nextCtl
// E_SEQ = 1 << 16, E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// (e + E_SEQ) & E_MASK, 高16位加 1,并且最高位置為 0
int ne = (e + E_SEQ) & E_MASK; // ne -> nextEventCount
// q != null && q.base == b 判斷任務是否已經被取走了
// INT_SIGN = 1 << 31
// v.eventCount == (e | INT_SIGN) 判斷v是否已經被喚醒了,如果不相等,說明已經被其他喚醒了
if (q != null && q.base == b && w.eventCount < 0 &&
v.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, c, nc)) {
v.eventCount = ne;
// p 可能為null,因為線程ctl記錄的等待線程可能還沒真正進入等待,隻是先記錄到ctl上,
if ((p = v.parker) != null)
U.unpark(p);
}
}
}
/**
* 試圖為給定任務的竊取者定位和執行任務,或者反過來跟蹤它的偷竊者,跟蹤currentSteal—> currentJoin連結,
* 尋找在給定任務的後代上帶有非空隊列的工作線程,并把任務竊取回來并執行任務。
* Tries to locate and execute tasks for a stealer of the given
* task, or in turn one of its stealers, Traces currentSteal ->
* currentJoin links looking for a thread working on a descendant
* of the given task and with a non-empty queue to steal back and
* execute tasks from.
* 在等待join時對這個方法的第一次調用通常需要進行掃描/搜尋(這是可以的,
* 因為joiner沒有更好的事情要做),但是這個方法會在workers中留下提示來加快後續調用。
* The first call to this method upon a
* waiting join will often entail scanning/search, (which is OK
* because the joiner has nothing better to do), but this method
* leaves hints in workers to speed up subsequent calls. The
* 該實作非常靈活,可以處理潛在的不一緻性或循環遇到過時、未知或太長以至于可能是循環的鍊。
* implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale,
* unknown, or so long that they are likely cyclic.
*
* @param joiner the joining worker
* @param task the task to join
* 如果沒有進展傳回0,如果任務已經完成傳回傳回負數,否則傳回正數(任務有進展)
* @return 0 if no progress can be made, negative if task
* known complete, else positive
*/
// 尋找并竊取task的一個子任務來執行,若子任務還進行分割,則執行完隊列中的所有子任務
private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
int stat = 0, steps = 0; // bound to avoid cycles
// joiner.base - joiner.top >= 0 說明joiner隊列中沒有任務,如果joiner的工作隊列中有任務
// 則直接傳回 0,不能幫助竊取任務
// 隊列中有任務不能去竊取任務的原因是:竊取完任務執行後,如果任務又分割了,則隊列中的任務不一定都是目前竊取
// 任務的子任務,如果其他工作線程執行tryHelpStealer()根據currentSteal 來這裡竊取了任務,則會導緻竊取到
// 任務就可能不是其他工作線程想竊取的任務
if (task != null && joiner != null &&
joiner.base - joiner.top >= 0) { // hoist checks
restart: for (;;) {
// 目前等待完成的目标任務 (這個任務可能被線程A竊取了,然後此任務被分割成了任務A和任務B,線程A執行了
// 任務A,然後等待任務B完成,此時joiner等待完成的任務轉向任務B,因為目标任務的完成,需要任務B先完成,
// 此時subtask = 任務B)
ForkJoinTask<?> subtask = task; // current target 目前目标
// r
// / \
// b c
// / \ / \
// d e f g
// j 為等待任務完成的等待者。若joiner等待任務c完成,而線程A竊取了任務c,執行了任務f,然後
// 等待任務g完成,此時j = 線程A,subtask = g
for (WorkQueue j = joiner, v;;) { // v is stealer of subtask v是subtask的偷竊者
WorkQueue[] ws; int m, s, h;
// (s = task.status) < 0 判斷任務是否已經完成,若已經完成傳回任務狀态值
if ((s = task.status) < 0) { // 必須是task完成,而不是subtask
stat = s;
break restart;
}
// 如果workQueues 為空,結束循環,此時stat = 0,即傳回0
// m = ws.length - 1
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break restart; // shutting down 池已關閉
// 工作線程的workQueue是在奇數索引位置上, j.hint | 1 保證結果是奇數
if ((v = ws[h = (j.hint | 1) & m]) == null ||
v.currentSteal != subtask) {
for (int origin = h;;) { // find stealer 查找竊取者
// h = (h + 2) & m 計算下一個工作線程的索引位置
// 15 -> 1111 (索引位置為1、17、33...), 并且任務已經完成 (外部循環會傳回狀态值)
// || join等待完成的任務不是subtask (注意:j不一定是傳入的joiner,下面會修改j,是以
// 有可能subtask已經執行完成了,即過時了)
if (((h = (h + 2) & m) & 15) == 1 &&
(subtask.status < 0 || j.currentJoin != subtask))
// 偶爾過時檢查 (重新任務是否已經完成或者池是否已經關閉,
// 然後繼續從j開始開始周遊)
continue restart; // occasional staleness check
// 找到任務的竊取者
if ((v = ws[h]) != null &&
v.currentSteal == subtask) {
// 在等待join時對這個方法的第一次調用通常需要進行掃描/搜尋(這是可以的,
// 因為joiner沒有更好的事情要做),但是這個方法會在workers中留下提示來
// 加快後續調用 (竊取回任務然後執行完成後若任務沒有完成需要再去竊取任務,
// 留下提示可以快速找到任務竊取者的位置)
// 記錄偷竊者的索引位置
j.hint = h; // save hint 儲存提示
break;
}
if (h == origin)
// 循環一遍了,沒有找到任務竊取者
break restart; // cannot find stealer 找不到任務偷竊者
}
}
// ------- 工作線程v 目前竊取的任務是 subtask ------------
for (;;) { // help stealer or descend to its stealer
ForkJoinTask<?>[] a; int b;
// subtask已經完成
if (subtask.status < 0) // surround probes with
// 外部會判斷 task是否完成,如果task沒有完成需要繼續執行
continue restart; // consistency checks
// 判斷工作線程v 的工作隊列不為空
if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
// 計算索引b的位址偏移量
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 先拿出任務,然後判斷v 目前竊取的任務是否還是subtask,然後通過CAS取出任務,
// 保證竊取出的任務是subtask的子任務 (因為此時有可能v目前竊取的任務不是subtask,而
// 任務t也不是subtask的子任務,設計地太完美了)
ForkJoinTask<?> t =
(ForkJoinTask<?>)U.getObjectVolatile(a, i);
// 1、subtask.status < 0 說明subtask完成了;
// 2、j.currentJoin != subtask 說明j (不一定是傳入的joiner,下面會修改) 等待完成的
// 任務subtask完成了;
// 3、v.currentSteal != subtask 說明v 偷竊的任務subtask 完成了
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask)
continue restart; // stale 過時了
stat = 1; // apparent progress 明顯的進展
// v.base == b 判斷任務是否被其他線程竊取了
if (v.base == b) {
// 任務沒有被其他線程竊取,但是可能被線程v自己執行了,是以可能 v.base == b,而t 為null
if (t == null)
break restart;
// 使用CAS取出任務
if (U.compareAndSwapObject(a, i, t, null)) {
// 修改base
U.putOrderedInt(v, QBASE, b + 1);
// ps -> previousSteal 上一個竊取的任務
ForkJoinTask<?> ps = joiner.currentSteal;
int jt = joiner.top; // 記錄joiner 目前的top值
do {
// 修改joiner 目前竊取的任務為t
joiner.currentSteal = t;
// 執行任務t
t.doExec(); // clear local tasks too 并且清空本隊列的任務
} while (task.status >= 0 && // task任務沒有完成
joiner.top != jt && // 如果 top值發生改變了,說明任務t 又進行分割了
(t = joiner.pop()) != null); // 是以繼續取出子任務執行
// joiner目前竊取的任務修改回上一個竊取的任務 ps
joiner.currentSteal = ps;
// 跳出外部循環,因為此時stat = 1,是以結束循環後會傳回1,外部會判斷task任務是否完成,
// 沒有完成繼續竊取任務,是以已經留下提示了,是以可以很快找到任務竊取者
// (這裡沒有繼續判斷v的隊列中是否還有task的子任務,)
break restart;
}
}
}
// 工作線程v 的工作隊列為空,說明subtask的子任務都執行完了,或者子任務被
// 其他工作者竊取了,是以轉向 currentJoin,此為subtask 等待完成的子任務
else { // empty -- try to descend
// next 為v 目前等待完成的任務
// v.currentSteal != subtask 說明任務已經完成了
ForkJoinTask<?> next = v.currentJoin;
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask)
// 過時了todo
continue restart; // stale
// 沒有出路或者可能是循環的 (MAX_HELP = 64),任務分割次數超過64次
else if (next == null || ++steps == MAX_HELP)
break restart; // dead-end or maybe cyclic
else {
// 等待完成的任務轉向 next
subtask = next;
// 等待者為v
j = v;
// 跳出此循環,查找subtask 的偷竊者
break;
}
}
}
}
}
}
return stat;
}
/**
* 類似tryHelpStealer,用于CountedCompleters。
* Analog of tryHelpStealer for CountedCompleters. Tries to steal
* 試圖在目标的計算範圍内竊取和運作任務。
* and run tasks within the target's computation.
*
* @param task the task to join
* @param maxTasks the maximum number of other tasks to run
*/
// 1、檢查joiner工作隊列的top位置,及其他工作線程(包括joiner)的base位置的位置是否是
// 目标任務task或者其子任務,若是取出并執行。
// 2、判斷隊列中的base索引位置的任務就夠了,因為工作隊列中的所有任務都是源于同一個父節點任務
final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
int maxTasks) {
WorkQueue[] ws; int m;
int s = 0;
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
joiner != null && task != null) {
// 擷取 joiner在 workQueues中的索引
int j = joiner.poolIndex;
// 掃描次數
int scans = m + m + 1;
long c = 0L; // for stability check 穩定的檢查
// j += 2 隻判斷工作線程的隊列
for (int k = scans; ; j += 2) {
WorkQueue q;
// task.status < 0 說明任務已經完成了
if ((s = task.status) < 0)
break;
// 判斷工作隊列的最後一個任務(top位置)是否是目标任務task,或者其子任務,如果是的話,取出這個任務并執行,
// 然後傳回true,否則傳回false。 (每次循環都要檢查joiner的工作隊列的top是否為目标任務或者其子任務,因為
// 竊取竊取到任務并執行後,隊列中可能會有竊取到的任務的子任務,需要先執行完自己隊列中的任務)
else if (joiner.internalPopAndExecCC(task)) {
// maxTasks = Integer.MAX_VALUE
if (--maxTasks <= 0) {
s = task.status;
break;
}
// 重置k
k = scans;
}
// 判斷任務是否已經執行完成,若已執行完成,傳回任務狀态
else if ((s = task.status) < 0)
break;
// 判斷隊列中base索引位置的任務是否是root任務或者其子任務,如果是,取出并執行。
// 如果base索引位置的任務已經被其他線程擷取,或者base索引位置的任務是root任務
// 或者其子任務,傳回true; 否則傳回false
// q 可能為 joiner (第一次循環時就是joiner)
else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
if (--maxTasks <= 0) {
s = task.status;
break;
}
// 重置k
k = scans;
}
// --k < 0說明工作隊列已經掃描過4遍了 (因為隻掃描奇數索引的工作者隊列)
else if (--k < 0) {
// 先判斷 c是否等于 ctl,再把 ctl指派給c。 初始值 c = 0L,是以第一次 --k < 0,
// c == ctl 為false
if (c == (c = ctl))
break;
// 重置k
k = scans;
}
}
}
return s;
}
/**
* 嘗試減少活動計數(有時是隐式的),并可能釋放或建立一個補償worker,為阻塞做準備。
* Tries to decrement active count (sometimes implicitly) and
* possibly release or create a compensating worker in preparation
* 在競争或者終止時失敗。否則,添加一個新線程,如果沒有空閑的workers可用和池可能成為饑餓。
* for blocking. Fails on contention or termination. Otherwise,
* adds a new thread if no idle workers are available and pool
* may become starved.
*
* @param c the assumed ctl value
*/
// 1、如果有空閑的線程,則喚醒一個空閑的線程,并傳回true(池中增加了一個活動的線程,而這個線程要進入等待狀态,
// 活動線程減1,剛好進行補償,是以不用修改 ctl的值);
// 2、總線程數 >= parallelism && 活動線程數 > 1,則修改ctl活動線程數減1,并傳回true
// 3、沒有空閑的線程,且 總線程數少于parallelism || 活動線程數隻有一個,并且總線程數少于MAX_CAP,則建立一個新的線程
// 建立并啟動線程成功傳回true,失敗傳回false (活動線程數不變,是以不用修改ctl的值)
final boolean tryCompensate(long c) {
WorkQueue[] ws = workQueues;
int pc = parallelism, e = (int)c, m, tc;
// 第一次調用 c傳入的是0, 之後傳入的ctl的值
if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
// ctl低16位為:waiters的Treiber頂棧的 poolIndex,是以 w為頂棧的等待線程的索引
WorkQueue w = ws[e & m];
// w != null,說明有等待線程
if (e != 0 && w != null) {
Thread p;
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// AC_MASK 高16位為:1111 1111 1111 1111,低48位為0
// TC_MASK 高32位為:0000 0000 0000 0000 1111 1111 1111 1111,低32位為0
// 計算ctl新的值,高32為不變,低32位變為 w.nextWait (nextWait儲存的是ctl低32位的值)
long nc = ((long)(w.nextWait & E_MASK) |
(c & (AC_MASK|TC_MASK)));
// E_SEQ = 1 << 16 ; E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
int ne = (e + E_SEQ) & E_MASK;
// INT_SIGN = 1 << 31
// w.eventCount == (e | INT_SIGN) 時線程才可能處于等待狀态, 因為線程在進入等待之前
// 都需要把eventCount設定為負數
if (w.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, c, nc)) { // CAS修改ctl的值
// 修改被喚醒線程的 eventCount,即等待次數+1,并且設定回正數。線程的等待次數在被喚醒前
// 才進行 + 1操作,而不是在進入等待之前進行 + 1 (修改等待次數,避免ABA問題)
w.eventCount = ne;
if ((p = w.parker) != null)
// 喚醒線程
U.unpark(p);
// 使用空閑線程替換, 不補償
return true; // replace with idle worker
}
}
// ----- 沒有空閑線程 -----
// TC_SHIFT = 32, (short)(c >>> TC_SHIFT) 的值為總線程數 - parallelism
// AC_SHIFT = 48,(int)(c >> AC_SHIFT) + pc > 1 判斷活動線程數是否大于1
// 總線程數 >= parallelism && 活動線程數 > 1
else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
(int)(c >> AC_SHIFT) + pc > 1) {
// AC_UNIT = 1L << 48, AC_MASK 高16位為: 1111 1111 1111 1111,低48位為0
// 計算結果為:高16位減1,即活動線程數減1,低48位不變
long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
// 使用CAS 修改 ctl的值
if (U.compareAndSwapLong(this, CTL, c, nc))
return true; // no compensation 不補償
}
// tc = (short)(c >>> TC_SHIFT), TC_SHIFT = 32, pc = parallelism
// MAX_CAP = 0111 1111 1111 1111
else if (tc + pc < MAX_CAP) { // 判斷總線程數少于 MAX_CAP
// TC_UNIT = 1L << 32, TC_MASK 高32位為:0000 0000 0000 0000 1111 1111 1111 1111,低32位為0
// 計算結果為:總線程數加1,其他不變 (注意:活動線程數沒有加1,因為工作線程要進入等待狀态,活動線程
// 少了一個,是以剛好進行補償)
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
// 使用CAS修改ctl的值
if (U.compareAndSwapLong(this, CTL, c, nc)) {
ForkJoinWorkerThreadFactory fac;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 建立線程,this -> ForkJoinPool
if ((fac = factory) != null &&
(wt = fac.newThread(this)) != null) {
// 啟動線程
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
// 建立或者啟動線程出現異常,登出wt
deregisterWorker(wt, ex); // clean up and return false
}
}
}
return false;
}
/**
* 幫助 和/或 阻塞直到給定的任務完成
* Helps and/or blocks until the given task is done.
*
* @param joiner the joining worker
* @param task the task
* @return task status on exit
*/
/**
* 1、工作線程等待完成的任務一般都是其竊取的任務的子任務(除非個别情況,等待完成的是使用者送出的另一個任務),
* 而且工作線程的任務隊列一定都是目前竊取任務的子任務,不會存在其他任務,但是這些任務有可能被其他線程竊取了。
* 是以等待任務完成優先判斷任務是否在目前隊列裡 (CounterCompleter判斷方式有點不一樣,因為CounterCompleter
* 隻有根任務會等待完成,是以需要判斷是否其葉子任務,是的話取出執行)
*
* 2、一般的ForkJoinTask任務判斷工作線程竊取的任務是否是目前等待完成的任務,如果是幫助完成其隊列中的任務,
* 因為肯定都是子任務。但是這些任務有可能被其他線程竊取了,是以這個工作隊列可能在等待子任務的完成,擷取這個
* 子任務,作為下一個等待完成的任務。。。
*
* 3、隻會檢查工作線程的隊列是否有目标任務/目标任務的子任務,不會檢查公共隊列。
*/
final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
int s = 0;
// (s = task.status) >= 0 說明任務還未完成
if (task != null && (s = task.status) >= 0 && joiner != null) {
// 儲存上一個等待完成的任務 (執行awaitJoin()時可能會去執行另一個任務,而任務裡面也有task.join(),
// 是以又執行到了awaitJoin()方法... 産生了鍊式調用,是以需要儲存上一個等待完成的任務)
ForkJoinTask<?> prevJoin = joiner.currentJoin;
// 修改目前等待完成的任務
joiner.currentJoin = task;
// doJoin()調用tryUnpush(this) 僅當this是workQueue的最後一個任務才會取出任務,
// tryRemoveAndExec() 則是周遊整個workQueue隊列去查找任務,找到則執行這個任務
do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 處理本地任務
(s = task.status) >= 0);
if (s >= 0 && (task instanceof CountedCompleter))
// 檢查joiner工作隊列的top位置,及其他工作線程(包括joiner)的base位置的位置是否是
// 目标任務task或者其子任務,若是取出并執行。
s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
long cc = 0; // for stability checks 穩定性檢查
while (s >= 0 && (s = task.status) >= 0) {
// tryHelpStealer()如果沒有進展傳回0,如果任務已經完成傳回傳回負數,否則傳回正數(任務有進展)
// 沒有進展,且任務還沒有完成
if ((s = tryHelpStealer(joiner, task)) == 0 &&
(s = task.status) >= 0) {
// tryCompensate()如果存在空閑線程則喚醒一個,如果沒有空閑線程,且隻剩下一個
// 活動的線程,則添加一個線程 (方法裡面已經維護好了活動線程數,即 ctl的值)
if (!tryCompensate(cc))
cc = ctl;
else {
// 嘗試設定SIGNAL狀态,表明有線程在等待此任務完成,需要喚醒。
if (task.trySetSignal() && (s = task.status) >= 0) {
synchronized (task) {
if (task.status >= 0) {
try { // see ForkJoinTask
// 線程進入等待狀态
task.wait(); // for explanation
} catch (InterruptedException ie) {
// 忽略中斷
}
}
else
// 如果任務已經完成,喚醒所有的等待線程
task.notifyAll();
}
}
long c; // reactivate
// AC_MASK 高16位為: 1111 1111 1111 1111,低48位為0, AC_UNIT = 1L << 48
// 更新ctl,高16位加1,低48位保持不變,表示活動線程數 +1
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl,
((c & ~AC_MASK) |
((c & AC_MASK) + AC_UNIT))));
}
}
}
// 等待完成的任務重置為上一個等待完成的任務
joiner.currentJoin = prevJoin;
}
return s;
}
/**
* Stripped-down variant of awaitJoin used by timed joins. Tries
* 隻有在有持續的進展時,才試圖幫助加join。(調用者将進入定時等待。)
* to help join only while there is continuous progress. (Caller
* will then enter a timed wait.)
*
* @param joiner the joining worker
* @param task the task
*/
// 幫助任務完成,和awaitJoin() 不同的是線程不會進入等待狀态,幫助任務完成的過程是一樣的
final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
int s;
if (joiner != null && task != null && (s = task.status) >= 0) {
// 儲存joiner目前等待完成的任務
ForkJoinTask<?> prevJoin = joiner.currentJoin;
// 修改joiner目前等待完成的任務為 task
joiner.currentJoin = task;
// tryRemoveAndExec()周遊整個workQueue隊列去查找任務,找到則執行這個任務
do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 處理本地任務
(s = task.status) >= 0);
if (s >= 0) {
if (task instanceof CountedCompleter)
// 檢查joiner工作隊列的top位置,及其他工作線程(包括joiner)的base位置的位置是否是
// 目标任務task或者其子任務,若是取出并執行。
helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
// tryHelpStealer() 尋找并竊取task的一個子任務來執行,若子任務還進行分割,則執行完隊列中的所有子任務
do {} while (task.status >= 0 &&
tryHelpStealer(joiner, task) > 0);
}
// 重置 currentJoin
joiner.currentJoin = prevJoin;
}
}
/**
* 傳回一個(可能)非空的竊取隊列(如果在掃描期間找到),否則傳回null
* Returns a (probably) non-empty steal queue, if one is found
* during a scan, else null. This method must be retried by
* 如果調用者在嘗試使用隊列時該隊列為空,則必須重試此方法。
* caller if, by the time it tries to use the queue, it is empty.
*/
private WorkQueue findNonEmptyStealQueue() {
int r = ThreadLocalRandom.nextSecondarySeed();
for (;;) {
int ps = plock, m; WorkQueue[] ws; WorkQueue q;
// 判斷 workQueues 不為空, m = ws.length - 1
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
// (m + 1) << 2,是以隻掃描奇數索引位置,是以會掃描8次所有工作線程的隊列
for (int j = (m + 1) << 2; j >= 0; --j) {
// 1、 << 1 可以将結果放大一倍,即 --j會導緻 ((r - j) << 1) 結果少2,
// 因為需要跳過偶數索引位置,每次需要減2
// 2、 | 1保證結果為奇數,因為工作者的隊列在奇數索引位置上
if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
q.base - q.top < 0)
// q.base - q.top < 0 隊列不為空,傳回此隊列
return q;
}
}
// plock = ps,說明沒有注冊新的工作線程,也就是workQueues 中沒有增加新工作線程
// 的工作隊列,因為注冊工作線程需要擷取plock 鎖,plock值會變化
if (plock == ps)
return null;
// plock變化了則有可能增加了新工作隊列,需要重新掃描
}
}