package java.util.concurrent;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
import java.security.Permissions;
/**
* 一个ExecutorService运行ForkJoinTask 任务。
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* {@code ForkJoinPool}为非{@code ForkJoinTask}客户端提供提交入口,同时也提供管理和监控操作。
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* {@code ForkJoinPool}与其他类型的{@link ExecutorService}的区别主要在于采用了<em>work-steal </em>:
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
* <em>work-stealing</em>:
* 池中的所有线程都试图查找和执行提交给池和/或由其他活动任务创建的任务
* (如果不存在工作,则最终阻塞等待工作)。
* all threads in the pool attempt to find and
* execute tasks submitted to the pool and/or created by other active
* tasks (eventually blocking waiting for work if none exist). This
* 当大多数任务生成其他子任务时(就像大多数{@code ForkJoinTask}一样),
* 以及许多小任务从外部客户端提交到池时,这就支持了高效的处理。
* enables efficient processing when most tasks spawn other subtasks
* (as do most {@code ForkJoinTask}s), as well as when many small
* tasks are submitted to the pool from external clients. Especially
* 特别是在构造函数中将<em>asyncMode</em>设置为true时,{@code ForkJoinPool}
* 可能也适合用于从未加入的事件类型任务。
* when setting <em>asyncMode</em> to true in constructors, {@code
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*
* 静态{@link #commonPool()}是可用的,并且适合于大多数应用程序。
* <p>A static {@link #commonPool()} is available and appropriate for
* most applications.
* 公共池由任何没有显式提交到指定池的ForkJoinTask使用。
* The common pool is used by any ForkJoinTask that
* is not explicitly submitted to a specified pool. Using the common
* 使用公共池通常会减少资源的使用(它的线程在不使用期间缓慢地回收,在随后使用时恢复)。
* pool normally reduces resource usage (its threads are slowly
* reclaimed during periods of non-use, and reinstated upon subsequent
* use).
*
* 对于需要单独或自定义池的应用程序,可以使用给定的目标并行度级别构造{@code ForkJoinPool};
* 默认情况下,等于可用处理器的数量。
* <p>For applications that require separate or custom pools, a {@code
* ForkJoinPool} may be constructed with a given target parallelism
* level; by default, equal to the number of available processors. The
* 池尝试通过动态添加、挂起或恢复内部工作线程来维护足够的活动(或可用)线程,
* 即使有些任务因等待加入其他任务而停止。
* pool attempts to maintain enough active (or available) threads by
* dynamically adding, suspending, or resuming internal worker
* threads, even if some tasks are stalled waiting to join others.
* 但是,在面对阻塞的I/O或其他非托管同步时,不能保证这样的调整。
* 嵌套的{@link ManagedBlocker}接口允许扩展所适应的同步类型。
* However, no such adjustments are guaranteed in the face of blocked
* I/O or other unmanaged synchronization. The nested {@link
* ManagedBlocker} interface enables extension of the kinds of
* synchronization accommodated.
*
* 除了执行和生命周期控制方法之外,这个类还提供状态检查方法(例如{@link #getStealCount}),
* 用于帮助开发、调优和监视fork/join应用程序。
* <p>In addition to execution and lifecycle control methods, this
* class provides status check methods (for example
* {@link #getStealCount}) that are intended to aid in developing,
* tuning, and monitoring fork/join applications. Also, method
* 而且,方法{@link #toString}以方便非正式监视的形式返回池状态指示。
* {@link #toString} returns indications of pool state in a
* convenient form for informal monitoring.
*
* 与其他ExecutorServices一样,下表总结了三种主要的任务执行方法。
* <p>As is the case with other ExecutorServices, there are three
* main task execution methods summarized in the following table.
* 它们主要用于当前池中尚未参与fork/join计算的客户端。
* These are designed to be used primarily by clients not already
* engaged in fork/join computations in the current pool. The main
* 这些方法的主要形式接受{@code ForkJoinTask}的实例,但是重载的形式也
* 允许混合执行普通的{@code Runnable}或{@code Callable}。
* forms of these methods accept instances of {@code ForkJoinTask},
* but overloaded forms also allow mixed execution of plain {@code
* Runnable}- or {@code Callable}- based activities as well. However,
* 但是,已经在池中执行的任务通常应该使用表中列出的计算内表单,除非使用通常
* 不连接的异步事件样式的任务,在这种情况下,方法的选择几乎没有区别。
* tasks that are already executing in a pool should normally instead
* use the within-computation forms listed in the table unless using
* async event-style tasks that are not usually joined, in which case
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of task execution methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
* <td> <b>Arrange async execution</b></td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
* <td> <b>Await and obtain result</b></td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
* <td> <b>Arrange exec and obtain Future</b></td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* 公共池默认情况下是用默认参数构造的,但是可以通过设置三个
* {@linkplain System#getProperty System properties}来控制这些参数:
* <p>The common pool is by default constructed with default
* parameters, but these may be controlled by setting three
* {@linkplain System#getProperty system properties}:
* <ul>
* <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
* 并行级别,非负整数
* - the parallelism level, a non-negative integer
*
* <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
* ForkJoinWorkerThreadFactory 类名
* - the class name of a {@link ForkJoinWorkerThreadFactory}
*
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
* - the class name of a {@link UncaughtExceptionHandler}
* </ul>
*
* 如果一个SecurityManager存在且没有指定工厂,则默认池使用一个工厂提供的线程不启用Permissions 。
* If a {@link SecurityManager} is present and no factory is
* specified, then the default pool uses a factory supplying
* threads that have no {@link Permissions} enabled.
* 系统类加载器用于加载这些类。
* The system class loader is used to load these classes.
* 建立这些设置有任何错误,使用默认参数。
* Upon any error in establishing these settings, default parameters
* are used.
* 通过将parallelism属性设置为0,和/或使用可能返回{@code null}的工厂,
* 可以禁用或限制公共池中线程的使用。
* It is possible to disable or limit the use of threads in
* the common pool by setting the parallelism property to zero, and/or
* using a factory that may return {@code null}. However doing so may
* 但是这样做可能会导致未加入的任务永远不会执行。
* cause unjoined tasks to never be executed.
*
* 实现注意事项 :此实现将运行的最大线程数限制为32767.尝试创建大于最大数目
* 的池导致IllegalArgumentException 。 ( (1 << 15) - 1)
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
* pools with greater than the maximum number result in
* {@code IllegalArgumentException}.
*
* 此实现仅在池关闭或内部资源耗尽时拒绝提交的任务(即抛出RejectedExecutionException )。
* <p>This implementation rejects submitted tasks (that is, by throwing
* {@link RejectedExecutionException}) only when the pool is shut down
* or internal resources have been exhausted.
*
* @since 1.7
* @author Doug Lea
*/
/**
* 1、外部提交任务时,如果线程在workQueues中对应的索引位置没有workQueue,则会创建一个workQueue,
* 并放入workQueues中对应的索引位置。
*
* 2、注册工作线程ForkJoinWorkerThread时,其构造方法会调用ForkJoinPool的registerWorker() 方法,
* 方面里面会创建workQueue,并注册到 workQueues中,最后返回给 ForkJoinWorkerThread,然后线程
* 里面也会保存此 workQueue。线程创建时没有初始化工作队列,线程运行的时候才会调用growArray()方法
* 分配工作队列ForkJoinTask<?>[]。
*
* 3、b = q.base, ((a.length - 1) & b 的作用是?
* 每取出一个元素base都会增加(扩容时也不会重置或者减少),每插入一个元素top也会增加,top - base 结果
* 为队列中的任务个数,只要 top - base 不大于队列array的长度,那么array中就还能存储任务,base 一直增加,
* 任务多的时候就会超过a.length,但是任务也一直在被取出,因此队列可以循环使用的,(a.length - 1) & b
* 计算任务保存的位置。从队列的中间索引位置开始,循环使用。
*
* 4、CountedCompleter 和 一般ForkJoinTask任务在等待任务完成上的不同,CountedCompleter只能等待根任务完成,
* 而一般的ForkJoinTask可以等待子任务完成,所以join()在帮助任务完成时,一般的ForkJoinTask只需要判断其他
* 工作线程当前窃取的任务是否是等待完成的任务即可,而CountedCompleter 任务需要一级一级往上判断是否是其子
* 任务,而不能通过判断工作线程当前窃取的任务是否是目标任务。
*
* 5、join()方法进入等待之前,会进行补偿,如果池中只剩下一个活动的工作线程了,那么会注册新的工作线程去执行任务,
* 注册成功后,线程才可以进入等待状态,因此活动的线程数会大于 parallelism。
*
*/
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
/*
* 实现概述
* Implementation Overview
*
* 这个类和它的嵌套类提供了一组工作线程的主要功能和控制:
* This class and its nested classes provide the main
* functionality and control for a set of worker threads:
* 来自非fj(ForkJoin)线程的提交将进入提交队列。
* Submissions from non-FJ threads enter into submission queues.
* Workers接受这些任务,并通常将它们分解为可能被其他workers窃取的子任务。
* Workers take these tasks and typically split them into subtasks
* that may be stolen by other workers.
*
* 优先级规则优先处理来自其自身队列的任务(LIFO或FIFO,取决于模式),
* 然后随机FIFO处理其他队列中的任务
* Preference rules give
* first priority to processing tasks from their own queues (LIFO
* or FIFO, depending on mode), then to randomized FIFO steals of
* tasks in other queues.
*
* 工作队列
* WorkQueues
* ==========
*
* 大多数操作发生在工作窃取队列中(在嵌套的类工作队列中)。
* Most operations occur within work-stealing queues (in nested
* class WorkQueue).
* 这些特别形式的双队列只能支持四种可能终端操作的三种,push,pop, and poll(又称 偷),
* These are special forms of Deques that
* support only three of the four possible end-operations -- push,
* pop, and poll (aka steal), under the further constraints that
* 在进一步的约束下,push和pop只能从拥有的线程中调用(或者,正如这里所扩展的,在一个锁中调用),
* 而poll可以从其他线程中调用。
* push and pop are called only from the owning thread (or, as
* extended here, under a lock), while poll may be called from
* other threads.
* 如果您不熟悉它们,那么您可能想要阅读Herlihy和Shavit的书《多处理器编程的艺术》
* (The Art of Multiprocessor programming),第16章对这些进行了更详细的描述。
* (If you are unfamiliar with them, you probably
* want to read Herlihy and Shavit's book "The Art of
* Multiprocessor programming", chapter 16 describing these in
* more detail before proceeding.)
* 主要的工作窃取队列设计与Chase和Lev的论文《"Dynamic Circular Work-Stealing Deque"
* 大致相似,
* The main work-stealing queue
* design is roughly similar to those in the papers "Dynamic
* Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
* See also "Correct and Efficient Work-Stealing for Weak Memory
* Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
* (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
* analysis of memory ordering (atomic, volatile etc) issues.
* 主要的区别最终源于GC需求,即我们尽可能快地取消插槽,以便在生成大量
* 任务的程序中保持尽可能小的内存占用。
* The main differences ultimately stem from GC requirements that we
* null out taken slots as soon as we can, to maintain as small a
* footprint as possible even in programs generating huge numbers
* of tasks.
* 为了实现这一点,我们将CAS仲裁pop和poll (steal)从索引(“base”和“top”)转移到槽本身。
* To accomplish this, we shift the CAS arbitrating pop
* vs poll (steal) from being on the indices ("base" and "top") to
* the slots themselves.
* 因此,成功的pop和poll主要需要使用CAS把槽从非空设置为空。
* So, both a successful pop and poll
* mainly entail a CAS of a slot from non-null to null. Because
* 因为我们依赖引用的CAS操作,我们不需要在 base 或 top 上标记位。
* we rely on CASes of references, we do not need tag bits on base
* 它们是在任何基于循环数组的队列中使用的简单int(例如ArrayDeque)。
* or top. They are simple ints as used in any circular
* array-based queue (see for example ArrayDeque).
* 对索引的更新仍然必须以一种确保 top == base 意味着队列为空的方式排序,
* 否则可能会在push、pop或poll尚未完全提交时使队列显示为非空。
* Updates to the
* indices must still be ordered in a way that guarantees that top
* == base means the queue is empty, but otherwise may err on the
* side of possibly making the queue appear nonempty when a push,
* pop, or poll have not fully committed.
* 注意,这意味着poll操作(单独考虑)不是没有等待的。
* Note that this means
* that the poll operation, considered individually, is not
* 一个小偷不能成功地继续,直到另一个正在进行的小偷(或者,如果之前是空的,push)完成。
* wait-free. One thief cannot successfully continue until another
* in-progress one (or, if previously empty, a push) completes.
* 但是,总的来说,我们至少确保了概率性的非阻塞性。
* However, in the aggregate, we ensure at least probabilistic
* non-blockingness.
* 如果一个尝试窃取失败,小偷总是选择一个不同的随机目标受害者尝试下一步。
* If an attempted steal fails, a thief always
* chooses a different random victim target to try next. So, in
* 因此,为了让一个小偷继续前进,它足以完成任何正在进行的poll或任何空队列上的push。
* order for one thief to progress, it suffices for any
* in-progress poll or new push on any empty queue to
* 这就是为什么我们通常使用pollAt方法及其变体,它们只在表面的base索引上尝试一次,
* 或者考虑其他操作,而不是使用poll方法。
* complete. (This is why we normally use method pollAt and its
* variants that try once at the apparent base index, else
* consider alternative actions, rather than method poll.)
*
* 这种方法还支持一种用户模式,在这种模式中,本地任务处理采用FIFO而不是LIFO顺序,
* 只需使用poll而不是pop。
* This approach also enables support of a user mode in which local
* task processing is in FIFO, not LIFO order, simply by using
* poll rather than pop.
* 这在从不连接任务的消息传递框架中非常有用。
* This can be useful in message-passing
* frameworks in which tasks are never joined.
* 但是,这两种模式都不考虑亲和性、负载、缓存位置等,因此很少在给定的机器上提供可能的最佳性能,
* 但是通过对这些因素进行平均,可以提供良好的吞吐量。
* However neither
* mode considers affinities, loads, cache localities, etc, so
* rarely provide the best possible performance on a given
* machine, but portably provide good throughput by averaging over
* 此外,即使我们确实试图利用这些信息,我们通常也没有利用这些信息的基础。
* these factors. (Further, even if we did try to use such
* information, we do not usually have a basis for exploiting it.
* 例如,一些任务集从缓存亲和性中获益,而另一些任务集则受到缓存污染影响的损害。
* For example, some sets of tasks profit from cache affinities,
* but others are harmed by cache pollution effects.)
*
* 工作队列也以类似的方式用于提交给池的任务。我们不能将这些任务混合在用
* 于窃取工作的队列中(这将影响lifo/fifo处理)。
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* for work-stealing (this would contaminate lifo/fifo
* processing).
* 相反,我们使用一种散列的形式,将提交队列与提交线程随机关联起来。
* Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocalRandom探测值用作选择现有队列的散列代码,可以在与其他提交者
* 争用时随机重新定位。
* ThreadLocalRandom probe value serves as a hash code for
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters.
* 本质上,提交者就像工作者一样,除了他们被限制在执行他们提交的本地任务
* (或者在CountedCompleters的情况下,其他具有相同根任务)。
* In essence, submitters act
* like workers except that they are restricted to executing local
* tasks that they submitted (or in the case of CountedCompleters,
* others with the same root task).
* 但是,由于大多数共享/外部队列操作比内部队列操作更昂贵,而且在稳定状态下,
* 外部提交者将与工作人员争夺CPU,所以如果所有工作人员都处于活动状态,
* ForkJoinTask.join和相关方法将禁止它们重复地帮助处理任务。
* However, because most
* shared/external queue operations are more expensive than
* internal, and because, at steady state, external submitters
* will compete for CPU with workers, ForkJoinTask.join and
* related methods disable them from repeatedly helping to process
* tasks if all workers are active.
* 在共享模式下插入任务需要一个锁(主要是在调整大小的情况下进行保护),
* 但是我们只使用一个简单的自旋锁(在字段qlock中使用位),因为提交者遇到
* 繁忙的队列时会继续尝试或创建其他队列
* Insertion of tasks in shared
* mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using bits in
* field qlock), because submitters encountering a busy queue move
* on to try or create other queues -- they block only when
* 它们只在创建和注册新队列时阻塞。
* creating and registering new queues.
*
* 管理
* Management
* ==========
*
* 工作窃取的主要吞吐量优势源于分散控制 -- workers通常从他们自己或彼此那里接受任务。
* The main throughput advantages of work-stealing stem from
* decentralized control -- workers mostly take tasks from
* themselves or each other. We cannot negate this in the
* 我们不能在执行其他管理职责时否定这一点。
* implementation of other management responsibilities. The main
* 避免瓶颈的主要策略是将几乎所有本质上的原子控制状态打包成两个volatile变量,
* 这两个变量最常被读取(而不是写入)作为状态和一致性检查。
* tactic for avoiding bottlenecks is packing nearly all
* essentially atomic control state into two volatile variables
* that are by far most often read (not written) as status and
* consistency checks.
*
* 字段“ctl”包含64位,包含原子地添加、停用、(在事件队列上)排队、
* 退出队列,和/或重新激活workers 所需的所有信息。
* Field "ctl" contains 64 bits holding all the information needed
* to atomically decide to add, inactivate, enqueue (on an event
* queue), dequeue, and/or re-activate workers.
* 为了启用这种封装,我们将最大并行度限制为(1<<15)-1(远远超出正常的操作范围),
* 以允许ids、计counts及其negations(用于阈值设置)适合16位字段。
* To enable this
* packing, we restrict maximum parallelism to (1<<15)-1 (which is
* far in excess of normal operating range) to allow ids, counts,
* and their negations (used for thresholding) to fit into 16bit
* fields.
*
* 字段“plock”是一种带有饱和关闭位(类似于每个队列的“qlocks”)的序列锁,
* 主要保护对工作队列数组的更新,以及启用shutdown。
* Field "plock" is a form of sequence lock with a saturating
* shutdown bit (similarly for per-queue "qlocks"), mainly
* protecting updates to the workQueues array, as well as to
* enable shutdown.
* 当作为锁使用时,它通常只被短暂地持有,因此在短暂的旋转之后几乎总是可用,
* 但是我们在需要时使用基于监视器的备份策略来阻塞。
* When used as a lock, it is normally only very
* briefly held, so is nearly always available after at most a
* brief spin, but we use a monitor-based backup strategy to
* block when needed.
*
* 记录工作队列。工作队列记录在“workQueues”数组中,该数组在首次使用时创建,
* 并在必要时扩展。
* Recording WorkQueues. WorkQueues are recorded in the
* "workQueues" array that is created upon first use and expanded
* if necessary.
* 在记录新workers和移除终止worker时,对数组的更新通过一个锁相互保护,
* 但是数组是可并发读的,并且可以直接访问。
* Updates to the array while recording new workers
* and unrecording terminated ones are protected from each other
* by a lock but the array is otherwise concurrently readable, and
* accessed directly.
* 为了简化基于索引的操作,数组大小总是2的幂,并且所有的读取器必须容忍空槽。
* To simplify index-based operations, the
* array size is always a power of two, and all readers must
* tolerate null slots.
* 工作队列的索引是奇数。共享(提交)队列的索引是偶数的,最多64个槽,
* 即使数组需要扩展以添加更多的workers,也会限制增长。
* Worker queues are at odd indices. Shared
* (submission) queues are at even indices, up to a maximum of 64
* slots, to limit growth even if array needs to expand to add
* 以这种方式将它们组合在一起可以简化和加速任务扫描。
* more workers. Grouping them together in this way simplifies and
* speeds up task scanning.
*
* 所有worker线程的创建都是按需的,由任务提交、终止workers的替换,
* 和/或阻塞工作的补偿触发。
* All worker thread creation is on-demand, triggered by task
* submissions, replacement of terminated workers, and/or
* compensation for blocked workers.
* 但是,所有其他支持代码都被设置为与其他策略一起工作。
* However, all other support
* code is set up to work with other policies.
* 为了确保我们不持有worker引用(这会阻止GC),所有对工作队列的访问都是
* 通过对工作队列数组的索引进行的(这是一些混乱代码结构的一个来源)。
* To ensure that we
* do not hold on to worker references that would prevent GC, ALL
* accesses to workQueues are via indices into the workQueues
* array (which is one source of some of the messy code
* constructions here).
* 实际上,workQueues数组是一种弱引用机制。因此,例如,ctl的等待队列字段存储索引,
* 而不是引用。
* In essence, the workQueues array serves as
* a weak reference mechanism. Thus for example the wait queue
* field of ctl stores indices, not references.
* 对相关方法(例如signalWork)中的工作队列的访问必须同时进行索引检查和空检查IDs。
* Access to the
* workQueues in associated methods (for example signalWork) must
* both index-check and null-check the IDs.
* 所有这些访问都通过提前返回来忽略坏的IDs,因为这只与终止相关,
* 在这种情况下,放弃是可以的。
* All such accesses
* ignore bad IDs by returning out early from what they are doing,
* since this can only be associated with termination, in which
* case it is OK to give up.
* 工作队列数组的所有用法还将检查它是否为非空(即使以前是非空)。
* All uses of the workQueues array
* also check that it is non-null (even if previously
* 这允许在终止期间为空,这是目前不需要的,但仍然是基于资源撤销的shutdown方案的一个选项。
* non-null). This allows nulling during termination, which is
* currently not necessary, but remains an option for
* resource-revocation-based shutdown schemes. It also helps
* 它还有助于减少异常陷阱代码的JIT发布,这往往会使某些方法中的控制流变得不必要地复杂。
* reduce JIT issuance of uncommon-trap code, which tends to
* unnecessarily complicate control flow in some methods.
*
* 事件队列。与HPC的工作窃取框架不同,我们不能让workers在无法立即找到任务
* 的情况下无限期地扫描任务,并且我们不能启动/恢复workers,除非出现可用的任务。
* Event Queuing. Unlike HPC work-stealing frameworks, we cannot
* let workers spin indefinitely scanning for tasks when none can
* be found immediately, and we cannot start/resume workers unless
* there appear to be tasks available. On the other hand, we must
* 另一方面,在提交或生成新任务时,我们必须快速地促使它们采取行动。
* quickly prod them into action when new tasks are submitted or
* 在许多情况下,激活worker的启动时间是总体性能的主要限制因素
* (在程序启动时,JIT编译和分配会加剧这种限制)。
* generated. In many usages, ramp-up time to activate workers is
* the main limiting factor in overall performance (this is
* compounded at program start-up by JIT compilation and
* 所以我们尽可能地简化它。
* allocation). So we try to streamline this as much as possible.
* 当workers找不到工作时,我们将他们放入事件等待队列中,然后让他们park/unpark。
* We park/unpark workers after placing in an event wait queue
* when they cannot find work.
* 这个“queue”实际上是一个简单的Treiber堆栈,以ctl的“id”字段为首,加上一个15位的
* 计数器值(它反映了一个worker被灭活的次数)来避免ABA影响(我们只需要像worker线程
* 一样多的版本号)。
* This "queue" is actually a simple
* Treiber stack, headed by the "id" field of ctl, plus a 15bit
* counter value (that reflects the number of times a worker has
* been inactivated) to avoid ABA effects (we need only as many
* version numbers as worker threads). Successors are held in
* Successors被WorkQueue.nextWait 字段保存。
* field WorkQueue.nextWait.
* Queuing处理几个固有的竞争,主要是一个任务生产线程可能看不到(和signalling)
* 另一个线程放弃寻找工作,但还没有进入等待队列。
* Queuing deals with several intrinsic
* races, mainly that a task-producing thread can miss seeing (and
* signalling) another thread that gave up looking for work but
* has not yet entered the wait queue.
* 我们通过在新等待工作者被添加到等待队列之前和之后都需要对所有workers进行
* 全面扫描(通过反复调用方法scan())来解决这个问题。
* We solve this by requiring
* a full sweep of all workers (via repeated calls to method
* scan()) both before and after a newly waiting worker is added
* to the wait queue.
* 因为排队的workers实际上可能在重新扫描而不是等待,所以我们设置并清除工作队列的“parker”字段,
* 以减少不必要的取消unpark的调用。
* Because enqueued workers may actually be
* rescanning rather than waiting, we set and clear the "parker"
* field of WorkQueues to reduce unnecessary calls to unpark.
* 这需要再次检查,以避免错过信号。
* (This requires a secondary recheck to avoid missed signals.)
* 请注意:关于Thread.interrupts在parking和其他阻塞周围的不同寻常的约定:
* Note the unusual conventions about Thread.interrupts
* surrounding parking and other blocking:
* 因为中断只用于警示线程检查终止,这是在阻塞时无论如何都进行检查,我们在任何
* 调用park之前清除状态(使用Thread.interrupted),因此park并不会立即返回
* 由于状态被设置通过一些其他不相关的用户代码中调用中断。
* Because interrupts are
* used solely to alert threads to check termination, which is
* checked anyway upon blocking, we clear status (using
* Thread.interrupted) before any call to park, so that park does
* not immediately return due to status being set via some other
* unrelated call to interrupt in user code.
*
* 发信号。 只有当出现至少有一个任务他们能够找到并执行时,我们才会
* 创建或唤醒workers。
* Signalling. We create or wake up workers only when there
* appears to be at least one task they might be able to find and
* execute.
* 当一个提交被添加,或者另一个worker将一个任务添加到一个少于两个任务
* 的队列中时,它们就通知等待的worker(或者如果少于给定的并行度级别,
* 就触发创建新任务——signalWork)。
* When a submission is added or another worker adds a
* task to a queue that has fewer than two tasks, they signal
* waiting workers (or trigger creation of new ones if fewer than
* the given parallelism level -- signalWork). These primary
* 当其他线程从队列中删除一个任务并注意到队列中还有其他任务时,
* 这些主信号将得到其他线程的支持。
* signals are buttressed by others whenever other threads remove
* a task from a queue and notice that there are other tasks there
* 因此,总体而言,池将会被过度通知。
* as well. So in general, pools will be over-signalled. On most
* 在大多数平台上,信号(unpark)开销时间非常长,而且从向线程发出信号到
* 它实际取得进展之间的时间间隔非常长,因此值得尽可能多地消除关键路径上的这些延迟。
* platforms, signalling (unpark) overhead time is noticeably
* long, and the time between signalling a thread and it actually
* making progress can be very noticeably long, so it is worth
* offloading these delays from critical paths as much as
* possible.
* 此外,只要workers看到ctl的状态发生变化,他们就会保持活跃,逐渐地向下旋转。
* 类似的稳定性感知技术也被用于阻塞之前的awaitJoin和helpComplete。
* Additionally, workers spin-down gradually, by staying
* alive so long as they see the ctl state changing. Similar
* 类似的稳定性感知技术也被用于阻塞之前的awaitJoin和helpComplete。
* stability-sensing techniques are also used before blocking in
* awaitJoin and helpComplete.
*
* 削减workers. 在一段时间不使用后释放资源,当池静止时worker开始等待,并且如果
* 池在给定的时期保持静止,worker将会超时并且终止 -- 如果线程数大于并行度,则周期较短,
* 如果线程数减少,则周期较长。
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate if the pool has remained quiescent for a
* given period -- a short period if there are more threads than
* parallelism, longer as the number of threads decreases. This
* 这将慢慢传播,最终在一段时间的不使用后终止所有的workers。
* will slowly propagate, eventually terminating all workers after
* periods of non-use.
*
* Shutdown 和 Termination. 调用shutdownNow会原子地设置plock位,然后(非原子地)设置
* 每个worker的qlock状态,取消所有未处理的任务,并唤醒所有等待的worker。
* Shutdown and Termination. A call to shutdownNow atomically sets
* a plock bit and then (non-atomically) sets each worker's
* qlock status, cancels all unprocessed tasks, and wakes up
* all waiting workers.
* 检测是否应该在非突然shutdown()调用后开始终止需要更多的工作并记帐。
* Detecting whether termination should
* commence after a non-abrupt shutdown() call requires more work
* 我们需要对平静达成共识。(比如,没有更多的工作)。
* and bookkeeping. We need consensus about quiescence (i.e., that
* there is no more work).
* 活动计数提供了一个主要的指示,但非突然shutdown仍然需要重新检查扫描的任何
* 不活动的但没有排队的workers。
* The active count provides a primary
* indication but non-abrupt shutdown still requires a rechecking
* scan for any workers that are inactive but not queued.
*
* 加入任务
* Joining Tasks
* =============
*
* 当一个worker正在等待加入被另一个worker窃取(或总是持有)的任务时,可以采取以下任何一种操作。
* Any of several actions may be taken when one worker is waiting
* to join a task stolen (or always held) by another. Because we
* 因为我们将许多任务多路复用到一个workers池中,所以我们不能让它们阻塞(如Thread.join)。
* are multiplexing many tasks on to a pool of workers, we can't
* just let them block (as in Thread.join).
* 我们也不能只是用另一个重新分配joiner的运行时堆栈,然后替换它,这将是一种“延续”的形式,
* 即使可能也不一定是一个好主意,因为我们有时需要一个未阻塞的任务和它的延续来进行。
* We also cannot just
* reassign the joiner's run-time stack with another and replace
* it later, which would be a form of "continuation", that even if
* possible is not necessarily a good idea since we sometimes need
* both an unblocked task and its continuation to progress.
* 相反,我们结合了两种策略:
* Instead we combine two tactics:
*
* 帮助: 安排连接程序执行一些任务,这些任务在未发生偷取时将运行。
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
*
* 补偿: 除非已经有足够的活动线程,否则tryCompensate()方法可以
* 创建或重新激活一个备用线程,以补偿阻塞的连接,直到它们解除阻塞为止。
* Compensating: Unless there are already enough live threads,
* method tryCompensate() may create or re-activate a spare
* thread to compensate for blocked joiners until they unblock.
*
* 第三种形式(在tryRemoveAndExec中实现)相当于帮助一个假设的补偿器:
* A third form (implemented in tryRemoveAndExec) amounts to
* helping a hypothetical compensator:
* 如果我们可以很容易地判断出补偿器的一个可能动作是偷取并执行正在连接的任务,
* 那么连接线程就可以直接这样做,而不需要补偿线程(尽管会以较大的运行时堆栈为代价,
* 但是权衡下通常是值得的)。
* If we can readily tell that
* a possible action of a compensator is to steal and execute the
* task being joined, the joining thread can do so directly,
* without the need for a compensation thread (although at the
* expense of larger run-time stacks, but the tradeoff is
* typically worthwhile).
*
* ManagedBlocker扩展API不能使用帮助,因此仅依赖于方法awaitBlocker中的补偿。
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
*
* tryHelpStealer中的算法需要一种“线性”帮助:
* The algorithm in tryHelpStealer entails a form of "linear"
* 每个worker记录(在currentSteal字段中)它从其他某个worker那里窃取的最近的任务。
* helping: Each worker records (in field currentSteal) the most
* recent task it stole from some other worker. Plus, it records
* 另外,它记录(在currentJoin字段中)当前正在积极加入的任务。
* (in field currentJoin) the task it is currently actively
* joining.
* tryHelpStealer方法使用这些标记试图找到一个worker来帮助(比如:从它那里偷回一个任务并执行),
* 从而加速主动加入任务的完成。
* Method tryHelpStealer uses these markers to try to
* find a worker to help (i.e., steal back a task from and execute
* it) that could hasten completion of the actively joined task.
* 本质上,如果要加入的任务未被盗用,则joiner执行在它自己本地deque的任务。
* In essence, the joiner executes a task that would be on its own
* local deque had the to-be-joined task not been stolen. This may
* 这可能被视为Wagner & Calder在1993年出版的《跨越:一种实现高效期货的可移植技术》
* (SIGPLAN notice, http://portal.acm.org/cit.cfm? id=155354)一书中所采用的方法的保守变体。
* be seen as a conservative variant of the approach in Wagner &
* Calder "Leapfrogging: a portable technique for implementing
* efficient futures" SIGPLAN Notices, 1993
* (http://portal.acm.org/citation.cfm?id=155354).
* 它的不同之处在于:(1)我们只在workers之间维护对偷窃的依赖链接,而不是使用每个任务的记账。
* It differs in
* that: (1) We only maintain dependency links across workers upon
* steals, rather than use per-task bookkeeping.
* 这有时需要对workQueues数组进行线性扫描来定位偷窃者,但通常不需要,
* 因为偷窃者会留下在哪里定位他们的提示(这可能会过时/错误)。
* This sometimes
* requires a linear scan of workQueues array to locate stealers,
* but often doesn't because stealers leave hints (that may become
* stale/wrong) of where to locate them.
* 这只是一个提示,因为一个worker可能有多个窃取,而提示只记录其中一个(通常是最新的)。
* It is only a hint
* because a worker might have had multiple steals and the hint
* records only one of them (usually the most current). Hinting
* 提示将成本隔离到需要它的时候,而不是增加每个任务的开销。
* isolates cost to when it is needed, rather than adding to
* (2)它是“浅”的,忽略嵌套和潜在的循环互偷。
* per-task overhead. (2) It is "shallow", ignoring nesting and
* potentially cyclic mutual steals.
* 这是有意的:字段currentJoin只在主动连接时更新,这意味着我们在长生命期任务、
* GC停止等期间会错过链中的链接(这是可以的,因为在这种情况下阻塞通常是一个好主意)。
* (3) It is intentionally
* racy: field currentJoin is updated only while actively joining,
* which means that we miss links in the chain during long-lived
* tasks, GC stalls etc (which is OK since blocking in such cases
* (4) 我们限制了试图找工作的数量(请参阅MAX_HELP),并退回到挂起该工作者,
* 如果需要的话,用另一个工作者替换它。
* is usually a good idea). (4) We bound the number of attempts
* to find work (see MAX_HELP) and fall back to suspending the
* worker and if necessary replacing it with another.
*
* CountedCompleters的帮助操作要简单得多:方法helpComplete可以获取和执行任何
* 正在等待的具有相同根的任务。
* Helping actions for CountedCompleters are much simpler: Method
* helpComplete can take and execute any task with the same root
* as the task being waited on.
* 然而,这仍然需要遍历一些完成器链,因此,与使用未显式连接的CountedCompleters相比,效率更低。
* However, this still entails some
* traversal of completer chains, so is less efficient than using
* CountedCompleters without explicit joins.
*
* 在任何给定的时间都不可能准确地保持线程的目标并行度。
* It is impossible to keep exactly the target parallelism number
* of threads running at any given time. Determining the
* 确定保守安全的帮助目标的存在,已经创建的备件的可用性,以及创建新备件
* 的明显需要都是有活力的,所以我们依赖于每个的多次重试。
* existence of conservatively safe helping targets, the
* availability of already-created spares, and the apparent need
* to create new spares are all racy, so we rely on multiple
* retries of each.
* 在明显缺乏帮助机会的情况下,补偿很难控制在jvm上,GC和其他活动会阻止任务的进展,
* 从而导致许多其他依赖任务的停滞,而我们无法确定它们是否需要补偿。
* Compensation in the apparent absence of
* helping opportunities is challenging to control on JVMs, where
* GC and other activities can stall progress of tasks that in
* turn stall out many other dependent tasks, without us being
* able to determine whether they will ever require compensation.
* 即使在存在比核心更多的线程的情况下,窃取工作几乎不会导致性能下降,但是在这种情况下,
* 积极地添加新线程会带来不必要的正反馈控制循环的风险,其中更多的线程会导致更多的相关停顿
* (以及畅通的线程的延迟进度) 到我们知道它们可用的地步)导致更多情况需要更多线程,依此类推。
* Even though work-stealing otherwise encounters little
* degradation in the presence of more threads than cores,
* aggressively adding new threads in such cases entails risk of
* unwanted positive feedback control loops in which more threads
* cause more dependent stalls (as well as delayed progress of
* unblocked threads to the point that we know they are available)
* leading to more situations requiring more threads, and so
* on.
* 这方面的控制可以被看作是一个(分析上棘手的)游戏,对手可能会选择最坏的(对我们来说)
* 活动线程在任何时候停止。
* This aspect of control can be seen as an (analytically
* intractable) game with an opponent that may choose the worst
* (for us) active thread to stall at any time. We take several
* 我们采取几种预防措施来限制损失(从而限制收益),主要是在tryCompensate和awaitJoin方法中。
* precautions to bound losses (and thus bound gains), mainly in
* methods tryCompensate and awaitJoin.
*
* 公共池
* Common Pool
* ===========
*
* 静态公共池在静态初始化之后始终存在。
* The static common pool always exists after static
* initialization.
* 由于不需要使用它(或任何其他创建的池),因此我们将初始构造开销和占用空间最小化到大
* 约十二个字段的设置,而没有嵌套分配。
* Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
* 大多数bootstrapping发生在方法fullExternalPush第一次提交到池的时候。
* allocation. Most bootstrapping occurs within method
* fullExternalPush during the first submission to the pool.
*
* 当外部线程提交到公共池时,它们可以执行子任务处理(参见externalHelpJoin和相关方法)。
* When external threads submit to the common pool, they can
* perform subtask processing (see externalHelpJoin and related
* methods).
* 这种调用方帮助策略使得将公共池并行度级别设置为比可用内核总数少一个(或多个),
* 甚至为纯调用方运行设置为零是明智的。
* This caller-helps policy makes it sensible to set
* common pool parallelism level to one (or more) less than the
* total number of available cores, or even zero for pure
* caller-runs.
* 我们不需要记录外部提交是否指向公共池——如果不是,externalHelpJoin将快速返回
* (最多帮助通知一些公共池workers)。
* We do not need to record whether external
* submissions are to the common pool -- if not, externalHelpJoin
* returns quickly (at the most helping to signal some common pool
* workers).
* 否则,这些提交者将被阻塞,等待完成,因此,在不适用的情况下,额外的工作
* (通过大量分散的任务状态检查)相当于在ForkJoinTask.join中阻塞前的一种奇怪的有限自旋等待。
* These submitters would otherwise be blocked waiting
* for completion, so the extra effort (with liberally sprinkled
* task status checks) in inapplicable cases amounts to an odd
* form of limited spin-wait before blocking in ForkJoinTask.join.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
* after executing any top-level task (see WorkQueue.runTask). The
* associated mechanics (mainly in ForkJoinWorkerThread) may be
* JVM-dependent and must access particular Thread class fields to
* achieve this effect.
*
* Style notes
* ===========
*
* There is a lot of representation-level coupling among classes
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
* fields of WorkQueue maintain data structures managed by
* ForkJoinPool, so are directly accessed. There is little point
* trying to reduce this, since any associated future changes in
* representations will need to be accompanied by algorithmic
* changes anyway. Several methods intrinsically sprawl because
* they must accumulate sets of consistent reads of volatiles held
* in local variables. Methods signalWork() and scan() are the
* main bottlenecks, so are especially heavily
* micro-optimized/mangled. There are lots of inline assignments
* (of form "while ((local = field) != 0)") which are usually the
* simplest way to ensure the required read orderings (which are
* sometimes critical). This leads to a "C"-like style of listing
* declarations of these locals at the heads of methods or blocks.
* There are several occurrences of the unusual "do {} while
* (!cas...)" which is the simplest way to force an update of a
* CAS'ed variable. There are also other coding oddities (including
* several unnecessary-looking hoisted null checks) that help
* some methods perform reasonably even when interpreted (not
* compiled).
*
* The order of declarations in this file is:
* (1) Static utility functions
* (2) Nested (static) classes
* (3) Static fields
* (4) Fields, along with constants used when unpacking some of them
* (5) Internal control methods
* (6) Callbacks and other support for ForkJoinTask methods
* (7) Exported methods
* (8) Static block initializing statics in minimally dependent order
*/
// Static utilities
/**
* If there is a security manager, makes sure caller has
* permission to modify threads.
*/
private static void checkPermission() {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(modifyThreadPermission);
}
// Nested classes
/**
* Factory for creating new {@link ForkJoinWorkerThread}s.
* A {@code ForkJoinWorkerThreadFactory} must be defined and used
* for {@code ForkJoinWorkerThread} subclasses that extend base
* functionality or initialize threads with different contexts.
*/
public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @return the new worker thread
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
/**
* 默认ForkJoinWorkerThreadFactory实现; 创建一个新的ForkJoinWorkerThread。
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
/**
* 类,如果从WorkQueue.tryRemoveAndExec的内部队列插槽中删除了用于替换本地连接目标的人工任务。
* Class for artificial tasks that are used to replace the target
* of local joins if they are removed from an interior queue slot
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to
* 我们不需要代理来实际做任何事情,除了有一个唯一的身份。
* actually do anything beyond having a unique identity.
*/
static final class EmptyTask extends ForkJoinTask<Void> {
private static final long serialVersionUID = -7721805057305804111L;
EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
public final Void getRawResult() { return null; }
public final void setRawResult(Void x) {}
public final boolean exec() { return true; }
}
/**
* 支持工作窃取和外部任务提交的队列。主要原理和算法见上面。
* Queues supporting work-stealing as well as external task
* submission. See above for main rationale and algorithms.
* 实现严重依赖于“Unsafe”的内部特性和“volatile”的选择性使用:
* Implementation relies heavily on "Unsafe" intrinsics
* and selective use of "volatile":
*
* 字段“base”是最小有效队列槽的索引(mod array.length),如果非空,它总是下一个要偷(poll)的位置。
* Field "base" is the index (mod array.length) of the least valid
* queue slot, which is always the next position to steal (poll)
* from if nonempty. Reads and writes require volatile orderings
* 读和写需要volatile顺序,但不需要CAS,因为只有在CAS更新slot后才会执行更新操作。
* but not CAS, because updates are only performed after slot
* CASes.
*
* 字段“top”是下一个要push或pop的队列槽的索引(mod array.length)。
* Field "top" is the index (mod array.length) of the next queue
* slot to push to or pop from.
* 它的写只被所有者线程push,或者在持有锁下通过外部/共享push,
* 其他线程在读取(volatile)base之后才访问它。
* It is written only by owner thread
* for push, or under lock for external/shared push, and accessed
* by other threads only after reading (volatile) base. Both top
* “top”和“base”都可以在溢出时包围,但是(top - base)(或者更常见的-(base - top)
* 强制在top之前 volatile读base)仍然可以估计大小。
* and base are allowed to wrap around on overflow, but (top -
* base) (or more commonly -(base - top) to force volatile read of
* base before top) still estimates size.
* 锁(“qlock”)在终止时强制为-1,导致所有进一步的锁尝试失败。(注意:终止状态不需要CAS,因为在池关闭时,
* 所有共享队列将停止使用。)
* The lock ("qlock") is
* forced to -1 on termination, causing all further lock attempts
* to fail. (Note: we don't need CAS for termination state because
* upon pool shutdown, all shared-queues will stop being used
* anyway.)
* 几乎所有的锁体都是这样设置的:锁体中的异常是“不可能的”(模块化的JVM错误,无论如何都会导致失败)。
* Nearly all lock bodies are set up so that exceptions
* within lock bodies are "impossible" (modulo JVM errors that
* would cause failure anyway.)
*
* 数组槽的写和读使用 Unsafe提供的 volatiles/atomics 竞争。
* The array slots are read and written using the emulation of
* volatiles/atomics provided by Unsafe. Insertions must in
* 插入通常必须使用putOrderedObject作为释放存储的一种形式,以确保对任务
* 对象的所有写操作在队列中发布之前都是有序的。
* general use putOrderedObject as a form of releasing store to
* ensure that all writes to the task object are ordered before
* its publication in the queue.
* 所以的移除都需要使用CAS设置为null。这个数组的长度总是2的幂。
* All removals entail a CAS to
* null. The array is always a power of two. To ensure safety of
* 为了确保Unsafe的数组操作的安全性,所有访问都通过2的幂掩码执行显式空检查和隐式界限检查。
* Unsafe array operations, all accesses perform explicit null
* checks and implicit bounds checks via power-of-two masking.
*
* 除了基本的队列支持之外,该类还包含在其他地方描述的字段来控制执行。
* In addition to basic queuing support, this class contains
* fields described elsewhere to control execution. It turns out
* 事实证明,将它们包含在这个类而不是单独的类中,在内存方面会更好。
* to work better memory-layout-wise to include them in this class
* rather than a separate class.
*
* 大多数平台上的性能对工作队列及其数组的实例的位置非常敏感——我们绝对不希望
* 多个工作队列实例或多个队列数组共享缓存行。
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. (It would be best for queue objects
* (最好让队列对象和它们的数组共享,但是没有任何东西可以帮助安排共享)。
* and their arrays to share, but there is nothing available to
* help arrange that). The @Contended annotation alerts JVMs to
* Contended 注释警告jvm尝试隔离实例。
* try to keep instances apart.
*/
@sun.misc.Contended
static final class WorkQueue {
/**
* 在初始化时窃取工作队列数组的容量。
* Capacity of work-stealing queue array upon initialization.
* 必须是二的幂;至少4个,但是应该更大一些,以减少或消除队列之间的缓存行共享。
* Must be a power of two; at least 4, but should be larger to
* reduce or eliminate cacheline sharing among queues.
* 目前,它要大得多,作为一个部分解决方案,因为jvm经常将数组放置在共享GC记帐
* (特别是cardmarks)的位置,这样每次写访问都会遇到严重的内存争用。
* Currently, it is much larger, as a partial workaround for
* the fact that JVMs often place arrays in locations that
* share GC bookkeeping (especially cardmarks) such that
* per-write accesses encounter serious memory contention.
*/
// 8192
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* 队列数组的最大大小。必须是小于或等于1的2的幂<<(31 -数组元素的宽度),
* 以确保缺少对索引计算的总结,但是定义的值要比这个值小一点,以帮助用户在
* 系统饱和之前捕获失控的程序。
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
// 67108864
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// registerWorker() 方法初始化时等于 poolIndex 的值, 而poolIndex是short类型的
// 因此eventCount的低16位的值为:poolIndex
// 小于0 说明线程不是活动状态 (或者准备进入等待状态)
volatile int eventCount; // encoded inactivation count; < 0 if inactive
// nextWait虽然保存的是ctl低32位的值,但是最高位是没有用的,最高位是池的状态
int nextWait; // encoded record of next event waiter
// 窃取数量
int nsteals; // number of steals
// 1、构造方法会把 hint初始化为: this.hint = seed (threadLocalRandomProbe)
// 用于提示该workQueue在 workQueues中的索引位置
// 2、join()方法调用tryHelpStealer()方法时,如果找到等待任务的偷窃者,记录偷窃者的索引位置
int hint; // steal index hint 偷窃者索引提示
short poolIndex; // index of this queue in pool
// 模式: 0: lifo, > 0: fifo, < 0: shared
// shared -> 共享队列
final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
// base 使用volatile 修饰,top没有 (下一个poll 操作的索引位置)
volatile int base; // index of next slot for poll
// 下一个push 操作的索引位置
int top; // index of next slot for push
// WorkQueue创建时没有进行初始化
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
// 这个工作队列所属的线程,如果是共享的则为null (可用于判断队列拥有者是否处于阻塞/等待状态)
final ForkJoinWorkerThread owner; // owning thread or null if shared
// 在park期间 == owner,即此 workQueue的拥有者; 其他情况为null
volatile Thread parker; // == owner during call to park; else null
// awaitJoin方法等待完成的任务
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
// 当前正在执行的非本地任务
ForkJoinTask<?> currentSteal; // current non-local task being executed
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
int seed) {
this.pool = pool;
this.owner = owner;
this.mode = (short)mode;
// seed 传入的是 threadLocalRandomProbe
this.hint = seed; // store initial seed for runWorker
// 将索引放在数组的中心(尚未分配)
// INITIAL_QUEUE_CAPACITY = 1 << 13
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
/**
* 返回队列中任务的大概数量
* Returns the approximate number of tasks in the queue.
*/
final int queueSize() {
// 非工作队列拥有者调用必须先读 base (因为base是 volatile修饰的,而 top 不是)
int n = base - top; // non-owner callers must read base first
// 忽略瞬态负值 (忽略base > 0 todo 什么场景)
return (n >= 0) ? 0 : -n; // ignore transient negative
}
/**
* 通过检查接近空队列是否至少有一个无人认领的任务,可以比queueSize更准确地估计该队列是否有任务。
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
*/
final boolean isEmpty() {
ForkJoinTask<?>[] a; int m, s;
// 先读base, 再读top
int n = base - (s = top);
return (n >= 0 || // n >= 0 说明没有任务了,返回 empty
(n == -1 && // 当 n = -1时,说明队列中可能还有一个任务,但可能这个任务已经被
((a = array) == null || // 取走了,因此进一步判断这个任务是否还存在
(m = a.length - 1) < 0 ||
U.getObject
(a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
}
/**
* 添加一个任务。仅由非共享队列中的所有者调用。
* Pushes a task. Call only by owner in unshared queues. (The
* (共享队列版本嵌入到externalPush方法中。)
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int s = top, n;
// 如果队列被移除,则忽略
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1;
// s = top,把任务放入top索引位置。只有线程拥有者才会调用这个方法,而且任务
// 取出top不会被更新,因此不会有线程安全问题
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
// top + 1。如果队列中任务的个数小于等于2,那么唤醒或者添加一个worker去执行任务,
// 否则池中活动的线程数量可能不够 (保证线程池的活跃度)
if ((n = (top = s + 1) - base) <= 2)
(p = pool).signalWork(p.workQueues, this);
else if (n >= m)
// 如果队列中任务的数量达到了 a.length - 1,那么预先进行扩容
growArray();
}
}
/**
* 初始化或加倍数组的容量。由所有者或持有锁调用。调整大小时,base可以移动,但top不行。
* (其他工作线程窃取任务base就变化了,但是top只能由队列拥有者/共享队列获取锁后才能改变,
* 队列拥有者在执行扩容,而共享队列扩容前需要先获取锁,所以扩容时top不会改变)
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
*/
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;
// 如果数组不为null,那么容量扩大一倍,否则使用默认容量:INITIAL_QUEUE_CAPACITY = 1 << 13
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
// 若容量超出MAXIMUM_QUEUE_CAPACITY = 1 << 26,抛出异常
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
// 使用新的容量创建数组,并 赋值给array
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
// (oldMask = oldA.length - 1) >= 0 说明原来的数组不是空数组
// (t = top) - (b = base) > 0 说明原来的数组中有元素,需要复制元素
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
// 扩容需要一个一个地复制数据,使用CAS获取数据,不能使用数组复制的方式。
// (使用数组复制方式会导致数据复制过去后,其他线程还可以去原来的数组获取数据,
// 导致任务重复执行)
do {
ForkJoinTask<?> x;
// 计算索引 (b & oldMask) 的地址偏移量
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
// 计算索引 (b & mask) 的地址偏移量
int j = ((b & mask) << ASHIFT) + ABASE;
// 具有volatile读的内存语义
x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null)) // 清除旧数组的元素
U.putObjectVolatile(a, j, x); // 立即可见
} while (++b != t); // ++b != t 说明还有元素
}
// 返回新的数组
return a;
}
/**
* 按后进先出(LIFO)顺序获取下一个任务(如果有的话)
* Takes next task, if one exists, in LIFO order. Call only
* 仅由非共享队列中的所有者调用。
* by owner in unshared queues.
*/
// LIFO: 后进先出 (仅由队列所有者线程调用这个方法)
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
// 判断队列不为null,且长度大于0
if ((a = array) != null && (m = a.length - 1) >= 0) {
// (s = top - 1) - base >= 0 说明队列中有元素
for (int s; (s = top - 1) - base >= 0;) {
// 计算最后一个元素的地址偏移量
long j = ((m & s) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
// 因为其他工作线程只会从base位置获取任务,如果最后一个任务为null,
// 说明最后一个任务也被其他线程窃取了,队列为空了,所以不用重试,返回null
break;
// 使用CAS获取任务
if (U.compareAndSwapObject(a, j, t, null)) {
// top = top - 1
top = s;
return t;
}
}
}
return null;
}
/**
* 如果b是队列的base,并且可以无争用地认领任务,则按FIFO顺序获取任务。
* Takes a task in FIFO order if b is base of queue and a task
* can be claimed without contention. Specialized versions
* 专门的版本出现在ForkJoinPool方法scan和tryHelpStealer。
* appear in ForkJoinPool methods scan and tryHelpStealer.
*/
final ForkJoinTask<?> pollAt(int b) {
ForkJoinTask<?> t; ForkJoinTask<?>[] a;
if ((a = array) != null) {
// 计算索引b的地址偏移量
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
// b 必须等于 base才可以获取任务
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
base == b && U.compareAndSwapObject(a, j, t, null)) {
// base ++
U.putOrderedInt(this, QBASE, b + 1);
return t;
}
}
return null;
}
/**
* 按FIFO(先进先出) 顺序获取下一个任务(如果存在的话)。
* Takes next task, if one exists, in FIFO order.
*/
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
// 判断队列中有元素
while ((b = base) - top < 0 && (a = array) != null) {
// 计算base索引位置的地址偏移量
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 先读取任务,再通过CAS获取任务
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (t != null) {
// 使用CAS获取任务
if (U.compareAndSwapObject(a, j, t, null)) {
// base ++
U.putOrderedInt(this, QBASE, b + 1);
// 返回任务
return t;
}
// 若任务获取失败重试,直到队列为空
}
// ---- 任务已被其它线程获取了 ----
// base == b说明 base还没进行更新,此时如果队列中还有任务,那么我们放弃线程的执行
// 权,等待base更新
else if (base == b) {
// b + 1 = top,说明队列中已经没有任务了(base位置的任务已被其它线程取走了)
if (b + 1 == top)
break;
// 放弃线程执行权,等待 base更新
Thread.yield(); // wait for lagging update (very rare) 等待滞后更新(非常罕见)
}
}
return null;
}
/**
* 按模式指定的顺序获取下一个任务(如果存在)。会从队列中删除这个任务
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
// 0: lifo, > 0: fifo, < 0: shared
return mode == 0 ? pop() : poll();
}
/**
* 如果存在的下一个任务,则按模式指定的顺序返回该任务。 (不会删除)
* Returns next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> peek() {
ForkJoinTask<?>[] a = array; int m;
// 判断队列是否为空
if (a == null || (m = a.length - 1) < 0)
return null;
// 0: lifo, > 0: fifo, < 0: shared
// 0:后进先出, 非0 先进先出
int i = mode == 0 ? top - 1 : base;
// 计算索引i的地址偏移量
int j = ((i & m) << ASHIFT) + ABASE;
return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
}
/**
* 仅当给定任务位于当前top时,才弹出该任务。
* Pops the given task only if it is at the current top.
* 共享版本的仅能通过FJP.tryExternalUnpush 使用。
* (A shared version is available only via FJP.tryExternalUnpush)
*/
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
// top == base 说明队列中没有任务
if ((a = array) != null && (s = top) != base &&
// 如果索引 --s 的元素是t,则获取 并且 设置该索引位置的元素为null
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
// top = top - 1
top = s;
return true;
}
return false;
}
/**
* 删除并取消所有已知任务,忽略任何异常
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void cancelAll() {
// 取消当前等待完成的任务
ForkJoinTask.cancelIgnoringExceptions(currentJoin);
// 取消当前窃取的任务
ForkJoinTask.cancelIgnoringExceptions(currentSteal);
// 取消队列中的所有任务
// poll() 按FIFO(先进先出) 顺序获取下一个任务(如果存在的话)。
for (ForkJoinTask<?> t; (t = poll()) != null; )
ForkJoinTask.cancelIgnoringExceptions(t);
}
// Specialized execution methods
/**
* 轮询并运行任务,直到队列为空
* Polls and runs tasks until empty.
*/
// FIFO 模式
final void pollAndExecAll() {
// poll()获取并删除第一个元素,如果队列为空返回null (实现FIFO)
for (ForkJoinTask<?> t; (t = poll()) != null;)
t.doExec(); // 执行任务
}
/**
* 执行顶级任务和执行后剩余的任何本地任务。
* Executes a top-level task and any local tasks remaining
* after execution.
*/
final void runTask(ForkJoinTask<?> task) {
// currentSteal = task
if ((currentSteal = task) != null) {
ForkJoinWorkerThread thread;
// 执行任务
task.doExec();
ForkJoinTask<?>[] a = array;
// 0: lifo, > 0: fifo, < 0: shared
int md = mode;
// nsteals + 1
++nsteals;
currentSteal = null;
// 默认情况下使用的是LIFO_QUEUE模式,即md = 0
// FIFO
if (md != 0) {
// 运行此方法的workQueue一定不是 shared模式的,因此如果md != 0,
// 那么 md 一定大于0,即 FIFO 模式
pollAndExecAll();
}
// LIFO,默认情况下使用的是这个模式,因为后面的任务是由前面的任务分割的,任务比较小,
// 较大的任务让其他线程去执行,这样任务分摊比较均匀
else if (a != null) {
int s, m = a.length - 1;
ForkJoinTask<?> t;
// (s = top - 1) - base >= 0 说明队列中还有任务
while ((s = top - 1) - base >= 0 &&
// 获取索引s (top - 1)的元素,并设置该索引位置的元素为null
(t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
// top = top -1
top = s;
// 执行任务
t.doExec();
}
}
if ((thread = owner) != null) // no need to do in finally clause
// 默认为空,由InnocuousForkJoinWorkerThread 实现
thread.afterTopLevelExec();
}
}
/**
* 如果给定的任务存在,从队列中移除并执行,或任何其他已取消的任务。
* If present, removes from queue and executes the given task,
* or any other cancelled task. Returns (true) on any CAS
* 在任何CAS或一致性检查失败时返回(true),以便调用者可以重试。
* or consistency check failure so caller can retry.
*
* 如果没有取得进展返回false,否则返回true
* @return false if no progress can be made, else true
*/
// 遍历整个workQueue队列去查找任务,找到则执行这个任务
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
boolean stat;
ForkJoinTask<?>[] a; int m, s, b, n;
// (n = (s = top) - (b = base)) > 0) 说明队列中有任务
if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
(n = (s = top) - (b = base)) > 0) {
boolean removed = false, empty = true;
stat = true;
// 从s(top) 开始遍历到 b(base)
for (ForkJoinTask<?> t;;) { // traverse from s to b
// 计算--s 索引位置的地址偏移量
long j = ((--s & m) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObject(a, j);
// t == null 说明任务被其他线程取出了,有可能是目标任务,因此退出,
// 返回true,外部会判断目标任务是否已经完成。(一致性检查失败)
if (t == null) // inconsistent length 长度不一致
// 此时 stat = true,因此会返回true,返回true后,如果判断任务没有完成会
// 继续调用这个方法
break;
else if (t == task) { // 找到task
// 如果task 是队列的最后一个任务,直接取出这个任务
if (s + 1 == top) { // pop
if (!U.compareAndSwapObject(a, j, task, null))
// 如果CAS失败,break,此时 stat = true,因此方法会返回true,外部方法
// 会判断任务是否已经完成了
break;
// CAS成功,top = top -1
top = s;
// removed = true 表示任务已经移除
removed = true;
}
// task不是队列的最后一个任务
// 一致性检查,如果 base != b,说明有线程取出了base索引所在的任务,
// 对于这种情况返回true,判断任务是否完成(因为有可能取出的是目标这个任务),
// 而且这种情况下有可能不止一个任务被取出,所以不重新检查任务是否还在队列中,
// 这样可以快速响应join()
else if (base == b) // replace with proxy 替换为代理
// 使用EmptyTask 来替换这个任务
removed = U.compareAndSwapObject(a, j, task,
new EmptyTask());
break;
}
// t.status >= 0 说明队列中除了目标任务外还有未完成的任务
else if (t.status >= 0)
empty = false;
// 执行到这里说明t.status < 0, s + 1 == top 说明是队列的最后一个任务
else if (s + 1 == top) { // pop and throw away
// 任务已经取消,并且这个任务是队列的最后一个任务,
// 那么直接扔掉这个任务 (取消的任务在队列的最后一个任务才会被直接扔掉)
if (U.compareAndSwapObject(a, j, t, null))
// top = top - 1
top = s;
break;
}
// --n == 0 说明遍历完了
if (--n == 0) {
// base == b 说明没有任务被取出,这种情况下才可以确定任务不在此workQueue中,
// 否则需要返回true,判断任务是否已经完成,未完成进行重试(继续检查任务是否完成)
if (!empty && base == b)
stat = false;
break;
}
}
// ------ for循环 end ----------------
if (removed)
// 如果成功取出这个任务,则执行这个任务
task.doExec();
}
else
// 队列为空,返回false,没必要再轮询这个队列
stat = false;
// 队列为空 或者 队列不为空并且base没有改变 时返回false
return stat;
}
/**
* 尝试轮询并执行给定的任务或任何其子任务
* Tries to poll for and execute the given task or any other
* task in its CountedCompleter computation.
*/
// 判断队列中base索引位置的任务是否是root任务或者其子任务,如果是,取出并执行。
// 如果base索引位置的任务已经被其他线程获取,或者base索引位置的任务是root任务
// 或者其子任务,返回true; 否则返回false
final boolean pollAndExecCC(CountedCompleter<?> root) {
ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
// (b = base) - top < 0 说明队列中有任务
if ((b = base) - top < 0 && (a = array) != null) {
// 计算队列base 索引位置的偏移量 (因为这个方法是由其他线程调用的,所以从base位置获取任务)
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 如果base位置的任务为null,说明存在竞争,返回true,进行重试
if ((o = U.getObjectVolatile(a, j)) == null)
return true; // retry
if (o instanceof CountedCompleter) {
for (t = (CountedCompleter<?>)o, r = t;;) {
// 找到目标任务root,取出base索引位置的任务,并执行
if (r == root) {
// base == b 判断base索引位置的任务没有被其他线程获取
if (base == b &&
U.compareAndSwapObject(a, j, t, null)) {
// CAS成功取出任务,修改 base = base + 1
U.putOrderedInt(this, QBASE, b + 1);
// 执行任务
t.doExec();
}
// 返回true,进行重试,继续判断base位置的任务是否是目标任务root的子任务
return true;
}
else if ((r = r.completer) == null)
// base索引位置的任务不是 root 任务的子任务
break; // not part of root computation 不是root 任务计算的一部分
}
}
}
// 返回false不进行重试
return false;
}
/**
* 尝试弹出并执行给定的任务或者其叶子任务 (只判断队列的最后一个任务)
* Tries to pop and execute the given task or any other task
* in its CountedCompleter computation.
*/
final boolean externalPopAndExecCC(CountedCompleter<?> root) {
ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
// 判断队列不为空
if (base - (s = top) < 0 && (a = array) != null) {
// 计算最后一个元素的地址偏移量
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
for (t = (CountedCompleter<?>)o, r = t;;) {
// 找到目标任务 (根任务)
if (r == root) {
// 使用CAS获取队列的锁
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
// 一致性检查,并使用CAS修改最后一个元素为null (获取任务的执行权)
if (top == s && array == a &&
U.compareAndSwapObject(a, j, t, null)) {
// 修改top = top - 1
top = s - 1;
// 释放锁
qlock = 0;
// 执行任务
t.doExec();
}
else
// 一致性检查失败,或者获取任务的执行权失败,释放锁
qlock = 0;
}
// 返回true,说明有进展
return true;
}
// r指向父节点任务
else if ((r = r.completer) == null)
break;
}
}
}
// 没有进展返回false
return false;
}
/**
* Internal version
*/
// 判断工作队列的最后一个任务是否是任务root,或者其子任务,如果是的话,取出这个任务并执行,
// 然后返回true,否则返回false。
// ExecCC -> ExecCountedCompleter
final boolean internalPopAndExecCC(CountedCompleter<?> root) {
ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
// base - (s = top) < 0 说明队列中有元素
if (base - (s = top) < 0 && (a = array) != null) {
// 计算队列中最后一个元素的索引位置
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
for (t = (CountedCompleter<?>)o, r = t;;) {
// 找到root 任务
if (r == root) {
// 使用CAS取出任务,并执行它 (注意:这里取出的是任务t,而不是任务r)
if (U.compareAndSwapObject(a, j, t, null)) {
// CAS成功,top = top - 1,不会存在线程安全问题,因为CAS成功了才能修改top,
// 而且只有工作线程本身才可以从top取出任务,其他工作线程只能从base取出任务
top = s - 1;
// doExec()里面会判断任务是否已经执行,没有执行过才会执行
t.doExec();
}
// 找到任务并执行,返回true
return true;
}
// r
// / \
// b c
// / \ / \
// d e f g
// 比如root 是上图中的任务r(即工作线程要等待任务r的完成),d是此工作线程中队列的最后
// 一个任务,如果d的类型是 CountedCompleter,那么一级一级往上判断d的父节点任务是否
// 是root任务,如果是则从队列中取出这个任务,并执行(任务还在队列中,所以一定还没有执行过)。
// 任务r的完成需要任务d先完成,所以有必要去执行任务d。
// r = r.completer,判断
else if ((r = r.completer) == null)
// 到达根任务了,没有任务t不是目标任务root的子任务,返回false
break;
}
}
}
return false;
}
/**
* 如果队列拥有者未阻塞,返回true
* Returns true if owned and not known to be blocked.
*/
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
// eventCount 小于0 说明线程不是活动状态(或者准备进入等待状态)
return (eventCount >= 0 &&
(wt = owner) != null &&
(s = wt.getState()) != Thread.State.BLOCKED && // 判断线程不是阻塞或者等待状态
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long QBASE;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
QBASE = U.objectFieldOffset
(k.getDeclaredField("base"));
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
// 报告给定数组类的存储分配中第一个元素的偏移量。
ABASE = U.arrayBaseOffset(ak);
// 获取数组元素的地址偏移量
int scale = U.arrayIndexScale(ak);
// 数组元素的地址偏移量必须是2的幂
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// Integer.numberOfLeadingZeros(scale) -> 判断最高位的1的左边的0的个数
// ASHIFT -> 表示 scale是 1向左移动 ASHIFT 位后得到的数 (即 scale = 1 << ASHIFT)
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
}
// static fields (initialized in static initializer below)
/**
* Creates a new ForkJoinWorkerThread. This factory is used unless
* overridden in ForkJoinPool constructors.
*/
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
/**
* Permission required for callers of methods that may start or
* kill threads.
*/
private static final RuntimePermission modifyThreadPermission;
/**
* Common (static) pool. Non-null for public use unless a static
* construction exception, but internal usages null-check on use
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
static final ForkJoinPool common;
/**
* Common pool parallelism. To allow simpler use and management
* when common pool threads are disabled, we allow the underlying
* common.parallelism field to be zero, but in that case still report
* parallelism as 1 to reflect resulting caller-runs mechanics.
*/
static final int commonParallelism;
/**
* Sequence number for creating workerNamePrefix.
*/
private static int poolNumberSequence;
/**
* Returns the next sequence number. We don't expect this to
* ever contend, so use simple builtin sync.
*/
// 静态同步方法
private static final synchronized int nextPoolId() {
return ++poolNumberSequence;
}
// static constants
/**
* 初始超时值(以纳秒为单位),用于触发暂停以等待新工作的线程。
* Initial timeout value (in nanoseconds) for the thread
* triggering quiescence to park waiting for new work. On timeout,
* 在超时时,线程将尝试减少workers的数量。
* the thread will instead try to shrink the number of
* 该值应该足够大,以避免过度积极收缩在大多数瞬态停机(长GCs等)。
* workers. The value should be large enough to avoid overly
* aggressive shrinkage during most transient stalls (long GCs
* etc).
*/
// 2 秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
/**
* 当线程数超过并行度级别时的超时值 (0.2秒)
* Timeout value when there are more threads than parallelism level
*/
private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;
/**
* 对空闲超时的容忍,以应对计时器不足
* Tolerance for idle timeouts, to cope with timer undershoots
*/
private static final long TIMEOUT_SLOP = 2000000L;
/**
* The maximum stolen->joining link depth allowed in method
* tryHelpStealer. Must be a power of two. Depths for legitimate
* chains are unbounded, but we use a fixed constant to avoid
* (otherwise unchecked) cycles and to bound staleness of
* traversal parameters at the expense of sometimes blocking when
* we could be helping.
*/
private static final int MAX_HELP = 64;
/**
* seed 生成器增量。
* Increment for seed generators. See class ThreadLocal for
* explanation.
*/
private static final int SEED_INCREMENT = 0x9e3779b9;
/*
* 控制变量的位和掩码
* Bits and masks for control variables
*
* 字段ctl是一个long 类型:
* Field ctl is a long packed with:
*
* 活动运行workers的数量减去目标并行度(16 位) (ActiveCount)
* AC: Number of active running workers minus target parallelism (16 bits)
*
* 总workers数减去目标并行度(16位) (TotalCount)
* TC: Number of total workers minus target parallelism (16 bits)
*
* 如果池终止,则为true (1位) (ST: status)
* ST: true if pool is terminating (1 bit)
*
* 顶部等待线程的等待计数(15位). ID记录的线程的等待次数,避免ABA问题
* EC: the wait count of top waiting thread (15 bits)
*
* waiters的Treiber顶栈的 poolIndex(16位)
* ID: poolIndex of top of Treiber stack of waiters (16 bits)
*
* 方便时,我们可以提取计数的上32位和队列状态的下32位,u = (int)(ctl >>> 32)和e = (int)ctl)
* (ctl 的高32位是计数,低32位是队列状态)
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* ec字段从不单独访问,但总是与id和st一起访问。
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st.
* 目标并行度和字段位置的计数偏移使得通过字段的符号测试执行最常见的检查成为可能:
* The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* 当ac为负时,没有足够的活动的workers,当tc为负时,没有足够的总workers,当e为负时,池终止。
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, and when e is
* negative, the pool is terminating. To deal with these possibly
* 为了处理这些可能出现的负数字段,我们使用强制转换成“short” 和/或 符号移位以保持符号性。
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*
* 当一个线程进入队列(未激活)时,它的eventCount字段被设置为负值,这是判断worker
* 是否被阻止执行任务的唯一方法,即使它必须继续扫描任务以避免队列争用。
* When a thread is queued (inactivated), its eventCount field is
* set negative, which is the only way to tell if a worker is
* prevented from executing tasks, even though it must continue to
* scan for them to avoid queuing races. Note however that
* 但是请注意,eventCount更新延迟发布,因此使用时需要谨慎。
* eventCount updates lag releases so usage requires care.
*
* plock 字段是int 类型的
* Field plock is an int packed with:
* true 如果可以shutdown(1位)
* SHUTDOWN: true if shutdown is enabled (1 bit)
* 系列化锁,锁定时PL_LOCK位被设置 (30位)
* SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
* 可能有线程正在等待这个锁时被设置(1位)
* SIGNAL: set when threads may be waiting on the lock (1 bit)
*
* 序列号支持简单的一致性检查:可以通过比较读取前后的plock来检查workQueues数组上的只读操作是否过时。
* The sequence number enables simple consistency checks:
* Staleness of read-only operations on the workQueues array can
* be checked by comparing plock before vs after the reads.
*/
// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;
// bounds (1 << 16) - 1 -> SMASK = 1111 1111 1111 1111
private static final int SMASK = 0xffff; // short bits
// MAX_CAP = 0111 1111 1111 1111
private static final int MAX_CAP = 0x7fff; // max #workers - 1
// 1111 1111 1111 1110
private static final int EVENMASK = 0xfffe; // even short bits
// 0x007e (126) -> 1111110 (最大64个偶数槽)
// 111111 = 63,加上 0000000 (0槽位), 总共64个偶数槽
private static final int SQMASK = 0x007e; // max 64 (even) slots
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;
// masks ST_SHIFT = 31 -> STOP_BIT = 1L << 31
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
// SMASK = 0xffff -> (1 << 15) - 1; AC_SHIFT = 48
// AC_MASK 高16位为: 1111 1111 1111 1111,低48位为0
// 1111 1111 1111 1111 000000000000000000000000000000000000000000000000
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
// TC_MASK 高32位为:0000 0000 0000 0000 1111 1111 1111 1111,低32位为0
// 0000 0000 0000 0000 1111 1111 1111 1111 00000000000000000000000000000000
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;
// TC_SHIFT = 32 TC_UNIT = 1L << 32
// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
// AC_SHIFT = 48; -> AC_UNIT = 1L << 48
private static final long AC_UNIT = 1L << AC_SHIFT;
// masks and units for dealing with u = (int)(ctl >>> 32)
// AC_SHIFT = 48, --> UAC_SHIFT = 16
private static final int UAC_SHIFT = AC_SHIFT - 32;
// TC_SHIFT = 32 -> UTC_SHIFT = 0
private static final int UTC_SHIFT = TC_SHIFT - 32;
// UAC_SHIFT = 16, SMASK = (1 << 16) - 1, UAC_MASK = 1111 1111 1111 1111 0000 0000 0000 0000
private static final int UAC_MASK = SMASK << UAC_SHIFT;
// SMASK = (1 << 16) - 1, UTC_SHIFT = 0 -> (1 << 16) - 1
private static final int UTC_MASK = SMASK << UTC_SHIFT;
// UAC_SHIFT = 16 -> UAC_UNIT = 1 << 16
private static final int UAC_UNIT = 1 << UAC_SHIFT;
// UTC_SHIFT = 0 -> UTC_UNIT = 1
private static final int UTC_UNIT = 1 << UTC_SHIFT;
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
// EC_SHIFT = 16, E_SEQ = 1 << 16
private static final int E_SEQ = 1 << EC_SHIFT;
// plock bits
private static final int SHUTDOWN = 1 << 31;
// 0010
private static final int PL_LOCK = 2;
private static final int PL_SIGNAL = 1;
private static final int PL_SPINS = 1 << 8;
// access mode for WorkQueue
static final int LIFO_QUEUE = 0; // asyncMode = false 使用这个模式,默认为false
static final int FIFO_QUEUE = 1; // asyncMode = true 使用这个模式
static final int SHARED_QUEUE = -1;
// Instance fields
volatile long stealCount; // collects worker counts
// 主要池控制
// 从高到低:
// 16位:AC: 活动运行workers的数量减去parallelism
// 16位:TC: 总workers数减去parallelism
// 1位: ST: 表示池是否已经终止 (1终止)
// 15位:EC: 顶部等待线程的等待计数。ID记录的线程的等待次数,避免ABA问题
// 16位: ID: waiters的Treiber顶栈的 poolIndex (低32就是取等待线程顶栈eventCount的值)
volatile long ctl; // main pool control
// 最高位:表示池是否已经关闭,若已关闭,则值为1,即 plock < 0
// 最低位:表示有线程在等待获取锁,需要唤醒
// 中间30位:最低位(plock的右边第二位)表示锁是否被获取,1表示已被获取,其他位表示锁的系列号
// 每获取一次锁,plock的值加2,释放锁plock的值再加2
volatile int plock; // shutdown status and seqLock
volatile int indexSeed; // worker/submitter index seed
final short parallelism; // parallelism level
final short mode; // LIFO/FIFO
// 工作队列的索引是奇数,共享(提交)队列的索引是偶数的,最多64个槽 (偶数槽最多64个)
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
// 构造方法初始化
final String workerNamePrefix; // to create worker name string
/**
* 获取plock锁以保护工作者数组和相关更新。
* Acquires the plock lock to protect worker array and related
* 只有在plock上的初始CAS失败时才调用此方法。
* updates. This method is called only if an initial CAS on plock
* fails.
* 这在正常情况下起自旋锁的作用,但在(很少)需要时,又会回到内置监视器来阻塞。
* 对于高度争用的锁来说,这是一个糟糕的主意,但是作为纯自旋锁的一个更保守的替代方案,
* 这是可行的。
* This acts as a spinlock for normal cases, but falls back
* to builtin monitor to block when (rarely) needed. This would be
* a terrible idea for a highly contended lock, but works fine as
* a more conservative alternative to a pure spinlock.
*/
// 获取锁,并返回获取锁后 plock的值
private int acquirePlock() {
// PL_SPINS = 1 << 8 (256)
int spins = PL_SPINS, ps, nps;
for (;;) {
// PL_LOCK = 2 -> 0010,若plock 右边第二位为0,并使用CAS修改右边第二位为1成功
if (((ps = plock) & PL_LOCK) == 0 &&
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
// 返回修改 plock的值
return nps;
else if (spins >= 0) {
// 随机生成threadLocalRandomSecondarySeed,并返回
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
// 线程的 threadLocalRandomSecondarySeed >= 0, spins才会递减
--spins;
}
// 超出自旋次数, PL_SIGNAL = 1, 使用CAS把 plock的右边第一位修改为1 (表示线程需要唤醒)
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
synchronized (this) {
// plock 右边第一位为1 才能进入等待状态,即判断获取锁后上一个获取 plock的线程还没执行唤醒操作
if ((plock & PL_SIGNAL) != 0) {
try {
// wait() 等价于 this.wait(),所以监视器是 this
// wait() 是在 for循环里面,可以避免虚假唤醒
wait();
} catch (InterruptedException ie) {
try {
// 线程被中断,重新中断线程
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {
}
}
}
else
// todo 回头再看一下,是不是调用notifyAll之前先设置了状态?
// 是的,获取plock的线程先释放锁,再去竞争this锁,然后执行 notifyAll(),而不是先竞争this锁,
// 然后再释放plock锁,唤醒等待线程,使用了快速释放锁的方式,而其他线程可以帮忙先执行唤醒操作。
// 唤醒在this等待的所有线程。
// 因为获取plock锁的线程释放锁后,先设置了 plock的值,表示已经释放锁,然后再获取 this监视器,
// 唤醒所有线程,因此虽然plock锁已经释放了,但是此时可能还没唤醒等待的线程,获取锁执行唤醒操作,
// 而不是等待上一个获取plock锁的线程竞争到this锁后才执行唤醒操作,减少线程的等待时间
notifyAll();
}
}
}
}
/**
* 解锁并向任何等待锁的线程发出信号。
* Unlocks and signals any thread waiting for plock. Called only
* when CAS of seq value for unlock fails.
*/
// 1、对plock进行赋值,释放锁 2、唤醒所有等待的线程
private void releasePlock(int ps) {
// 直接赋值而不是使用CAS的原因是:
// -> 获取锁,然后释放锁失败才会调用这个方法,而CAS失败只能是最高位/最低位变了,
// 最高位表示池是否关闭,而池进行关闭需要先获取锁,因此不可能;最低位变了表示有
// 线程已经进入等待状态,需要唤醒,我们在赋值后,调用notifyAll()即可
plock = ps;
// 先释放锁,再进行唤醒
synchronized (this) { notifyAll(); }
}
/**
* 如果并行度低于目标水平,则尝试创建并启动一个worker。在故障时调整计数等。
* Tries to create and start one worker if fewer than target
* parallelism level exist. Adjusts counts etc on failure.
*/
private void tryAddWorker() {
long c; int u, e;
// 获取ctl高32位的值 (ctl的 33-48位 和 49-64的初始值都是 -parallelism)
// (u = (int)((c = ctl) >>> 32)) < 0 说明:活动运行workers的数量小于 parallelism
// SHORT_SIGN = 1 << 15, u & SHORT_SIGN) != 0 说明:总workers数小于parallelism
// ctl 第32位的值为:ST(status),即池的状态,1表示池已终止; (int) c >= 0说明池还没终止。
while ((u = (int)((c = ctl) >>> 32)) < 0 &&
(u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
// UTC_UNIT = 1, UTC_MASK = (1 << 16) -1
// ((u + UTC_UNIT) & UTC_MASK) 执行结果为: u + 1,然后保留低16位的值
// UAC_UNIT = 1 << 16, UAC_MASK = 1111 1111 1111 1111 0000 0000 0000 0000
// ((u + UAC_UNIT) & UAC_MASK) 执行结果为:u的高16位 + 1, 然后保留高16位
// e = (int)c ,即ctl 低32位的值
// 整个式子执行结果为:ctl的高16位的值加1,次高16的值加1,低32位的值保持不变
// 也就是 AC + 1, TC + 1。 AC: 活动运行workers的数量 - parallelism ; TC: 总workers数 - parallelism
long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
// 使用CAS修改ctl的值为nc
if (U.compareAndSwapLong(this, CTL, c, nc)) {
ForkJoinWorkerThreadFactory fac;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 使用ForkJoin线程池工厂创建一个ForkJoinWorkerThread 线程
if ((fac = factory) != null &&
(wt = fac.newThread(this)) != null) {
// 启动线程
wt.start();
break;
}
} catch (Throwable rex) {
ex = rex;
}
// 线程创建或者启动失败,注销 wt
deregisterWorker(wt, ex);
break;
}
}
}
// 注册和注销workers
// Registering and deregistering workers
/**
* 从ForkJoinWorkerThread调用,建立并记录它的WorkQueue.
* Callback from ForkJoinWorkerThread to establish and record its
* WorkQueue.
* 为了避免由于在workQueues数组前填充条目而导致的扫描偏差,我们将该数组视为一个简单的二幂哈希表,
* 使用每个线程种子作为哈希,根据需要进行扩展。
* To avoid scanning bias due to packing entries in
* front of the workQueues array, we treat the array as a simple
* power-of-two hash table using per-thread seed as hash,
* expanding as needed.
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
// 设置线程为守护线程
wt.setDaemon(true);
if ((handler = ueh) != null)
// 设置 Thread的uncaughtExceptionHandler 字段值
wt.setUncaughtExceptionHandler(handler);
// 使用CAS更新 indexSeed,在原来的值上增加 SEED_INCREMENT,直到成功为止,
// 并且更新后的 indexSeed 不能为0,如果为0,继续增加
do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
s += SEED_INCREMENT) ||
s == 0); // skip 0
// 创建工作队列
WorkQueue w = new WorkQueue(this, wt, mode, s);
// PL_LOCK = 2 -> 0010 , 如果 PL_LOCK 的第二位为1 (plock 池的锁)
// || PL_LOCK的第二位为0,但使用CAS把第二位修改为1失败
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 若锁已被获取,或者获取锁失败,调用acquirePlock()方法获取锁,并返回获取锁后 plock的值
ps = acquirePlock();
// SHUTDOWN = 1 << 31
// ps & SHUTDOWN -> 获取ps 第32位的值
// & ~SHUTDOWN 的作用是把第32位的值设置为0,避免溢出而导致第32位的值为1
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
try {
// 如果池已经关闭,则跳过
if ((ws = workQueues) != null) { // skip if shutting down
int n = ws.length, m = n - 1;
// x = indexSeed, | 1的作用是,保证 r的值为奇数
int r = (s << 1) | 1; // use odd-numbered indices 使用奇数索引
// r = r & m, 若该索引位置上已经有其他元素了,表示索引位置冲突了
if (ws[r &= m] != null) { // collision 冲突
int probes = 0; // step by approx half size
// EVENMASK = 0xfffe -> 1111 1111 1111 1110
// (n >>> 1) & EVENMASK 保证是个偶数
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[r = (r + step) & m] != null) {
if (++probes >= n) {
// 冲突次数达到了n次 (不包含前面那次),对 workQueues进行扩容,容量增加一倍
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
// 把workQueue在 workQueues数组中的索引位置保存到 workQueue中
w.poolIndex = (short)r;
// volatile 写顺序
w.eventCount = r; // volatile write orders
// 把w 放到workQueues的索引 r位置
ws[r] = w;
}
} finally {
// 使用CAS修改 plock的值,释放锁
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、对plock进行赋值,释放锁 2、唤醒所有等待的线程
releasePlock(nps);
}
// 设置线程名称, w.poolIndex >>> 1 是因为都保存在奇数索引位置上
wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
return w;
}
/**
* 终止worker的最后回调,以及构造或启动worker失败时进行调用。
* Final callback from terminating worker, as well as upon failure
* to construct or start a worker. Removes record of worker from
* 从数组中删除worker记录,并调整计数。如果池正在关闭,则尝试完成终止。
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
* @param wt the worker thread, or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
// (w = wt.workQueue) != null 说明该线程的 workQueue已经被注册到 workQueues中了
if (wt != null && (w = wt.workQueue) != null) {
int ps;
// 设置 w.qlock = -1,表示该workQueue已经终止
w.qlock = -1; // ensure set 确保有设置 ( awaitWork()方法等待超时退出也会设置 )
// 该workQueue窃取的任务数量汇总到ForkJoinPool的实例字段stealCount 中
U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
// PL_LOCK = 0010, ((ps = plock) & PL_LOCK) != 0 说明plock锁已经被获取了
// 或者 使用CAS 获取 plock锁失败,则调用acquirePlock()方法获取锁,并返回获取锁后 plock的值
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 获取 plock锁
ps = acquirePlock();
// SHUTDOWN = 1 << 31, ps & SHUTDOWN 获取 ps第32位的值,该位表示池是否终止,而不是锁的系列号
// (ps + PL_LOCK) & ~SHUTDOWN 强制把运算后第32位的值置为0,因为加完后值有可能溢出而
// 导致第32位的值变成1,因此 需要分两步进行计算
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
try {
int idx = w.poolIndex;
WorkQueue[] ws = workQueues;
// 从 workQueues中移除要删除的 queue
if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
ws[idx] = null;
} finally {
// 虽然已经获取锁了,但是最低位是有可能变的,因此需要使用CAS
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、对plock进行赋值,释放锁 2、唤醒所有等待的线程
releasePlock(nps);
}
}
// 上面只是从 workQueues中移除 workQueue,还没更新计数
// 调整ctl 计数
long c; // adjust ctl counts
// AC_UNIT = 1L << 48, AC_MASK 高16位为: 1111 1111 1111 1111,低48位为0
// TC_UNIT = 1L << 32, TC_MASK 高32位为:0000 0000 0000 0000 1111 1111 1111 1111,低32位为0
// 线程活动计数、线程总数分别减1
// c & ~(AC_MASK|TC_MASK) 获取c 低32位的值,高32位的值取0
// 调整ctl计数,直到成功为止。 计算结果为:线程活动数量、线程总数量-1,低32位保持不变
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// tryTerminate() 尝试终止池,因为有可能被移除的线程的存在而导致池不能终止(活动线程数大于0),
// 如果正在终止/已经终止,返回true,否则返回false。 如果返回true,没必要进行以下操作,
// 因为tryTerminate() 方法里面会进行操作
if (!tryTerminate(false, false) && w != null && w.array != null) {
// 取消剩余的任务
w.cancelAll(); // cancel remaining tasks
WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
// (u = (int)((c = ctl) >>> 32)) < 0 说明池中活动的线程数量未达到parallelism
// (e = (int)c) >= 0 说明池未终止,> 0 说明有等待的线程
while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
// e > 0 说明有等待的线程,进行唤醒
if (e > 0) { // activate or create replacement
// SMASK = 1111 1111 1111 1111
if ((ws = workQueues) == null ||
(i = e & SMASK) >= ws.length || // 等待线程的索引超出了ws的索引
(v = ws[i]) == null)
break; // 退出循环
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// UAC_UNIT = 1 << 16; (u + UAC_UNIT) << 32) 活动线程数量加1
long nc = (((long)(v.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
// v 为等待线程的顶栈,INT_SIGN = 1 << 31。因为线程在进入等待之前都需要把eventCount设置为负数
// 所以 v.eventCount == (e | INT_SIGN) 时线程才可能处于等待状态。
// 判断eventCount值是否改变,减少不必要的CAS操作,因为如果这里不相等,下面的CAS肯定失败
if (v.eventCount != (e | INT_SIGN))
break;
// 不会存在ABA问题,比如: A线程是要唤醒的线程,而唤醒之前A.eventCount会 + 1,
// 然后A线程又进入了等待状态,虽然 ctl记录的等待线程的顶栈都是A线程,但是ctl的
// 第17位到第31位值已经不一样了。所以不会有ABA问题。
if (U.compareAndSwapLong(this, CTL, c, nc)) {
// E_SEQ = 1 << 16,等待次数 + 1,并把最高位设置回0
v.eventCount = (e + E_SEQ) & E_MASK;
// 唤醒线程
if ((p = v.parker) != null)
U.unpark(p);
break; // 退出循环,只唤醒一个线程
}
}
else {
// 池中没有等待的线程,但是线程总数未达到 parallelism,因此添加新的线程 (前提是池还未进行终止)
if ((short)u < 0)
tryAddWorker();
break;
}
}
}
// 线程退出时,帮助清理过期引用
if (ex == null) // help clean refs on way out
// 轮询过期的异常记录引用并删除它们
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
// 重新抛出异常
ForkJoinTask.rethrow(ex);
}
// Submissions
/**
* 除非关闭,否则将给定的任务添加到提交者当前队列索引(模块化提交范围)的提交队列中。
* Unless shutting down, adds the given task to a submission queue
* at submitter's current queue index (modulo submission
* 该方法只直接处理最常见的路径。其他的都转接到fullExternalPush。
* range). Only the most common path is directly handled in this
* method. All others are relayed to fullExternalPush.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
// 获取线程的threadLocalRandomProbe
int r = ThreadLocalRandom.getProbe();
int ps = plock;
WorkQueue[] ws = workQueues;
// 1、ps > 0 判断说明池未终止
// 2、ws != null && (m = (ws.length - 1)) >= 0 判断 ws不为空
// 3、SQMASK = 0x007e -> 1111110,强制把最后一位 置为0,即偶数。非FrokJoinWorkerThread线程
// 提交的任务必须提交到 ws的偶数索引位置;
// 4、r != 0 说明 threadLocalRandomProbe 已经初始化完成,否则需要初始化
// 5、使用CAS获取WorkQueue q 的锁
if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock 使用CAS获取q的锁
// 1、workQueue放入workQueues之前没有对array 进行初始化,往workQueue添加任务时,发现array
// 没有初始化/容量不够时,才会对array进行初始化/扩容。因此当 q != null时,q.array可能为null
// 2、(am = a.length - 1) > (n = (s = q.top) - q.base), 判断array是否还有剩余空间。 am -> arrayMask
// 这里可以先读 top (非volatile),而不用先读 base (volatile) 的原因是:已经先获取了WorkQueue q 的锁,
// 因此已经保证了可见性。 对于queueSize()这类的只读操作,没有写操作,不需要获取锁,因此需要先读base,
// 保证可见性
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
// s = q.top。 计算索引 top的地址偏移量
int j = ((am & s) << ASHIFT) + ABASE;
// 把任务放入top 位置(top所在的索引位置是没有元素的).
// 使用 putOrderedObject() 保证刷新到主存中的顺序。因为当top != base时表示队列中
// 一定有元素,如果 top先于 task刷新到主存,那么top != base时,获取的任务可能是空的
U.putOrderedObject(a, j, task);
// top + 1
q.top = s + 1; // push on to deque
// 释放锁。释放锁不需要使用CAS
q.qlock = 0;
// n <= 1 说明workQueues中在提交此任务之前,队列中可能没有任务,工作线程可能因此
// 没有扫描到任务处于等待状态。或者 池中的工作线程由于长时间没有扫描到任务而退出了,
// 导致池中可能没有工作线程了。因此需要 通知/注册 工作线程,保证线程的活跃度
if (n <= 1)
// 参数传入了 workQueues, workQueue
signalWork(ws, q);
return;
}
// 释放锁
q.qlock = 0;
}
fullExternalPush(task);
}
/**
* 完整版的externalPush。这个方法在第一次向池提交第一个任务时调用,因此必须执行二次初始化。
* Full version of externalPush. This method is called, among
* other times, upon the first submission of the first task to the
* pool, so must perform secondary initialization. It also
* 它还通过查找ThreadLocal来检测外部线程的首次提交,如果索引队列为空或争用,
* 则创建一个新的共享队列。
* detects first submission by an external thread by looking up
* its ThreadLocal, and creates a new shared queue if the one at
* index if empty or contended. The plock lock body must be
* plock锁体必须是无异常的(所以没有try/finally),所以我们乐观地在锁
* 外分配新队列,如果不需要(很少)就丢弃它们。
* exception-free (so no try/finally) so we optimistically
* allocate new queues outside the lock and throw them away if
* (very rarely) not needed.
*
* 当plock为0时发生二次初始化,创建工作队列数组并将plock设置为一个有效值。
* Secondary initialization occurs when plock is zero, to create
* workQueue array and set plock to a valid value. This lock body
* 这个锁体也必须是无异常的。因为plock seq值最终可能会包围在0附近,
* 所以如果工作队列存在,这个方法就不会有害地失败初始化,同时仍在推进plock。
* must also be exception-free. Because the plock seq value can
* eventually wrap around zero, this method harmlessly fails to
* reinitialize if workQueues exists, while still advancing plock.
*/
private void fullExternalPush(ForkJoinTask<?> task) {
int r;
// 如果线程的threadLocalRandomProbe 还没初始化,则进行初始化
if ((r = ThreadLocalRandom.getProbe()) == 0) {
// 初始化 threadLocalRandomSeed 和 threadLocalRandomProbe
ThreadLocalRandom.localInit();
// 获取threadLocalRandomProbe
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int ps, m, k;
boolean move = false;
// plock < 0 说明池已经关闭,拒绝任务。池关闭时首先会设置 plock = -1,等池满足了终止条件
// 才会 ctl的停止位为1.
if ((ps = plock) < 0)
throw new RejectedExecutionException();
// ps == 0, plock = 0将发生二次初始化,二次初始化只是将plock设置为一个非0的有效值,
// 不会修改workQueues
// 判断workQueues是否已经进行初始化,如果还没初始化,则进行初始化
else if (ps == 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) { // initialize workQueues
// 初始化workQueues
// parallelism <= MAX_CAP, MAX_CAP = 0111 1111 1111 1111
// 因此 workQueues的最大长度不会超过 1 << 15
int p = parallelism; // find power of two table size
// 确保至少两个槽位
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
// 获取最小的大于n的2的幂,并乘以2 (parallelism小于 1 << 15,所以不会溢出)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
// 创建WorkQueue数组,长度为n。当plock = 0时, ws可能非空,因此 nws 可能为null
WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
new WorkQueue[n] : null);
// PL_LOCK = 2 -> 0010 , 如果 PL_LOCK 的第二位为1
// || PL_LOCK的第二位为0,但使用CAS把第二位修改为1失败
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 调用acquirePlock()方法获取锁,并返回获取锁后 plock的值
ps = acquirePlock();
// 获取锁后需要再次判断workQueues 是否已经被初始化。如果 nws = null,不会对workQueues进行赋值
if (((ws = workQueues) == null || ws.length == 0) && nws != null)
// 初始化 workQueues
workQueues = nws;
// SHUTDOWN = 1 << 31
// ps & SHUTDOWN -> 获取ps 第32位的值
// & ~SHUTDOWN 的作用是把第32位的值设置为0,避免溢出而导致第32位的值为1
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
// 每获取一次锁,plock的值加2,释放锁plock的值再加2
// (虽然已经获取锁了,但是最高位和最低位是有可能变的,因此需要使用CAS)
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、对plock进行赋值 2、唤醒所有等待的线程
releasePlock(nps);
}
// 判断线程在 WorkQueues数组中对应的索引位置是否有workQueue
// r -> 线程的 threadLocalRandomProbe; m = ws.length - 1
// SQMASK = 0x007e -> 1111110,强制把最后一位 置为0,即偶数
// k = r & m & SQMASK 计算线程在WorkQueues数组中对应的索引位置
else if ((q = ws[k = r & m & SQMASK]) != null) {
// qlock 0表示没有锁定,1表示锁定 (plock 池的锁, qlock 工作队列的锁)
// 如果 qlock锁没有被获取,且使用CAS获取锁成功
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
// 创建WorkQueue时不会对 array进行初始化,因此 q.array可能为null
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false;
// 锁定版本的 push
try { // locked version of push
// top - base = 队列中元素的个数。如果 top + 1 - base = length,说明插入元素后
// 队列就没有剩余容量了,因此进行扩容。
// q.growArray() 对q.array进行初始化 / 扩容 (容量扩大一倍)
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) { // must presize
// a.length 一定是2的幂, s = q.top
// 计算索引s的地址偏移量 ( s = q.top )
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
// top + 1
q.top = s + 1;
// submitted = true 表示任务已提交
submitted = true;
}
} finally {
// 释放锁
q.qlock = 0; // unlock
}
if (submitted) {
// 任务提交,唤醒等待的线程或者注册新的worker
signalWork(ws, q);
return;
}
}
// 如果锁已经被其他线程获取了,那么设置move = true,move = true时会修改
// 线程的 threadLocalRandomProbe 值,即修改线程在workQueues中的映射位置
move = true; // move on failure
}
// 如果 workQueues已经初始化,且线程在workQueues 数组中对应的索引位置没有元素
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
// 创建WorkQueue, 模式为:SHARED_QUEUE, seed 为线程的 threadLocalRandomProbe
// 参数为: ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, nt seed
q = new WorkQueue(this, null, SHARED_QUEUE, r);
// k = r & m & SQMASK q在workQueues中对应的索引位置
// 设置workQueue在 workQueues中的索引位置
q.poolIndex = (short)k;
// 获取plock 获取
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
// 获取锁,并返回获取锁后 plock的值
ps = acquirePlock();
// 获取锁后再次判断 workQueues的索引k位置是否有元素
if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
// 索引k的元素赋值为q
ws[k] = q;
// 计算释放锁后plock的值,计算后的值最低位一定是0,而不管最低位是否已经变成1了,
// 如果已经变成1了CAS会失败,然后调用releasePlock()方法,释放锁,唤醒所有的线程
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
// 1、对plock进行赋值,释放锁 2、唤醒所有等待的线程
releasePlock(nps);
}
// workQueues已经初始化,线程对应的索引位置没有元素,且 plock已经被其他线程获取了
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
// Maintaining ctl counts
/**
* 活动线程数 + 1,主要在阻塞返回后调用
* Increments active count; mainly called upon return from blocking.
*/
final void incrementActiveCount() {
long c;
// AC_MASK 高16位为: 1111 1111 1111 1111,低48位为0
// AC_UNIT = 1L << 48, 计算结果为:低 48位不变,高16位加 1
// 活动线程数 + 1,直到成功
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((c & ~AC_MASK) |
((c & AC_MASK) + AC_UNIT))));
}
/**
* 如果激活的worker太少,则尝试创建或激活worker。
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
* 如果非空,这个队列包含待处理的任务
* @param q if non-null, the queue holding tasks to be processed
*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {
for (;;) {
long c; int e, u, i; WorkQueue w; Thread p;
// 获取ctl高32位的值 (ctl的 33-48位 和 49-64的初始值都是 -parallelism)
// u >= 0 说明池中活动的工作线程已经达到parallelism,因此没有必要唤醒,也没有必要
// 注册新的工作线程,直接退出循环,返回。
if ((u = (int)((c = ctl) >>> 32)) >= 0)
break;
// (e = (int)c) <= 0 -> e = 0 说明池中没有等待的线程, e < 0 说明池已终止,tryAddWorker()方法里
// 会判断池是否已经终止,终止将不进行任何操作
if ((e = (int)c) <= 0) {
// (short)u 表示ctl的 33-48位的值,即 总workers数减去目标并行度 (-parallelism)
// u < 0说明 worker数量还没达到 parallelism的值,因此注册新的worker
if ((short)u < 0)
// 添加worker
tryAddWorker();
// 队列中没有等待的线程,在这里返回
break;
}
// e = (int)c , SMASK = 1111 1111 1111 1111。
// i -> waiters的Treiber顶栈的 poolIndex (等待线程顶栈的在workQueues中的索引)
// w = ws[i]) == null 说明没有等待线程,不需要进行通知,所以直接break,然后返回。
if (ws == null || ws.length <= (i = e & SMASK) ||
(w = ws[i]) == null)
break;
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// w 为等待线程的顶栈
// UAC_UNIT = 1 << 16,因为要唤醒一个线程,所以 活动线程数量 + 1
// 等待线程的栈顶指向下一个等待者,活动线程数 + 1
long nc = (((long)(w.nextWait & E_MASK)) |
((long)(u + UAC_UNIT)) << 32);
// w 为等待线程的顶栈,E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// E_SEQ = 1 << 16 等待次数 + 1,并且设置回正数
int ne = (e + E_SEQ) & E_MASK;
// w 为等待线程的顶栈,INT_SIGN = 1 << 31。因为线程在进入等待之前都需要把eventCount设置为负数
// 所以 w.eventCount == (e | INT_SIGN) 时线程才可能处于等待状态
// CAS操作不会存在ABA问题,比如: A线程是要唤醒的线程,而唤醒之前A.eventCount会 + 1,
// 然后A线程又进入了等待状态,虽然 ctl记录的等待线程的顶栈都是A线程,但是ctl的
// 第17位到第31位值已经不一样了。所以不会有ABA问题。
if (w.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, c, nc)) {
// 修改被唤醒线程的 eventCount,即等待次数+1,并且设置回正数。线程的等待次数在被唤醒前
// 才进行 + 1操作,而不是在进入等待之前进行 + 1
w.eventCount = ne;
// 唤醒等待的线程
if ((p = w.parker) != null)
U.unpark(p);
break;
}
// q.base >= q.top 说明队列中已经没有任务了,因此不需要再进行唤醒操作。
// 这里先读 base,再读 top,因为 base是 volatile修饰的,而top 不是
if (q != null && q.base >= q.top)
break;
}
}
// Scanning for tasks
/**
* 通过 ForkJoinWorkerThread.run 调用
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void runWorker(WorkQueue w) {
// 分配队列
w.growArray(); // allocate queue
// scan(w, r) 返回0则继续扫描,否则结束循环
for (int r = w.hint; scan(w, r) == 0; ) {
// 伪随机数
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
/**
* 扫描,如果发现,运行一个任务,否则可能使worker失活。
* Scans for and, if found, runs one task, else possibly
* inactivates the worker.
* 此方法对volatile状态的单次读取进行操作,并设计为连续地重新调用,
* 部分原因是它在检测到不一致、争用或状态更改时返回,这表明重新调用可能成功。
* This method operates on single reads of
* volatile state and is designed to be re-invoked continuously,
* in part because it returns upon detecting inconsistencies,
* contention, or state changes that indicate possible success on
* re-invocation.
*
* 该扫描从一个随机索引开始,在多个队列中搜索任务,每个至少检查两次。
* The scan searches for tasks across queues starting at a random
* index, checking each at least twice. The scan terminates upon
* 扫描在找到非空队列或完成扫描时终止。
* either finding a non-empty queue, or completing the sweep. If
* 如果worker未被灭活,它将从该队列获取并运行一个任务。
* the worker is not inactivated, it takes and runs a task from
* 否则,如果不激活,它试图通过信号来激活自己或其他worker。
* this queue. Otherwise, if not activated, it tries to activate
* itself or some other worker by signalling. On failure to find a
* 如果未能找到任务,则返回(用于重试)在空扫描期间可能更改了池状态,或者如果激活则尝试停用,
* 否则可能阻塞或通过方法awaitWork终止。
* task, returns (for retry) if pool state may have changed during
* an empty scan, or tries to inactivate if active, else possibly
* blocks or terminates via method awaitWork.
*
* @param w the worker (via its WorkQueue)
* @param r a random seed
* @return worker qlock status if would have waited, else 0
*/
private final int scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
// 用于一致性检查
long c = ctl; // for consistency check
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
// m = ws.length - 1, j = m + m + 1 的原因是 保证workQueues,可以遍历两次,因为是先
// 遍历元素,然后j 再减 1,因此起始位置可以遍历3次 (j = m 就可以遍历一次ws的所有元素)。
// eventCount -> registerWorker() 方法初始化时等于 poolIndex 的值
for (int j = m + m + 1, ec = w.eventCount;;) {
WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
// ws[(r - j) & m] 当r为threadLocalRandomProbe时(第一次调scan()时,之后传入的是一个随机数),
// 从 w所在索引位置的下一个索引开始扫描 (因为一开始w的队列中并没有任务可以执行)
// 先读q.base,再读q.top,因为 base是volatile修饰的,而 top不是。
// (b = q.base) - q.top < 0 说明队列中有任务
if ((q = ws[(r - j) & m]) != null &&
(b = q.base) - q.top < 0 && (a = q.array) != null) {
// 计算索引b的地址偏移量, b = q.base, ((a.length - 1) & b 的作用是:
// 每取出一个元素base都会增加,每插入一个元素top也会增加,top - base 结果
// 为队列中的任务个数,只要 top - base 不大于队列array的长度,那么array中
// 就还能存储任务,base 一直增加,任务多的时候就会超过a.length,但是任务也一直
// 在被取出,因此队列可以循环使用的,(a.length - 1) & b 计算任务保存的位置
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 获取索引b的元素,并赋值给t
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null) {
// 如果scan()方法没有扫描到任务,且 ctl == c,那么会设置 ec = -ec(负数),
// 然后返回0,表示继续执行scan()方法,而继续执行scan()方法时扫描到了任务,因此 ec < 0
if (ec < 0)
// 如果ctl上有记录等待的线程,则唤醒一个线程 / 重新激活自己
// 线程在进入真正等待之前就把当前线程记录到ctl的等待线程顶栈上了,所有如果线程在下次扫描时扫描到了任务,
// 如果顶栈记录的还是当前线程,那么可以进行自我激活,也可以被其他线程激活
helpRelease(c, ws, w, q, b);
// q.base == b 判断base索引位置的任务还没有被其他线程获取
// 如果q.base没有变化,使用CAS设置索引b 的元素为null
else if (q.base == b &&
U.compareAndSwapObject(a, i, t, null)) {
// 设置q.base = b + 1 (即 base + 1)
U.putOrderedInt(q, QBASE, b + 1);
// (b + 1) - q.top < 0 说明队列中还有任务,唤醒其他worker,进行任务处理
if ((b + 1) - q.top < 0)
// 如果激活的worker太少,则尝试创建或激活worker
signalWork(ws, q);
// 执行顶级任务和执行后剩余的任何本地任务
w.runTask(t);
}
}
// 结束循环,直接返回0,返回0意味着从新调用scan()方法
break;
}
// ------------- workQueue 中没有任务 -----------
// j = j - 1,如果 j < 0说明workQueues已经遍历过两次,但是没有找到可以执行的任务
else if (--j < 0) {
// ec = w.eventCount, c = ctl, e = (int)c) 只取低32位
// w.eventCount < 0 或者 池已经终止,则调用awaitWork() 方法
if ((ec | (e = (int)c)) < 0) // inactive or terminating
// scan()方法只有两个地方有return,这里一个,另一个直接返回0,意味着从新调用scan()方法
return awaitWork(w, c, ec);
else if (ctl == c) { // try to inactivate and enqueue 尝试失活和排队
// AC_UNIT = 1L << 48, c - AC_UNIT -> 活动运行workers的数量减1
// AC_MASK: 1111 1111 1111 1111 000000000000000000000000000000000000000000000000
// TC_MASK: 0000 0000 0000 0000 111111111111111100000000000000000000000000000000
// AC_MASK|TC_MASK = 1111 1111 1111 1111 1111 1111 1111 1111 00000000000000000000000000000000
// 执行结果为:ctl的高16位 -1,低32的值取 ec的值, 而 ec的初始值为: poolIndex 的值,所以
// ctl 的低16位为:waiters的Treiber顶栈的 poolIndex(16位)
long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
// e = (int)c, 记录原来等待的线程
w.nextWait = e;
// INT_SIGN = 1 << 31 , ec | INT_SIGN 即把 ec 的第32位设置为1, ec为负数,
// 表示 线程不是活动状态
w.eventCount = ec | INT_SIGN;
// 使用CAS修改ctl的值。 线程在进入真正等待之前就把当前线程记录到ctl的等待线程顶栈上了,
// 所有如果线程在下次扫描时扫描到了任务,如果顶栈记录的还是当前线程,那么可以进行自我激活
if (!U.compareAndSwapLong(this, CTL, c, nc))
// CAS失败,回退 w.eventCount 的值
w.eventCount = ec; // back out
}
break;
}
}
}
return 0;
}
/**
* 继续scan(),可能阻塞或终止worker w。
* A continuation of scan(), possibly blocking or terminating
* 如果自上次调用以来池状态明显改变,则返回而不阻塞。
* worker w. Returns without blocking if pool state has apparently
* changed since last invocation.
* 另外,如果停用w导致池变为静态,则检查池是否终止,并且,只要这不是唯一的worker,
* 就在给定的持续时间等待事件。
* Also, if inactivating w has
* caused the pool to become quiescent, checks for pool
* termination, and, so long as this is not the only worker, waits
* for event for up to a given duration. On timeout, if ctl has
* 在超时时,如果ctl没有更改,则终止该worker,这将依次唤醒另一个worker,从而可能重复此过程。
* not changed, terminates the worker, which will in turn wake up
* another worker to possibly repeat this process.
*
* @param w the calling worker
* @param c the ctl value on entry to scan
* @param ec the worker's eventCount on entry to scan
*/
// w的 eventCount < 0, 或者池已经终止会调用这个方法
// 线程进入等待的流程: 1、没有扫描到任务,修改 eventCount为负数,并记录ctl的等待线程的顶栈为该线程;
// 2、如果 w.nsteals 不等于0 ,那么该workQueue窃取的任务数量汇总到ForkJoinPool的实例字段stealCount 中
// 3、w.nsteals = 0,那么判断进行超时等待还是无限等待,如果是无限等待的,线程被唤醒会继续去扫描任务,
// 如果是超时等待的,超过时间后线程会终止退出。
private final int awaitWork(WorkQueue w, long c, int ec) {
int stat, ns; long parkTime, deadline;
// w.qlock -> 1: locked, -1: terminate; else 0
// (stat = w.qlock) >= 0 判断 w是否已经终止了, 如果 stat = -1,则返回 -1,表示终止,工作线程需要退出
// w.eventCount == ec 如果 w已被被其他线程激活了,则结果为false
// 因为中断只用于警示线程检查终止,这是在阻塞时无论如何都进行检查,我们在任何
// 调用park之前清除状态(使用Thread.interrupted),因此park并不会立即返回
// 由于状态被设置通过一些其他不相关的用户代码中调用中断。
if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
!Thread.interrupted()) {
// 获取 ctl 低32位的值
int e = (int)c;
// 获取ctl 高32位的值
int u = (int)(c >>> 32);
// UAC_SHIFT = 16, 计算活动的worker 数量
int d = (u >> UAC_SHIFT) + parallelism; // active count
// e < 0 说明池已经终止
// 如果停用w导致池变为静态,则检查池是否终止,tryTerminate() 如果正在终止/已经终止,返回true
if (e < 0 || (d <= 0 && tryTerminate(false, false)))
// 池已终止,设置 w的 qlock = -1
stat = w.qlock = -1; // pool is terminating
// 获取该WorkQueue 的窃取任务数量
else if ((ns = w.nsteals) != 0) { // collect steals and retry
w.nsteals = 0;
// 该workQueue窃取的任务数量汇总到ForkJoinPool的实例字段stealCount 中
U.getAndAddLong(this, STEALCOUNT, (long)ns);
}
else {
// d 是活动worker的数量 (在调用此方法之前,d 已经减1了), d > 0说明不是最后一个等待的线程,
// 可以进行无限等待; 如果池中没有活跃的线程,或者 w是等待线程的顶栈,那么会进行超时等待,
// 超时等待在超时时 会使用CAS更新ctl的值,如果CAS成功,w会终止退出。
// e : ctl 低32位的值, INT_SIGN = 1 << 31
// ec != (e | INT_SIGN) : 说明w不是等待线程的顶栈 (其他线程也要进入等待状态,顶栈就会变成另一个线程)
// pc 是线程唤醒后修改 ctl的值。 但是pc = 0L的情况下不会更新 ctl的值
long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
((long)(w.nextWait & E_MASK)) | // ctl to restore
// u: ctl 高32 位的值; UAC_UNIT = 1 << 16。因为线程从等待状态到
// 唤醒后,活动worker数量 + 1, pc是线程唤醒后 ctl的值
// << 优先级比 | 高
((long)(u + UAC_UNIT)) << 32);
// 池中没有活跃的线程,或者 w是等待线程的顶栈,那么会进行超时等待
if (pc != 0L) { // timed wait if last waiter
// TC_SHIFT = 32; dc = parallelism - 线程总数
int dc = -(short)(c >>> TC_SHIFT);
// dc < 0 说明线程总数超出 parallelism
// FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L, 当线程数超过并行度级别时的超时值, 0.2秒。如果线程数量
// 太多了,只会等待很短的时间。
// IDLE_TIMEOUT = 2000L * 1000L * 1000L; 2秒
parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
(dc + 1) * IDLE_TIMEOUT); // dc = parallelism - 线程总数,线程数量越少等待时间越长
// TIMEOUT_SLOP = 2 000 000L = 2毫秒
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
// U.park(false, parkTime) parkTime = 0L,线程会无限等待
parkTime = deadline = 0L;
// 再次判断状态是否发生改变
if (w.eventCount == ec && ctl == c) {
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
// 把等待的线程保存到 parker
w.parker = wt; // emulate LockSupport.park
// park之前再次检查状态是否发生改变
if (w.eventCount == ec && ctl == c)
U.park(false, parkTime); // must recheck before park
// 注意:如果线程被虚假唤醒也没事,就相当于 (ns = w.nsteals) != 0 条件不满足时返回了
// 设置并清除工作队列的“parker”字段,以减少不必要的取消unpark的调用。
w.parker = null;
// 清除线程的 parkBlocker 字段
U.putObject(wt, PARKBLOCKER, null);
// parkTime != 0L 说明是超时等待。如果池中没有活跃的线程,或者 w是等待线程的顶栈,
// 那么会进行超时等待
// deadline - System.nanoTime() <= 0L 说明已超时
// CAS操作: 在超时时,如果ctl没有更改,并更新ctl成功,则终止该worker,
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, pc))
// 池缩小,在超时时,如果ctl没有更改,则终止该worker,
// 这将依次唤醒另一个worker,从而可能重复此过程。
// w.qlock = -1,说明此workQueue已终止
stat = w.qlock = -1; // shrink pool
}
}
}
return stat;
}
/**
* 可能释放(通知)一个worker。只有当处于非活动状态的worker发现一个非空队列时才从scan()调用。
* Possibly releases (signals) a worker. Called only from scan()
* when a worker with apparently inactive status finds a non-empty
* 这需要重新验证调用者的所有关联状态。
* queue. This requires revalidating all of the associated state
* from caller.
*/
// helpRelease(c, ws, w, q, b) -> c: ctl; ws: workQueues; w: 调用该方法的workQueue;
// q: 扫描到具有非空队列的workQueue; b : q.base,任务的索引位置
private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
WorkQueue q, int b) {
WorkQueue v; int e, i; Thread p;
// (e = (int)c) > 0 说明池还没终止,并且有线程在等待 (0没有线程在等待,负数池终止)
// SMASK = 1111 1111 1111 1111, i = e & SMASK -> waiters的Treiber顶栈的 poolIndex
// v = ws[i] waiters的顶栈workQueue
if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
ws != null && ws.length > (i = e & SMASK) &&
(v = ws[i]) != null && ctl == c) {
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// v.nextWait & E_MASK把nextWait的最高位置为0
// UAC_UNIT = 1 << 16, (c >>> 32) + UAC_UNIT 活动worker 数量加1
// ctl 是通过CAS进行设置的,所以如果ctl的池状态位变成1了也没事,CAS会失败
long nc = (((long)(v.nextWait & E_MASK)) |
((long)((int)(c >>> 32) + UAC_UNIT)) << 32); // nc -> nextCtl
// E_SEQ = 1 << 16, E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// (e + E_SEQ) & E_MASK, 高16位加 1,并且最高位置为 0
int ne = (e + E_SEQ) & E_MASK; // ne -> nextEventCount
// q != null && q.base == b 判断任务是否已经被取走了
// INT_SIGN = 1 << 31
// v.eventCount == (e | INT_SIGN) 判断v是否已经被唤醒了,如果不相等,说明已经被其他唤醒了
if (q != null && q.base == b && w.eventCount < 0 &&
v.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, c, nc)) {
v.eventCount = ne;
// p 可能为null,因为线程ctl记录的等待线程可能还没真正进入等待,只是先记录到ctl上,
if ((p = v.parker) != null)
U.unpark(p);
}
}
}
/**
* 试图为给定任务的窃取者定位和执行任务,或者反过来跟踪它的偷窃者,跟踪currentSteal—> currentJoin链接,
* 寻找在给定任务的后代上带有非空队列的工作线程,并把任务窃取回来并执行任务。
* Tries to locate and execute tasks for a stealer of the given
* task, or in turn one of its stealers, Traces currentSteal ->
* currentJoin links looking for a thread working on a descendant
* of the given task and with a non-empty queue to steal back and
* execute tasks from.
* 在等待join时对这个方法的第一次调用通常需要进行扫描/搜索(这是可以的,
* 因为joiner没有更好的事情要做),但是这个方法会在workers中留下提示来加快后续调用。
* The first call to this method upon a
* waiting join will often entail scanning/search, (which is OK
* because the joiner has nothing better to do), but this method
* leaves hints in workers to speed up subsequent calls. The
* 该实现非常灵活,可以处理潜在的不一致性或循环遇到过时、未知或太长以至于可能是循环的链。
* implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale,
* unknown, or so long that they are likely cyclic.
*
* @param joiner the joining worker
* @param task the task to join
* 如果没有进展返回0,如果任务已经完成返回返回负数,否则返回正数(任务有进展)
* @return 0 if no progress can be made, negative if task
* known complete, else positive
*/
// 寻找并窃取task的一个子任务来执行,若子任务还进行分割,则执行完队列中的所有子任务
private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
int stat = 0, steps = 0; // bound to avoid cycles
// joiner.base - joiner.top >= 0 说明joiner队列中没有任务,如果joiner的工作队列中有任务
// 则直接返回 0,不能帮助窃取任务
// 队列中有任务不能去窃取任务的原因是:窃取完任务执行后,如果任务又分割了,则队列中的任务不一定都是当前窃取
// 任务的子任务,如果其他工作线程执行tryHelpStealer()根据currentSteal 来这里窃取了任务,则会导致窃取到
// 任务就可能不是其他工作线程想窃取的任务
if (task != null && joiner != null &&
joiner.base - joiner.top >= 0) { // hoist checks
restart: for (;;) {
// 当前等待完成的目标任务 (这个任务可能被线程A窃取了,然后此任务被分割成了任务A和任务B,线程A执行了
// 任务A,然后等待任务B完成,此时joiner等待完成的任务转向任务B,因为目标任务的完成,需要任务B先完成,
// 此时subtask = 任务B)
ForkJoinTask<?> subtask = task; // current target 当前目标
// r
// / \
// b c
// / \ / \
// d e f g
// j 为等待任务完成的等待者。若joiner等待任务c完成,而线程A窃取了任务c,执行了任务f,然后
// 等待任务g完成,此时j = 线程A,subtask = g
for (WorkQueue j = joiner, v;;) { // v is stealer of subtask v是subtask的偷窃者
WorkQueue[] ws; int m, s, h;
// (s = task.status) < 0 判断任务是否已经完成,若已经完成返回任务状态值
if ((s = task.status) < 0) { // 必须是task完成,而不是subtask
stat = s;
break restart;
}
// 如果workQueues 为空,结束循环,此时stat = 0,即返回0
// m = ws.length - 1
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break restart; // shutting down 池已关闭
// 工作线程的workQueue是在奇数索引位置上, j.hint | 1 保证结果是奇数
if ((v = ws[h = (j.hint | 1) & m]) == null ||
v.currentSteal != subtask) {
for (int origin = h;;) { // find stealer 查找窃取者
// h = (h + 2) & m 计算下一个工作线程的索引位置
// 15 -> 1111 (索引位置为1、17、33...), 并且任务已经完成 (外部循环会返回状态值)
// || join等待完成的任务不是subtask (注意:j不一定是传入的joiner,下面会修改j,所以
// 有可能subtask已经执行完成了,即过时了)
if (((h = (h + 2) & m) & 15) == 1 &&
(subtask.status < 0 || j.currentJoin != subtask))
// 偶尔过时检查 (重新任务是否已经完成或者池是否已经关闭,
// 然后继续从j开始开始遍历)
continue restart; // occasional staleness check
// 找到任务的窃取者
if ((v = ws[h]) != null &&
v.currentSteal == subtask) {
// 在等待join时对这个方法的第一次调用通常需要进行扫描/搜索(这是可以的,
// 因为joiner没有更好的事情要做),但是这个方法会在workers中留下提示来
// 加快后续调用 (窃取回任务然后执行完成后若任务没有完成需要再去窃取任务,
// 留下提示可以快速找到任务窃取者的位置)
// 记录偷窃者的索引位置
j.hint = h; // save hint 保存提示
break;
}
if (h == origin)
// 循环一遍了,没有找到任务窃取者
break restart; // cannot find stealer 找不到任务偷窃者
}
}
// ------- 工作线程v 当前窃取的任务是 subtask ------------
for (;;) { // help stealer or descend to its stealer
ForkJoinTask<?>[] a; int b;
// subtask已经完成
if (subtask.status < 0) // surround probes with
// 外部会判断 task是否完成,如果task没有完成需要继续执行
continue restart; // consistency checks
// 判断工作线程v 的工作队列不为空
if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
// 计算索引b的地址偏移量
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 先拿出任务,然后判断v 当前窃取的任务是否还是subtask,然后通过CAS取出任务,
// 保证窃取出的任务是subtask的子任务 (因为此时有可能v当前窃取的任务不是subtask,而
// 任务t也不是subtask的子任务,设计地太完美了)
ForkJoinTask<?> t =
(ForkJoinTask<?>)U.getObjectVolatile(a, i);
// 1、subtask.status < 0 说明subtask完成了;
// 2、j.currentJoin != subtask 说明j (不一定是传入的joiner,下面会修改) 等待完成的
// 任务subtask完成了;
// 3、v.currentSteal != subtask 说明v 偷窃的任务subtask 完成了
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask)
continue restart; // stale 过时了
stat = 1; // apparent progress 明显的进展
// v.base == b 判断任务是否被其他线程窃取了
if (v.base == b) {
// 任务没有被其他线程窃取,但是可能被线程v自己执行了,所以可能 v.base == b,而t 为null
if (t == null)
break restart;
// 使用CAS取出任务
if (U.compareAndSwapObject(a, i, t, null)) {
// 修改base
U.putOrderedInt(v, QBASE, b + 1);
// ps -> previousSteal 上一个窃取的任务
ForkJoinTask<?> ps = joiner.currentSteal;
int jt = joiner.top; // 记录joiner 当前的top值
do {
// 修改joiner 当前窃取的任务为t
joiner.currentSteal = t;
// 执行任务t
t.doExec(); // clear local tasks too 并且清空本队列的任务
} while (task.status >= 0 && // task任务没有完成
joiner.top != jt && // 如果 top值发生改变了,说明任务t 又进行分割了
(t = joiner.pop()) != null); // 所以继续取出子任务执行
// joiner当前窃取的任务修改回上一个窃取的任务 ps
joiner.currentSteal = ps;
// 跳出外部循环,因为此时stat = 1,所以结束循环后会返回1,外部会判断task任务是否完成,
// 没有完成继续窃取任务,因此已经留下提示了,所以可以很快找到任务窃取者
// (这里没有继续判断v的队列中是否还有task的子任务,)
break restart;
}
}
}
// 工作线程v 的工作队列为空,说明subtask的子任务都执行完了,或者子任务被
// 其他工作者窃取了,所以转向 currentJoin,此为subtask 等待完成的子任务
else { // empty -- try to descend
// next 为v 当前等待完成的任务
// v.currentSteal != subtask 说明任务已经完成了
ForkJoinTask<?> next = v.currentJoin;
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask)
// 过时了todo
continue restart; // stale
// 没有出路或者可能是循环的 (MAX_HELP = 64),任务分割次数超过64次
else if (next == null || ++steps == MAX_HELP)
break restart; // dead-end or maybe cyclic
else {
// 等待完成的任务转向 next
subtask = next;
// 等待者为v
j = v;
// 跳出此循环,查找subtask 的偷窃者
break;
}
}
}
}
}
}
return stat;
}
/**
* 类似tryHelpStealer,用于CountedCompleters。
* Analog of tryHelpStealer for CountedCompleters. Tries to steal
* 试图在目标的计算范围内窃取和运行任务。
* and run tasks within the target's computation.
*
* @param task the task to join
* @param maxTasks the maximum number of other tasks to run
*/
// 1、检查joiner工作队列的top位置,及其他工作线程(包括joiner)的base位置的位置是否是
// 目标任务task或者其子任务,若是取出并执行。
// 2、判断队列中的base索引位置的任务就够了,因为工作队列中的所有任务都是源于同一个父节点任务
final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
int maxTasks) {
WorkQueue[] ws; int m;
int s = 0;
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
joiner != null && task != null) {
// 获取 joiner在 workQueues中的索引
int j = joiner.poolIndex;
// 扫描次数
int scans = m + m + 1;
long c = 0L; // for stability check 稳定的检查
// j += 2 只判断工作线程的队列
for (int k = scans; ; j += 2) {
WorkQueue q;
// task.status < 0 说明任务已经完成了
if ((s = task.status) < 0)
break;
// 判断工作队列的最后一个任务(top位置)是否是目标任务task,或者其子任务,如果是的话,取出这个任务并执行,
// 然后返回true,否则返回false。 (每次循环都要检查joiner的工作队列的top是否为目标任务或者其子任务,因为
// 窃取窃取到任务并执行后,队列中可能会有窃取到的任务的子任务,需要先执行完自己队列中的任务)
else if (joiner.internalPopAndExecCC(task)) {
// maxTasks = Integer.MAX_VALUE
if (--maxTasks <= 0) {
s = task.status;
break;
}
// 重置k
k = scans;
}
// 判断任务是否已经执行完成,若已执行完成,返回任务状态
else if ((s = task.status) < 0)
break;
// 判断队列中base索引位置的任务是否是root任务或者其子任务,如果是,取出并执行。
// 如果base索引位置的任务已经被其他线程获取,或者base索引位置的任务是root任务
// 或者其子任务,返回true; 否则返回false
// q 可能为 joiner (第一次循环时就是joiner)
else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
if (--maxTasks <= 0) {
s = task.status;
break;
}
// 重置k
k = scans;
}
// --k < 0说明工作队列已经扫描过4遍了 (因为只扫描奇数索引的工作者队列)
else if (--k < 0) {
// 先判断 c是否等于 ctl,再把 ctl赋值给c。 初始值 c = 0L,所以第一次 --k < 0,
// c == ctl 为false
if (c == (c = ctl))
break;
// 重置k
k = scans;
}
}
}
return s;
}
/**
* 尝试减少活动计数(有时是隐式的),并可能释放或创建一个补偿worker,为阻塞做准备。
* Tries to decrement active count (sometimes implicitly) and
* possibly release or create a compensating worker in preparation
* 在竞争或者终止时失败。否则,添加一个新线程,如果没有空闲的workers可用和池可能成为饥饿。
* for blocking. Fails on contention or termination. Otherwise,
* adds a new thread if no idle workers are available and pool
* may become starved.
*
* @param c the assumed ctl value
*/
// 1、如果有空闲的线程,则唤醒一个空闲的线程,并返回true(池中增加了一个活动的线程,而这个线程要进入等待状态,
// 活动线程减1,刚好进行补偿,所以不用修改 ctl的值);
// 2、总线程数 >= parallelism && 活动线程数 > 1,则修改ctl活动线程数减1,并返回true
// 3、没有空闲的线程,且 总线程数少于parallelism || 活动线程数只有一个,并且总线程数少于MAX_CAP,则创建一个新的线程
// 创建并启动线程成功返回true,失败返回false (活动线程数不变,所以不用修改ctl的值)
final boolean tryCompensate(long c) {
WorkQueue[] ws = workQueues;
int pc = parallelism, e = (int)c, m, tc;
// 第一次调用 c传入的是0, 之后传入的ctl的值
if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
// ctl低16位为:waiters的Treiber顶栈的 poolIndex,所以 w为顶栈的等待线程的索引
WorkQueue w = ws[e & m];
// w != null,说明有等待线程
if (e != 0 && w != null) {
Thread p;
// E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
// AC_MASK 高16位为:1111 1111 1111 1111,低48位为0
// TC_MASK 高32位为:0000 0000 0000 0000 1111 1111 1111 1111,低32位为0
// 计算ctl新的值,高32为不变,低32位变为 w.nextWait (nextWait保存的是ctl低32位的值)
long nc = ((long)(w.nextWait & E_MASK) |
(c & (AC_MASK|TC_MASK)));
// E_SEQ = 1 << 16 ; E_MASK = 0111 1111 1111 1111 1111 1111 1111 1111
int ne = (e + E_SEQ) & E_MASK;
// INT_SIGN = 1 << 31
// w.eventCount == (e | INT_SIGN) 时线程才可能处于等待状态, 因为线程在进入等待之前
// 都需要把eventCount设置为负数
if (w.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, c, nc)) { // CAS修改ctl的值
// 修改被唤醒线程的 eventCount,即等待次数+1,并且设置回正数。线程的等待次数在被唤醒前
// 才进行 + 1操作,而不是在进入等待之前进行 + 1 (修改等待次数,避免ABA问题)
w.eventCount = ne;
if ((p = w.parker) != null)
// 唤醒线程
U.unpark(p);
// 使用空闲线程替换, 不补偿
return true; // replace with idle worker
}
}
// ----- 没有空闲线程 -----
// TC_SHIFT = 32, (short)(c >>> TC_SHIFT) 的值为总线程数 - parallelism
// AC_SHIFT = 48,(int)(c >> AC_SHIFT) + pc > 1 判断活动线程数是否大于1
// 总线程数 >= parallelism && 活动线程数 > 1
else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
(int)(c >> AC_SHIFT) + pc > 1) {
// AC_UNIT = 1L << 48, AC_MASK 高16位为: 1111 1111 1111 1111,低48位为0
// 计算结果为:高16位减1,即活动线程数减1,低48位不变
long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
// 使用CAS 修改 ctl的值
if (U.compareAndSwapLong(this, CTL, c, nc))
return true; // no compensation 不补偿
}
// tc = (short)(c >>> TC_SHIFT), TC_SHIFT = 32, pc = parallelism
// MAX_CAP = 0111 1111 1111 1111
else if (tc + pc < MAX_CAP) { // 判断总线程数少于 MAX_CAP
// TC_UNIT = 1L << 32, TC_MASK 高32位为:0000 0000 0000 0000 1111 1111 1111 1111,低32位为0
// 计算结果为:总线程数加1,其他不变 (注意:活动线程数没有加1,因为工作线程要进入等待状态,活动线程
// 少了一个,所以刚好进行补偿)
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
// 使用CAS修改ctl的值
if (U.compareAndSwapLong(this, CTL, c, nc)) {
ForkJoinWorkerThreadFactory fac;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 创建线程,this -> ForkJoinPool
if ((fac = factory) != null &&
(wt = fac.newThread(this)) != null) {
// 启动线程
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
// 创建或者启动线程出现异常,注销wt
deregisterWorker(wt, ex); // clean up and return false
}
}
}
return false;
}
/**
* 帮助 和/或 阻塞直到给定的任务完成
* Helps and/or blocks until the given task is done.
*
* @param joiner the joining worker
* @param task the task
* @return task status on exit
*/
/**
* 1、工作线程等待完成的任务一般都是其窃取的任务的子任务(除非个别情况,等待完成的是用户提交的另一个任务),
* 而且工作线程的任务队列一定都是当前窃取任务的子任务,不会存在其他任务,但是这些任务有可能被其他线程窃取了。
* 所以等待任务完成优先判断任务是否在当前队列里 (CounterCompleter判断方式有点不一样,因为CounterCompleter
* 只有根任务会等待完成,所以需要判断是否其叶子任务,是的话取出执行)
*
* 2、一般的ForkJoinTask任务判断工作线程窃取的任务是否是当前等待完成的任务,如果是帮助完成其队列中的任务,
* 因为肯定都是子任务。但是这些任务有可能被其他线程窃取了,因此这个工作队列可能在等待子任务的完成,获取这个
* 子任务,作为下一个等待完成的任务。。。
*
* 3、只会检查工作线程的队列是否有目标任务/目标任务的子任务,不会检查公共队列。
*/
final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
int s = 0;
// (s = task.status) >= 0 说明任务还未完成
if (task != null && (s = task.status) >= 0 && joiner != null) {
// 保存上一个等待完成的任务 (执行awaitJoin()时可能会去执行另一个任务,而任务里面也有task.join(),
// 因此又执行到了awaitJoin()方法... 产生了链式调用,所以需要保存上一个等待完成的任务)
ForkJoinTask<?> prevJoin = joiner.currentJoin;
// 修改当前等待完成的任务
joiner.currentJoin = task;
// doJoin()调用tryUnpush(this) 仅当this是workQueue的最后一个任务才会取出任务,
// tryRemoveAndExec() 则是遍历整个workQueue队列去查找任务,找到则执行这个任务
do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 处理本地任务
(s = task.status) >= 0);
if (s >= 0 && (task instanceof CountedCompleter))
// 检查joiner工作队列的top位置,及其他工作线程(包括joiner)的base位置的位置是否是
// 目标任务task或者其子任务,若是取出并执行。
s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
long cc = 0; // for stability checks 稳定性检查
while (s >= 0 && (s = task.status) >= 0) {
// tryHelpStealer()如果没有进展返回0,如果任务已经完成返回返回负数,否则返回正数(任务有进展)
// 没有进展,且任务还没有完成
if ((s = tryHelpStealer(joiner, task)) == 0 &&
(s = task.status) >= 0) {
// tryCompensate()如果存在空闲线程则唤醒一个,如果没有空闲线程,且只剩下一个
// 活动的线程,则添加一个线程 (方法里面已经维护好了活动线程数,即 ctl的值)
if (!tryCompensate(cc))
cc = ctl;
else {
// 尝试设置SIGNAL状态,表明有线程在等待此任务完成,需要唤醒。
if (task.trySetSignal() && (s = task.status) >= 0) {
synchronized (task) {
if (task.status >= 0) {
try { // see ForkJoinTask
// 线程进入等待状态
task.wait(); // for explanation
} catch (InterruptedException ie) {
// 忽略中断
}
}
else
// 如果任务已经完成,唤醒所有的等待线程
task.notifyAll();
}
}
long c; // reactivate
// AC_MASK 高16位为: 1111 1111 1111 1111,低48位为0, AC_UNIT = 1L << 48
// 更新ctl,高16位加1,低48位保持不变,表示活动线程数 +1
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl,
((c & ~AC_MASK) |
((c & AC_MASK) + AC_UNIT))));
}
}
}
// 等待完成的任务重置为上一个等待完成的任务
joiner.currentJoin = prevJoin;
}
return s;
}
/**
* Stripped-down variant of awaitJoin used by timed joins. Tries
* 只有在有持续的进展时,才试图帮助加join。(调用者将进入定时等待。)
* to help join only while there is continuous progress. (Caller
* will then enter a timed wait.)
*
* @param joiner the joining worker
* @param task the task
*/
// 帮助任务完成,和awaitJoin() 不同的是线程不会进入等待状态,帮助任务完成的过程是一样的
final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
int s;
if (joiner != null && task != null && (s = task.status) >= 0) {
// 保存joiner当前等待完成的任务
ForkJoinTask<?> prevJoin = joiner.currentJoin;
// 修改joiner当前等待完成的任务为 task
joiner.currentJoin = task;
// tryRemoveAndExec()遍历整个workQueue队列去查找任务,找到则执行这个任务
do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 处理本地任务
(s = task.status) >= 0);
if (s >= 0) {
if (task instanceof CountedCompleter)
// 检查joiner工作队列的top位置,及其他工作线程(包括joiner)的base位置的位置是否是
// 目标任务task或者其子任务,若是取出并执行。
helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
// tryHelpStealer() 寻找并窃取task的一个子任务来执行,若子任务还进行分割,则执行完队列中的所有子任务
do {} while (task.status >= 0 &&
tryHelpStealer(joiner, task) > 0);
}
// 重置 currentJoin
joiner.currentJoin = prevJoin;
}
}
/**
* 返回一个(可能)非空的窃取队列(如果在扫描期间找到),否则返回null
* Returns a (probably) non-empty steal queue, if one is found
* during a scan, else null. This method must be retried by
* 如果调用者在尝试使用队列时该队列为空,则必须重试此方法。
* caller if, by the time it tries to use the queue, it is empty.
*/
private WorkQueue findNonEmptyStealQueue() {
int r = ThreadLocalRandom.nextSecondarySeed();
for (;;) {
int ps = plock, m; WorkQueue[] ws; WorkQueue q;
// 判断 workQueues 不为空, m = ws.length - 1
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
// (m + 1) << 2,因此只扫描奇数索引位置,所以会扫描8次所有工作线程的队列
for (int j = (m + 1) << 2; j >= 0; --j) {
// 1、 << 1 可以将结果放大一倍,即 --j会导致 ((r - j) << 1) 结果少2,
// 因为需要跳过偶数索引位置,每次需要减2
// 2、 | 1保证结果为奇数,因为工作者的队列在奇数索引位置上
if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
q.base - q.top < 0)
// q.base - q.top < 0 队列不为空,返回此队列
return q;
}
}
// plock = ps,说明没有注册新的工作线程,也就是workQueues 中没有增加新工作线程
// 的工作队列,因为注册工作线程需要获取plock 锁,plock值会变化
if (plock == ps)
return null;
// plock变化了则有可能增加了新工作队列,需要重新扫描
}
}