View Javadoc
1   /*
2    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3    *
4    * This code is free software; you can redistribute it and/or modify it
5    * under the terms of the GNU General Public License version 2 only, as
6    * published by the Free Software Foundation.  Oracle designates this
7    * particular file as subject to the "Classpath" exception as provided
8    * by Oracle in the LICENSE file that accompanied this code.
9    *
10   * This code is distributed in the hope that it will be useful, but WITHOUT
11   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13   * version 2 for more details (a copy is included in the LICENSE file that
14   * accompanied this code).
15   *
16   * You should have received a copy of the GNU General Public License version
17   * 2 along with this work; if not, write to the Free Software Foundation,
18   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19   *
20   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21   * or visit www.oracle.com if you need additional information or have any
22   * questions.
23   */
24  
25  /*
26   * This file is available under and governed by the GNU General Public
27   * License version 2 only, as published by the Free Software Foundation.
28   * However, the following notice accompanied the original version of this
29   * file:
30   *
31   * Written by Doug Lea with assistance from members of JCP JSR-166
32   * Expert Group and released to the public domain, as explained at
33   * http://creativecommons.org/publicdomain/zero/1.0/
34   */
35  
36  package java.util.concurrent;
37  
38  import java.lang.Thread.UncaughtExceptionHandler;
39  import java.util.ArrayList;
40  import java.util.Arrays;
41  import java.util.Collection;
42  import java.util.Collections;
43  import java.util.List;
44  import java.util.concurrent.AbstractExecutorService;
45  import java.util.concurrent.Callable;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Future;
48  import java.util.concurrent.RejectedExecutionException;
49  import java.util.concurrent.RunnableFuture;
50  import java.util.concurrent.ThreadLocalRandom;
51  import java.util.concurrent.TimeUnit;
52  import java.security.AccessControlContext;
53  import java.security.ProtectionDomain;
54  import java.security.Permissions;
55  
56  /**
57   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
58   * A {@code ForkJoinPool} provides the entry point for submissions
59   * from non-{@code ForkJoinTask} clients, as well as management and
60   * monitoring operations.
61   *
62   * <p>A {@code ForkJoinPool} differs from other kinds of {@link
63   * ExecutorService} mainly by virtue of employing
64   * <em>work-stealing</em>: all threads in the pool attempt to find and
65   * execute tasks submitted to the pool and/or created by other active
66   * tasks (eventually blocking waiting for work if none exist). This
67   * enables efficient processing when most tasks spawn other subtasks
68   * (as do most {@code ForkJoinTask}s), as well as when many small
69   * tasks are submitted to the pool from external clients.  Especially
70   * when setting <em>asyncMode</em> to true in constructors, {@code
71   * ForkJoinPool}s may also be appropriate for use with event-style
72   * tasks that are never joined.
73   *
74   * <p>A static {@link #commonPool()} is available and appropriate for
75   * most applications. The common pool is used by any ForkJoinTask that
76   * is not explicitly submitted to a specified pool. Using the common
77   * pool normally reduces resource usage (its threads are slowly
78   * reclaimed during periods of non-use, and reinstated upon subsequent
79   * use).
80   *
81   * <p>For applications that require separate or custom pools, a {@code
82   * ForkJoinPool} may be constructed with a given target parallelism
83   * level; by default, equal to the number of available processors. The
84   * pool attempts to maintain enough active (or available) threads by
85   * dynamically adding, suspending, or resuming internal worker
86   * threads, even if some tasks are stalled waiting to join others.
87   * However, no such adjustments are guaranteed in the face of blocked
88   * I/O or other unmanaged synchronization. The nested {@link
89   * ManagedBlocker} interface enables extension of the kinds of
90   * synchronization accommodated.
91   *
92   * <p>In addition to execution and lifecycle control methods, this
93   * class provides status check methods (for example
94   * {@link #getStealCount}) that are intended to aid in developing,
95   * tuning, and monitoring fork/join applications. Also, method
96   * {@link #toString} returns indications of pool state in a
97   * convenient form for informal monitoring.
98   *
99   * <p>As is the case with other ExecutorServices, there are three
100  * main task execution methods summarized in the following table.
101  * These are designed to be used primarily by clients not already
102  * engaged in fork/join computations in the current pool.  The main
103  * forms of these methods accept instances of {@code ForkJoinTask},
104  * but overloaded forms also allow mixed execution of plain {@code
105  * Runnable}- or {@code Callable}- based activities as well.  However,
106  * tasks that are already executing in a pool should normally instead
107  * use the within-computation forms listed in the table unless using
108  * async event-style tasks that are not usually joined, in which case
109  * there is little difference among choice of methods.
110  *
111  * <table BORDER CELLPADDING=3 CELLSPACING=1>
112  * <caption>Summary of task execution methods</caption>
113  *  <tr>
114  *    <td></td>
115  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
116  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
117  *  </tr>
118  *  <tr>
119  *    <td> <b>Arrange async execution</b></td>
120  *    <td> {@link #execute(ForkJoinTask)}</td>
121  *    <td> {@link ForkJoinTask#fork}</td>
122  *  </tr>
123  *  <tr>
124  *    <td> <b>Await and obtain result</b></td>
125  *    <td> {@link #invoke(ForkJoinTask)}</td>
126  *    <td> {@link ForkJoinTask#invoke}</td>
127  *  </tr>
128  *  <tr>
129  *    <td> <b>Arrange exec and obtain Future</b></td>
130  *    <td> {@link #submit(ForkJoinTask)}</td>
131  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
132  *  </tr>
133  * </table>
134  *
135  * <p>The common pool is by default constructed with default
136  * parameters, but these may be controlled by setting three
137  * {@linkplain System#getProperty system properties}:
138  * <ul>
139  * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
140  * - the parallelism level, a non-negative integer
141  * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
142  * - the class name of a {@link ForkJoinWorkerThreadFactory}
143  * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
144  * - the class name of a {@link UncaughtExceptionHandler}
145  * </ul>
146  * If a {@link SecurityManager} is present and no factory is
147  * specified, then the default pool uses a factory supplying
148  * threads that have no {@link Permissions} enabled.
149  * The system class loader is used to load these classes.
150  * Upon any error in establishing these settings, default parameters
151  * are used. It is possible to disable or limit the use of threads in
152  * the common pool by setting the parallelism property to zero, and/or
153  * using a factory that may return {@code null}. However doing so may
154  * cause unjoined tasks to never be executed.
155  *
156  * <p><b>Implementation notes</b>: This implementation restricts the
157  * maximum number of running threads to 32767. Attempts to create
158  * pools with greater than the maximum number result in
159  * {@code IllegalArgumentException}.
160  *
161  * <p>This implementation rejects submitted tasks (that is, by throwing
162  * {@link RejectedExecutionException}) only when the pool is shut down
163  * or internal resources have been exhausted.
164  *
165  * @since 1.7
166  * @author Doug Lea
167  */
168 @sun.misc.Contended
169 public class ForkJoinPool extends AbstractExecutorService {
170 
171     /*
172      * Implementation Overview
173      *
174      * This class and its nested classes provide the main
175      * functionality and control for a set of worker threads:
176      * Submissions from non-FJ threads enter into submission queues.
177      * Workers take these tasks and typically split them into subtasks
178      * that may be stolen by other workers.  Preference rules give
179      * first priority to processing tasks from their own queues (LIFO
180      * or FIFO, depending on mode), then to randomized FIFO steals of
181      * tasks in other queues.
182      *
183      * WorkQueues
184      * ==========
185      *
186      * Most operations occur within work-stealing queues (in nested
187      * class WorkQueue).  These are special forms of Deques that
188      * support only three of the four possible end-operations -- push,
189      * pop, and poll (aka steal), under the further constraints that
190      * push and pop are called only from the owning thread (or, as
191      * extended here, under a lock), while poll may be called from
192      * other threads.  (If you are unfamiliar with them, you probably
193      * want to read Herlihy and Shavit's book "The Art of
194      * Multiprocessor programming", chapter 16 describing these in
195      * more detail before proceeding.)  The main work-stealing queue
196      * design is roughly similar to those in the papers "Dynamic
197      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
198      * (http://research.sun.com/scalable/pubs/index.html) and
199      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
200      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
201      * See also "Correct and Efficient Work-Stealing for Weak Memory
202      * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
203      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
204      * analysis of memory ordering (atomic, volatile etc) issues.  The
205      * main differences ultimately stem from GC requirements that we
206      * null out taken slots as soon as we can, to maintain as small a
207      * footprint as possible even in programs generating huge numbers
208      * of tasks. To accomplish this, we shift the CAS arbitrating pop
209      * vs poll (steal) from being on the indices ("base" and "top") to
210      * the slots themselves.  So, both a successful pop and poll
211      * mainly entail a CAS of a slot from non-null to null.  Because
212      * we rely on CASes of references, we do not need tag bits on base
213      * or top.  They are simple ints as used in any circular
214      * array-based queue (see for example ArrayDeque).  Updates to the
215      * indices must still be ordered in a way that guarantees that top
216      * == base means the queue is empty, but otherwise may err on the
217      * side of possibly making the queue appear nonempty when a push,
218      * pop, or poll have not fully committed. Note that this means
219      * that the poll operation, considered individually, is not
220      * wait-free. One thief cannot successfully continue until another
221      * in-progress one (or, if previously empty, a push) completes.
222      * However, in the aggregate, we ensure at least probabilistic
223      * non-blockingness.  If an attempted steal fails, a thief always
224      * chooses a different random victim target to try next. So, in
225      * order for one thief to progress, it suffices for any
226      * in-progress poll or new push on any empty queue to
227      * complete. (This is why we normally use method pollAt and its
228      * variants that try once at the apparent base index, else
229      * consider alternative actions, rather than method poll.)
230      *
231      * This approach also enables support of a user mode in which local
232      * task processing is in FIFO, not LIFO order, simply by using
233      * poll rather than pop.  This can be useful in message-passing
234      * frameworks in which tasks are never joined.  However neither
235      * mode considers affinities, loads, cache localities, etc, so
236      * rarely provide the best possible performance on a given
237      * machine, but portably provide good throughput by averaging over
238      * these factors.  (Further, even if we did try to use such
239      * information, we do not usually have a basis for exploiting it.
240      * For example, some sets of tasks profit from cache affinities,
241      * but others are harmed by cache pollution effects.)
242      *
243      * WorkQueues are also used in a similar way for tasks submitted
244      * to the pool. We cannot mix these tasks in the same queues used
245      * for work-stealing (this would contaminate lifo/fifo
246      * processing). Instead, we randomly associate submission queues
247      * with submitting threads, using a form of hashing.  The
248      * ThreadLocalRandom probe value serves as a hash code for
249      * choosing existing queues, and may be randomly repositioned upon
250      * contention with other submitters.  In essence, submitters act
251      * like workers except that they are restricted to executing local
252      * tasks that they submitted (or in the case of CountedCompleters,
253      * others with the same root task).  However, because most
254      * shared/external queue operations are more expensive than
255      * internal, and because, at steady state, external submitters
256      * will compete for CPU with workers, ForkJoinTask.join and
257      * related methods disable them from repeatedly helping to process
258      * tasks if all workers are active.  Insertion of tasks in shared
259      * mode requires a lock (mainly to protect in the case of
260      * resizing) but we use only a simple spinlock (using bits in
261      * field qlock), because submitters encountering a busy queue move
262      * on to try or create other queues -- they block only when
263      * creating and registering new queues.
264      *
265      * Management
266      * ==========
267      *
268      * The main throughput advantages of work-stealing stem from
269      * decentralized control -- workers mostly take tasks from
270      * themselves or each other. We cannot negate this in the
271      * implementation of other management responsibilities. The main
272      * tactic for avoiding bottlenecks is packing nearly all
273      * essentially atomic control state into two volatile variables
274      * that are by far most often read (not written) as status and
275      * consistency checks.
276      *
277      * Field "ctl" contains 64 bits holding all the information needed
278      * to atomically decide to add, inactivate, enqueue (on an event
279      * queue), dequeue, and/or re-activate workers.  To enable this
280      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
281      * far in excess of normal operating range) to allow ids, counts,
282      * and their negations (used for thresholding) to fit into 16bit
283      * fields.
284      *
285      * Field "plock" is a form of sequence lock with a saturating
286      * shutdown bit (similarly for per-queue "qlocks"), mainly
287      * protecting updates to the workQueues array, as well as to
288      * enable shutdown.  When used as a lock, it is normally only very
289      * briefly held, so is nearly always available after at most a
290      * brief spin, but we use a monitor-based backup strategy to
291      * block when needed.
292      *
293      * Recording WorkQueues.  WorkQueues are recorded in the
294      * "workQueues" array that is created upon first use and expanded
295      * if necessary.  Updates to the array while recording new workers
296      * and unrecording terminated ones are protected from each other
297      * by a lock but the array is otherwise concurrently readable, and
298      * accessed directly.  To simplify index-based operations, the
299      * array size is always a power of two, and all readers must
300      * tolerate null slots. Worker queues are at odd indices. Shared
301      * (submission) queues are at even indices, up to a maximum of 64
302      * slots, to limit growth even if array needs to expand to add
303      * more workers. Grouping them together in this way simplifies and
304      * speeds up task scanning.
305      *
306      * All worker thread creation is on-demand, triggered by task
307      * submissions, replacement of terminated workers, and/or
308      * compensation for blocked workers. However, all other support
309      * code is set up to work with other policies.  To ensure that we
310      * do not hold on to worker references that would prevent GC, ALL
311      * accesses to workQueues are via indices into the workQueues
312      * array (which is one source of some of the messy code
313      * constructions here). In essence, the workQueues array serves as
314      * a weak reference mechanism. Thus for example the wait queue
315      * field of ctl stores indices, not references.  Access to the
316      * workQueues in associated methods (for example signalWork) must
317      * both index-check and null-check the IDs. All such accesses
318      * ignore bad IDs by returning out early from what they are doing,
319      * since this can only be associated with termination, in which
320      * case it is OK to give up.  All uses of the workQueues array
321      * also check that it is non-null (even if previously
322      * non-null). This allows nulling during termination, which is
323      * currently not necessary, but remains an option for
324      * resource-revocation-based shutdown schemes. It also helps
325      * reduce JIT issuance of uncommon-trap code, which tends to
326      * unnecessarily complicate control flow in some methods.
327      *
328      * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
329      * let workers spin indefinitely scanning for tasks when none can
330      * be found immediately, and we cannot start/resume workers unless
331      * there appear to be tasks available.  On the other hand, we must
332      * quickly prod them into action when new tasks are submitted or
333      * generated. In many usages, ramp-up time to activate workers is
334      * the main limiting factor in overall performance (this is
335      * compounded at program start-up by JIT compilation and
336      * allocation). So we try to streamline this as much as possible.
337      * We park/unpark workers after placing in an event wait queue
338      * when they cannot find work. This "queue" is actually a simple
339      * Treiber stack, headed by the "id" field of ctl, plus a 15bit
340      * counter value (that reflects the number of times a worker has
341      * been inactivated) to avoid ABA effects (we need only as many
342      * version numbers as worker threads). Successors are held in
343      * field WorkQueue.nextWait.  Queuing deals with several intrinsic
344      * races, mainly that a task-producing thread can miss seeing (and
345      * signalling) another thread that gave up looking for work but
346      * has not yet entered the wait queue. We solve this by requiring
347      * a full sweep of all workers (via repeated calls to method
348      * scan()) both before and after a newly waiting worker is added
349      * to the wait queue.  Because enqueued workers may actually be
350      * rescanning rather than waiting, we set and clear the "parker"
351      * field of WorkQueues to reduce unnecessary calls to unpark.
352      * (This requires a secondary recheck to avoid missed signals.)
353      * Note the unusual conventions about Thread.interrupts
354      * surrounding parking and other blocking: Because interrupts are
355      * used solely to alert threads to check termination, which is
356      * checked anyway upon blocking, we clear status (using
357      * Thread.interrupted) before any call to park, so that park does
358      * not immediately return due to status being set via some other
359      * unrelated call to interrupt in user code.
360      *
361      * Signalling.  We create or wake up workers only when there
362      * appears to be at least one task they might be able to find and
363      * execute.  When a submission is added or another worker adds a
364      * task to a queue that has fewer than two tasks, they signal
365      * waiting workers (or trigger creation of new ones if fewer than
366      * the given parallelism level -- signalWork).  These primary
367      * signals are buttressed by others whenever other threads remove
368      * a task from a queue and notice that there are other tasks there
369      * as well.  So in general, pools will be over-signalled. On most
370      * platforms, signalling (unpark) overhead time is noticeably
371      * long, and the time between signalling a thread and it actually
372      * making progress can be very noticeably long, so it is worth
373      * offloading these delays from critical paths as much as
374      * possible. Additionally, workers spin-down gradually, by staying
375      * alive so long as they see the ctl state changing.  Similar
376      * stability-sensing techniques are also used before blocking in
377      * awaitJoin and helpComplete.
378      *
379      * Trimming workers. To release resources after periods of lack of
380      * use, a worker starting to wait when the pool is quiescent will
381      * time out and terminate if the pool has remained quiescent for a
382      * given period -- a short period if there are more threads than
383      * parallelism, longer as the number of threads decreases. This
384      * will slowly propagate, eventually terminating all workers after
385      * periods of non-use.
386      *
387      * Shutdown and Termination. A call to shutdownNow atomically sets
388      * a plock bit and then (non-atomically) sets each worker's
389      * qlock status, cancels all unprocessed tasks, and wakes up
390      * all waiting workers.  Detecting whether termination should
391      * commence after a non-abrupt shutdown() call requires more work
392      * and bookkeeping. We need consensus about quiescence (i.e., that
393      * there is no more work). The active count provides a primary
394      * indication but non-abrupt shutdown still requires a rechecking
395      * scan for any workers that are inactive but not queued.
396      *
397      * Joining Tasks
398      * =============
399      *
400      * Any of several actions may be taken when one worker is waiting
401      * to join a task stolen (or always held) by another.  Because we
402      * are multiplexing many tasks on to a pool of workers, we can't
403      * just let them block (as in Thread.join).  We also cannot just
404      * reassign the joiner's run-time stack with another and replace
405      * it later, which would be a form of "continuation", that even if
406      * possible is not necessarily a good idea since we sometimes need
407      * both an unblocked task and its continuation to progress.
408      * Instead we combine two tactics:
409      *
410      *   Helping: Arranging for the joiner to execute some task that it
411      *      would be running if the steal had not occurred.
412      *
413      *   Compensating: Unless there are already enough live threads,
414      *      method tryCompensate() may create or re-activate a spare
415      *      thread to compensate for blocked joiners until they unblock.
416      *
417      * A third form (implemented in tryRemoveAndExec) amounts to
418      * helping a hypothetical compensator: If we can readily tell that
419      * a possible action of a compensator is to steal and execute the
420      * task being joined, the joining thread can do so directly,
421      * without the need for a compensation thread (although at the
422      * expense of larger run-time stacks, but the tradeoff is
423      * typically worthwhile).
424      *
425      * The ManagedBlocker extension API can't use helping so relies
426      * only on compensation in method awaitBlocker.
427      *
428      * The algorithm in tryHelpStealer entails a form of "linear"
429      * helping: Each worker records (in field currentSteal) the most
430      * recent task it stole from some other worker. Plus, it records
431      * (in field currentJoin) the task it is currently actively
432      * joining. Method tryHelpStealer uses these markers to try to
433      * find a worker to help (i.e., steal back a task from and execute
434      * it) that could hasten completion of the actively joined task.
435      * In essence, the joiner executes a task that would be on its own
436      * local deque had the to-be-joined task not been stolen. This may
437      * be seen as a conservative variant of the approach in Wagner &
438      * Calder "Leapfrogging: a portable technique for implementing
439      * efficient futures" SIGPLAN Notices, 1993
440      * (http://portal.acm.org/citation.cfm?id=155354). It differs in
441      * that: (1) We only maintain dependency links across workers upon
442      * steals, rather than use per-task bookkeeping.  This sometimes
443      * requires a linear scan of workQueues array to locate stealers,
444      * but often doesn't because stealers leave hints (that may become
445      * stale/wrong) of where to locate them.  It is only a hint
446      * because a worker might have had multiple steals and the hint
447      * records only one of them (usually the most current).  Hinting
448      * isolates cost to when it is needed, rather than adding to
449      * per-task overhead.  (2) It is "shallow", ignoring nesting and
450      * potentially cyclic mutual steals.  (3) It is intentionally
451      * racy: field currentJoin is updated only while actively joining,
452      * which means that we miss links in the chain during long-lived
453      * tasks, GC stalls etc (which is OK since blocking in such cases
454      * is usually a good idea).  (4) We bound the number of attempts
455      * to find work (see MAX_HELP) and fall back to suspending the
456      * worker and if necessary replacing it with another.
457      *
458      * Helping actions for CountedCompleters are much simpler: Method
459      * helpComplete can take and execute any task with the same root
460      * as the task being waited on. However, this still entails some
461      * traversal of completer chains, so is less efficient than using
462      * CountedCompleters without explicit joins.
463      *
464      * It is impossible to keep exactly the target parallelism number
465      * of threads running at any given time.  Determining the
466      * existence of conservatively safe helping targets, the
467      * availability of already-created spares, and the apparent need
468      * to create new spares are all racy, so we rely on multiple
469      * retries of each.  Compensation in the apparent absence of
470      * helping opportunities is challenging to control on JVMs, where
471      * GC and other activities can stall progress of tasks that in
472      * turn stall out many other dependent tasks, without us being
473      * able to determine whether they will ever require compensation.
474      * Even though work-stealing otherwise encounters little
475      * degradation in the presence of more threads than cores,
476      * aggressively adding new threads in such cases entails risk of
477      * unwanted positive feedback control loops in which more threads
478      * cause more dependent stalls (as well as delayed progress of
479      * unblocked threads to the point that we know they are available)
480      * leading to more situations requiring more threads, and so
481      * on. This aspect of control can be seen as an (analytically
482      * intractable) game with an opponent that may choose the worst
483      * (for us) active thread to stall at any time.  We take several
484      * precautions to bound losses (and thus bound gains), mainly in
485      * methods tryCompensate and awaitJoin.
486      *
487      * Common Pool
488      * ===========
489      *
490      * The static common pool always exists after static
491      * initialization.  Since it (or any other created pool) need
492      * never be used, we minimize initial construction overhead and
493      * footprint to the setup of about a dozen fields, with no nested
494      * allocation. Most bootstrapping occurs within method
495      * fullExternalPush during the first submission to the pool.
496      *
497      * When external threads submit to the common pool, they can
498      * perform subtask processing (see externalHelpJoin and related
499      * methods).  This caller-helps policy makes it sensible to set
500      * common pool parallelism level to one (or more) less than the
501      * total number of available cores, or even zero for pure
502      * caller-runs.  We do not need to record whether external
503      * submissions are to the common pool -- if not, externalHelpJoin
504      * returns quickly (at the most helping to signal some common pool
505      * workers). These submitters would otherwise be blocked waiting
506      * for completion, so the extra effort (with liberally sprinkled
507      * task status checks) in inapplicable cases amounts to an odd
508      * form of limited spin-wait before blocking in ForkJoinTask.join.
509      *
510      * As a more appropriate default in managed environments, unless
511      * overridden by system properties, we use workers of subclass
512      * InnocuousForkJoinWorkerThread when there is a SecurityManager
513      * present. These workers have no permissions set, do not belong
514      * to any user-defined ThreadGroup, and erase all ThreadLocals
515      * after executing any top-level task (see WorkQueue.runTask). The
516      * associated mechanics (mainly in ForkJoinWorkerThread) may be
517      * JVM-dependent and must access particular Thread class fields to
518      * achieve this effect.
519      *
520      * Style notes
521      * ===========
522      *
523      * There is a lot of representation-level coupling among classes
524      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
525      * fields of WorkQueue maintain data structures managed by
526      * ForkJoinPool, so are directly accessed.  There is little point
527      * trying to reduce this, since any associated future changes in
528      * representations will need to be accompanied by algorithmic
529      * changes anyway. Several methods intrinsically sprawl because
530      * they must accumulate sets of consistent reads of volatiles held
531      * in local variables.  Methods signalWork() and scan() are the
532      * main bottlenecks, so are especially heavily
533      * micro-optimized/mangled.  There are lots of inline assignments
534      * (of form "while ((local = field) != 0)") which are usually the
535      * simplest way to ensure the required read orderings (which are
536      * sometimes critical). This leads to a "C"-like style of listing
537      * declarations of these locals at the heads of methods or blocks.
538      * There are several occurrences of the unusual "do {} while
539      * (!cas...)"  which is the simplest way to force an update of a
540      * CAS'ed variable. There are also other coding oddities (including
541      * several unnecessary-looking hoisted null checks) that help
542      * some methods perform reasonably even when interpreted (not
543      * compiled).
544      *
545      * The order of declarations in this file is:
546      * (1) Static utility functions
547      * (2) Nested (static) classes
548      * (3) Static fields
549      * (4) Fields, along with constants used when unpacking some of them
550      * (5) Internal control methods
551      * (6) Callbacks and other support for ForkJoinTask methods
552      * (7) Exported methods
553      * (8) Static block initializing statics in minimally dependent order
554      */
555 
556     // Static utilities
557 
558     /**
559      * If there is a security manager, makes sure caller has
560      * permission to modify threads.
561      */
562     private static void checkPermission() {
563         SecurityManager security = System.getSecurityManager();
564         if (security != null)
565             security.checkPermission(modifyThreadPermission);
566     }
567 
568     // Nested classes
569 
570     /**
571      * Factory for creating new {@link ForkJoinWorkerThread}s.
572      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
573      * for {@code ForkJoinWorkerThread} subclasses that extend base
574      * functionality or initialize threads with different contexts.
575      */
576     public static interface ForkJoinWorkerThreadFactory {
577         /**
578          * Returns a new worker thread operating in the given pool.
579          *
580          * @param pool the pool this thread works in
581          * @return the new worker thread
582          * @throws NullPointerException if the pool is null
583          */
584         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
585     }
586 
587     /**
588      * Default ForkJoinWorkerThreadFactory implementation; creates a
589      * new ForkJoinWorkerThread.
590      */
591     static final class DefaultForkJoinWorkerThreadFactory
592         implements ForkJoinWorkerThreadFactory {
593         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
594             return new ForkJoinWorkerThread(pool);
595         }
596     }
597 
598     /**
599      * Class for artificial tasks that are used to replace the target
600      * of local joins if they are removed from an interior queue slot
601      * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
602      * actually do anything beyond having a unique identity.
603      */
604     static final class EmptyTask extends ForkJoinTask<Void> {
605         private static final long serialVersionUID = -7721805057305804111L;
606         EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
607         public final Void getRawResult() { return null; }
608         public final void setRawResult(Void x) {}
609         public final boolean exec() { return true; }
610     }
611 
612     /**
613      * Queues supporting work-stealing as well as external task
614      * submission. See above for main rationale and algorithms.
615      * Implementation relies heavily on "Unsafe" intrinsics
616      * and selective use of "volatile":
617      *
618      * Field "base" is the index (mod array.length) of the least valid
619      * queue slot, which is always the next position to steal (poll)
620      * from if nonempty. Reads and writes require volatile orderings
621      * but not CAS, because updates are only performed after slot
622      * CASes.
623      *
624      * Field "top" is the index (mod array.length) of the next queue
625      * slot to push to or pop from. It is written only by owner thread
626      * for push, or under lock for external/shared push, and accessed
627      * by other threads only after reading (volatile) base.  Both top
628      * and base are allowed to wrap around on overflow, but (top -
629      * base) (or more commonly -(base - top) to force volatile read of
630      * base before top) still estimates size. The lock ("qlock") is
631      * forced to -1 on termination, causing all further lock attempts
632      * to fail. (Note: we don't need CAS for termination state because
633      * upon pool shutdown, all shared-queues will stop being used
634      * anyway.)  Nearly all lock bodies are set up so that exceptions
635      * within lock bodies are "impossible" (modulo JVM errors that
636      * would cause failure anyway.)
637      *
638      * The array slots are read and written using the emulation of
639      * volatiles/atomics provided by Unsafe. Insertions must in
640      * general use putOrderedObject as a form of releasing store to
641      * ensure that all writes to the task object are ordered before
642      * its publication in the queue.  All removals entail a CAS to
643      * null.  The array is always a power of two. To ensure safety of
644      * Unsafe array operations, all accesses perform explicit null
645      * checks and implicit bounds checks via power-of-two masking.
646      *
647      * In addition to basic queuing support, this class contains
648      * fields described elsewhere to control execution. It turns out
649      * to work better memory-layout-wise to include them in this class
650      * rather than a separate class.
651      *
652      * Performance on most platforms is very sensitive to placement of
653      * instances of both WorkQueues and their arrays -- we absolutely
654      * do not want multiple WorkQueue instances or multiple queue
655      * arrays sharing cache lines. (It would be best for queue objects
656      * and their arrays to share, but there is nothing available to
657      * help arrange that). The @Contended annotation alerts JVMs to
658      * try to keep instances apart.
659      */
660     @sun.misc.Contended
661     static final class WorkQueue {
662         /**
663          * Capacity of work-stealing queue array upon initialization.
664          * Must be a power of two; at least 4, but should be larger to
665          * reduce or eliminate cacheline sharing among queues.
666          * Currently, it is much larger, as a partial workaround for
667          * the fact that JVMs often place arrays in locations that
668          * share GC bookkeeping (especially cardmarks) such that
669          * per-write accesses encounter serious memory contention.
670          */
671         static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
672 
673         /**
674          * Maximum size for queue arrays. Must be a power of two less
675          * than or equal to 1 << (31 - width of array entry) to ensure
676          * lack of wraparound of index calculations, but defined to a
677          * value a bit less than this to help users trap runaway
678          * programs before saturating systems.
679          */
680         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
681 
682         volatile int eventCount;   // encoded inactivation count; < 0 if inactive
683         int nextWait;              // encoded record of next event waiter
684         int nsteals;               // number of steals
685         int hint;                  // steal index hint
686         short poolIndex;           // index of this queue in pool
687         final short mode;          // 0: lifo, > 0: fifo, < 0: shared
688         volatile int qlock;        // 1: locked, -1: terminate; else 0
689         volatile int base;         // index of next slot for poll
690         int top;                   // index of next slot for push
691         ForkJoinTask<?>[] array;   // the elements (initially unallocated)
692         final ForkJoinPool pool;   // the containing pool (may be null)
693         final ForkJoinWorkerThread owner; // owning thread or null if shared
694         volatile Thread parker;    // == owner during call to park; else null
695         volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
696         ForkJoinTask<?> currentSteal; // current non-local task being executed
697 
698         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
699                   int seed) {
700             this.pool = pool;
701             this.owner = owner;
702             this.mode = (short)mode;
703             this.hint = seed; // store initial seed for runWorker
704             // Place indices in the center of array (that is not yet allocated)
705             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
706         }
707 
708         /**
709          * Returns the approximate number of tasks in the queue.
710          */
711         final int queueSize() {
712             int n = base - top;       // non-owner callers must read base first
713             return (n >= 0) ? 0 : -n; // ignore transient negative
714         }
715 
716         /**
717          * Provides a more accurate estimate of whether this queue has
718          * any tasks than does queueSize, by checking whether a
719          * near-empty queue has at least one unclaimed task.
720          */
721         final boolean isEmpty() {
722             ForkJoinTask<?>[] a; int m, s;
723             int n = base - (s = top);
724             return (n >= 0 ||
725                     (n == -1 &&
726                      ((a = array) == null ||
727                       (m = a.length - 1) < 0 ||
728                       U.getObject
729                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
730         }
731 
732         /**
733          * Pushes a task. Call only by owner in unshared queues.  (The
734          * shared-queue version is embedded in method externalPush.)
735          *
736          * @param task the task. Caller must ensure non-null.
737          * @throws RejectedExecutionException if array cannot be resized
738          */
739         final void push(ForkJoinTask<?> task) {
740             ForkJoinTask<?>[] a; ForkJoinPool p;
741             int s = top, n;
742             if ((a = array) != null) {    // ignore if queue removed
743                 int m = a.length - 1;
744                 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
745                 if ((n = (top = s + 1) - base) <= 2)
746                     (p = pool).signalWork(p.workQueues, this);
747                 else if (n >= m)
748                     growArray();
749             }
750         }
751 
752         /**
753          * Initializes or doubles the capacity of array. Call either
754          * by owner or with lock held -- it is OK for base, but not
755          * top, to move while resizings are in progress.
756          */
757         final ForkJoinTask<?>[] growArray() {
758             ForkJoinTask<?>[] oldA = array;
759             int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
760             if (size > MAXIMUM_QUEUE_CAPACITY)
761                 throw new RejectedExecutionException("Queue capacity exceeded");
762             int oldMask, t, b;
763             ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
764             if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
765                 (t = top) - (b = base) > 0) {
766                 int mask = size - 1;
767                 do {
768                     ForkJoinTask<?> x;
769                     int oldj = ((b & oldMask) << ASHIFT) + ABASE;
770                     int j    = ((b &    mask) << ASHIFT) + ABASE;
771                     x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
772                     if (x != null &&
773                         U.compareAndSwapObject(oldA, oldj, x, null))
774                         U.putObjectVolatile(a, j, x);
775                 } while (++b != t);
776             }
777             return a;
778         }
779 
780         /**
781          * Takes next task, if one exists, in LIFO order.  Call only
782          * by owner in unshared queues.
783          */
784         final ForkJoinTask<?> pop() {
785             ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
786             if ((a = array) != null && (m = a.length - 1) >= 0) {
787                 for (int s; (s = top - 1) - base >= 0;) {
788                     long j = ((m & s) << ASHIFT) + ABASE;
789                     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
790                         break;
791                     if (U.compareAndSwapObject(a, j, t, null)) {
792                         top = s;
793                         return t;
794                     }
795                 }
796             }
797             return null;
798         }
799 
800         /**
801          * Takes a task in FIFO order if b is base of queue and a task
802          * can be claimed without contention. Specialized versions
803          * appear in ForkJoinPool methods scan and tryHelpStealer.
804          */
805         final ForkJoinTask<?> pollAt(int b) {
806             ForkJoinTask<?> t; ForkJoinTask<?>[] a;
807             if ((a = array) != null) {
808                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
809                 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
810                     base == b && U.compareAndSwapObject(a, j, t, null)) {
811                     U.putOrderedInt(this, QBASE, b + 1);
812                     return t;
813                 }
814             }
815             return null;
816         }
817 
818         /**
819          * Takes next task, if one exists, in FIFO order.
820          */
821         final ForkJoinTask<?> poll() {
822             ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
823             while ((b = base) - top < 0 && (a = array) != null) {
824                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
825                 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
826                 if (t != null) {
827                     if (U.compareAndSwapObject(a, j, t, null)) {
828                         U.putOrderedInt(this, QBASE, b + 1);
829                         return t;
830                     }
831                 }
832                 else if (base == b) {
833                     if (b + 1 == top)
834                         break;
835                     Thread.yield(); // wait for lagging update (very rare)
836                 }
837             }
838             return null;
839         }
840 
841         /**
842          * Takes next task, if one exists, in order specified by mode.
843          */
844         final ForkJoinTask<?> nextLocalTask() {
845             return mode == 0 ? pop() : poll();
846         }
847 
848         /**
849          * Returns next task, if one exists, in order specified by mode.
850          */
851         final ForkJoinTask<?> peek() {
852             ForkJoinTask<?>[] a = array; int m;
853             if (a == null || (m = a.length - 1) < 0)
854                 return null;
855             int i = mode == 0 ? top - 1 : base;
856             int j = ((i & m) << ASHIFT) + ABASE;
857             return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
858         }
859 
860         /**
861          * Pops the given task only if it is at the current top.
862          * (A shared version is available only via FJP.tryExternalUnpush)
863          */
864         final boolean tryUnpush(ForkJoinTask<?> t) {
865             ForkJoinTask<?>[] a; int s;
866             if ((a = array) != null && (s = top) != base &&
867                 U.compareAndSwapObject
868                 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
869                 top = s;
870                 return true;
871             }
872             return false;
873         }
874 
875         /**
876          * Removes and cancels all known tasks, ignoring any exceptions.
877          */
878         final void cancelAll() {
879             ForkJoinTask.cancelIgnoringExceptions(currentJoin);
880             ForkJoinTask.cancelIgnoringExceptions(currentSteal);
881             for (ForkJoinTask<?> t; (t = poll()) != null; )
882                 ForkJoinTask.cancelIgnoringExceptions(t);
883         }
884 
885         // Specialized execution methods
886 
887         /**
888          * Polls and runs tasks until empty.
889          */
890         final void pollAndExecAll() {
891             for (ForkJoinTask<?> t; (t = poll()) != null;)
892                 t.doExec();
893         }
894 
895         /**
896          * Executes a top-level task and any local tasks remaining
897          * after execution.
898          */
899         final void runTask(ForkJoinTask<?> task) {
900             if ((currentSteal = task) != null) {
901                 ForkJoinWorkerThread thread;
902                 task.doExec();
903                 ForkJoinTask<?>[] a = array;
904                 int md = mode;
905                 ++nsteals;
906                 currentSteal = null;
907                 if (md != 0)
908                     pollAndExecAll();
909                 else if (a != null) {
910                     int s, m = a.length - 1;
911                     ForkJoinTask<?> t;
912                     while ((s = top - 1) - base >= 0 &&
913                            (t = (ForkJoinTask<?>)U.getAndSetObject
914                             (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
915                         top = s;
916                         t.doExec();
917                     }
918                 }
919                 if ((thread = owner) != null) // no need to do in finally clause
920                     thread.afterTopLevelExec();
921             }
922         }
923 
924         /**
925          * If present, removes from queue and executes the given task,
926          * or any other cancelled task. Returns (true) on any CAS
927          * or consistency check failure so caller can retry.
928          *
929          * @return false if no progress can be made, else true
930          */
931         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
932             boolean stat;
933             ForkJoinTask<?>[] a; int m, s, b, n;
934             if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
935                 (n = (s = top) - (b = base)) > 0) {
936                 boolean removed = false, empty = true;
937                 stat = true;
938                 for (ForkJoinTask<?> t;;) {           // traverse from s to b
939                     long j = ((--s & m) << ASHIFT) + ABASE;
940                     t = (ForkJoinTask<?>)U.getObject(a, j);
941                     if (t == null)                    // inconsistent length
942                         break;
943                     else if (t == task) {
944                         if (s + 1 == top) {           // pop
945                             if (!U.compareAndSwapObject(a, j, task, null))
946                                 break;
947                             top = s;
948                             removed = true;
949                         }
950                         else if (base == b)           // replace with proxy
951                             removed = U.compareAndSwapObject(a, j, task,
952                                                              new EmptyTask());
953                         break;
954                     }
955                     else if (t.status >= 0)
956                         empty = false;
957                     else if (s + 1 == top) {          // pop and throw away
958                         if (U.compareAndSwapObject(a, j, t, null))
959                             top = s;
960                         break;
961                     }
962                     if (--n == 0) {
963                         if (!empty && base == b)
964                             stat = false;
965                         break;
966                     }
967                 }
968                 if (removed)
969                     task.doExec();
970             }
971             else
972                 stat = false;
973             return stat;
974         }
975 
976         /**
977          * Tries to poll for and execute the given task or any other
978          * task in its CountedCompleter computation.
979          */
980         final boolean pollAndExecCC(CountedCompleter<?> root) {
981             ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
982             if ((b = base) - top < 0 && (a = array) != null) {
983                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
984                 if ((o = U.getObjectVolatile(a, j)) == null)
985                     return true; // retry
986                 if (o instanceof CountedCompleter) {
987                     for (t = (CountedCompleter<?>)o, r = t;;) {
988                         if (r == root) {
989                             if (base == b &&
990                                 U.compareAndSwapObject(a, j, t, null)) {
991                                 U.putOrderedInt(this, QBASE, b + 1);
992                                 t.doExec();
993                             }
994                             return true;
995                         }
996                         else if ((r = r.completer) == null)
997                             break; // not part of root computation
998                     }
999                 }
1000             }
1001             return false;
1002         }
1003 
1004         /**
1005          * Tries to pop and execute the given task or any other task
1006          * in its CountedCompleter computation.
1007          */
1008         final boolean externalPopAndExecCC(CountedCompleter<?> root) {
1009             ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
1010             if (base - (s = top) < 0 && (a = array) != null) {
1011                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
1012                 if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
1013                     for (t = (CountedCompleter<?>)o, r = t;;) {
1014                         if (r == root) {
1015                             if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1016                                 if (top == s && array == a &&
1017                                     U.compareAndSwapObject(a, j, t, null)) {
1018                                     top = s - 1;
1019                                     qlock = 0;
1020                                     t.doExec();
1021                                 }
1022                                 else
1023                                     qlock = 0;
1024                             }
1025                             return true;
1026                         }
1027                         else if ((r = r.completer) == null)
1028                             break;
1029                     }
1030                 }
1031             }
1032             return false;
1033         }
1034 
1035         /**
1036          * Internal version
1037          */
1038         final boolean internalPopAndExecCC(CountedCompleter<?> root) {
1039             ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
1040             if (base - (s = top) < 0 && (a = array) != null) {
1041                 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
1042                 if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
1043                     for (t = (CountedCompleter<?>)o, r = t;;) {
1044                         if (r == root) {
1045                             if (U.compareAndSwapObject(a, j, t, null)) {
1046                                 top = s - 1;
1047                                 t.doExec();
1048                             }
1049                             return true;
1050                         }
1051                         else if ((r = r.completer) == null)
1052                             break;
1053                     }
1054                 }
1055             }
1056             return false;
1057         }
1058 
1059         /**
1060          * Returns true if owned and not known to be blocked.
1061          */
1062         final boolean isApparentlyUnblocked() {
1063             Thread wt; Thread.State s;
1064             return (eventCount >= 0 &&
1065                     (wt = owner) != null &&
1066                     (s = wt.getState()) != Thread.State.BLOCKED &&
1067                     s != Thread.State.WAITING &&
1068                     s != Thread.State.TIMED_WAITING);
1069         }
1070 
1071         // Unsafe mechanics
1072         private static final sun.misc.Unsafe U;
1073         private static final long QBASE;
1074         private static final long QLOCK;
1075         private static final int ABASE;
1076         private static final int ASHIFT;
1077         static {
1078             try {
1079                 U = sun.misc.Unsafe.getUnsafe();
1080                 Class<?> k = WorkQueue.class;
1081                 Class<?> ak = ForkJoinTask[].class;
1082                 QBASE = U.objectFieldOffset
1083                     (k.getDeclaredField("base"));
1084                 QLOCK = U.objectFieldOffset
1085                     (k.getDeclaredField("qlock"));
1086                 ABASE = U.arrayBaseOffset(ak);
1087                 int scale = U.arrayIndexScale(ak);
1088                 if ((scale & (scale - 1)) != 0)
1089                     throw new Error("data type scale not a power of two");
1090                 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1091             } catch (Exception e) {
1092                 throw new Error(e);
1093             }
1094         }
1095     }
1096 
1097     // static fields (initialized in static initializer below)
1098 
1099     /**
1100      * Creates a new ForkJoinWorkerThread. This factory is used unless
1101      * overridden in ForkJoinPool constructors.
1102      */
1103     public static final ForkJoinWorkerThreadFactory
1104         defaultForkJoinWorkerThreadFactory;
1105 
1106     /**
1107      * Permission required for callers of methods that may start or
1108      * kill threads.
1109      */
1110     private static final RuntimePermission modifyThreadPermission;
1111 
1112     /**
1113      * Common (static) pool. Non-null for public use unless a static
1114      * construction exception, but internal usages null-check on use
1115      * to paranoically avoid potential initialization circularities
1116      * as well as to simplify generated code.
1117      */
1118     static final ForkJoinPool common;
1119 
1120     /**
1121      * Common pool parallelism. To allow simpler use and management
1122      * when common pool threads are disabled, we allow the underlying
1123      * common.parallelism field to be zero, but in that case still report
1124      * parallelism as 1 to reflect resulting caller-runs mechanics.
1125      */
1126     static final int commonParallelism;
1127 
1128     /**
1129      * Sequence number for creating workerNamePrefix.
1130      */
1131     private static int poolNumberSequence;
1132 
1133     /**
1134      * Returns the next sequence number. We don't expect this to
1135      * ever contend, so use simple builtin sync.
1136      */
1137     private static final synchronized int nextPoolId() {
1138         return ++poolNumberSequence;
1139     }
1140 
1141     // static constants
1142 
1143     /**
1144      * Initial timeout value (in nanoseconds) for the thread
1145      * triggering quiescence to park waiting for new work. On timeout,
1146      * the thread will instead try to shrink the number of
1147      * workers. The value should be large enough to avoid overly
1148      * aggressive shrinkage during most transient stalls (long GCs
1149      * etc).
1150      */
1151     private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
1152 
1153     /**
1154      * Timeout value when there are more threads than parallelism level
1155      */
1156     private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
1157 
1158     /**
1159      * Tolerance for idle timeouts, to cope with timer undershoots
1160      */
1161     private static final long TIMEOUT_SLOP = 2000000L;
1162 
1163     /**
1164      * The maximum stolen->joining link depth allowed in method
1165      * tryHelpStealer.  Must be a power of two.  Depths for legitimate
1166      * chains are unbounded, but we use a fixed constant to avoid
1167      * (otherwise unchecked) cycles and to bound staleness of
1168      * traversal parameters at the expense of sometimes blocking when
1169      * we could be helping.
1170      */
1171     private static final int MAX_HELP = 64;
1172 
1173     /**
1174      * Increment for seed generators. See class ThreadLocal for
1175      * explanation.
1176      */
1177     private static final int SEED_INCREMENT = 0x9e3779b9;
1178 
1179     /*
1180      * Bits and masks for control variables
1181      *
1182      * Field ctl is a long packed with:
1183      * AC: Number of active running workers minus target parallelism (16 bits)
1184      * TC: Number of total workers minus target parallelism (16 bits)
1185      * ST: true if pool is terminating (1 bit)
1186      * EC: the wait count of top waiting thread (15 bits)
1187      * ID: poolIndex of top of Treiber stack of waiters (16 bits)
1188      *
1189      * When convenient, we can extract the upper 32 bits of counts and
1190      * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
1191      * (int)ctl.  The ec field is never accessed alone, but always
1192      * together with id and st. The offsets of counts by the target
1193      * parallelism and the positionings of fields makes it possible to
1194      * perform the most common checks via sign tests of fields: When
1195      * ac is negative, there are not enough active workers, when tc is
1196      * negative, there are not enough total workers, and when e is
1197      * negative, the pool is terminating.  To deal with these possibly
1198      * negative fields, we use casts in and out of "short" and/or
1199      * signed shifts to maintain signedness.
1200      *
1201      * When a thread is queued (inactivated), its eventCount field is
1202      * set negative, which is the only way to tell if a worker is
1203      * prevented from executing tasks, even though it must continue to
1204      * scan for them to avoid queuing races. Note however that
1205      * eventCount updates lag releases so usage requires care.
1206      *
1207      * Field plock is an int packed with:
1208      * SHUTDOWN: true if shutdown is enabled (1 bit)
1209      * SEQ:  a sequence lock, with PL_LOCK bit set if locked (30 bits)
1210      * SIGNAL: set when threads may be waiting on the lock (1 bit)
1211      *
1212      * The sequence number enables simple consistency checks:
1213      * Staleness of read-only operations on the workQueues array can
1214      * be checked by comparing plock before vs after the reads.
1215      */
1216 
1217     // bit positions/shifts for fields
1218     private static final int  AC_SHIFT   = 48;
1219     private static final int  TC_SHIFT   = 32;
1220     private static final int  ST_SHIFT   = 31;
1221     private static final int  EC_SHIFT   = 16;
1222 
1223     // bounds
1224     private static final int  SMASK      = 0xffff;  // short bits
1225     private static final int  MAX_CAP    = 0x7fff;  // max #workers - 1
1226     private static final int  EVENMASK   = 0xfffe;  // even short bits
1227     private static final int  SQMASK     = 0x007e;  // max 64 (even) slots
1228     private static final int  SHORT_SIGN = 1 << 15;
1229     private static final int  INT_SIGN   = 1 << 31;
1230 
1231     // masks
1232     private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
1233     private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
1234     private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
1235 
1236     // units for incrementing and decrementing
1237     private static final long TC_UNIT    = 1L << TC_SHIFT;
1238     private static final long AC_UNIT    = 1L << AC_SHIFT;
1239 
1240     // masks and units for dealing with u = (int)(ctl >>> 32)
1241     private static final int  UAC_SHIFT  = AC_SHIFT - 32;
1242     private static final int  UTC_SHIFT  = TC_SHIFT - 32;
1243     private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
1244     private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
1245     private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
1246     private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
1247 
1248     // masks and units for dealing with e = (int)ctl
1249     private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
1250     private static final int E_SEQ       = 1 << EC_SHIFT;
1251 
1252     // plock bits
1253     private static final int SHUTDOWN    = 1 << 31;
1254     private static final int PL_LOCK     = 2;
1255     private static final int PL_SIGNAL   = 1;
1256     private static final int PL_SPINS    = 1 << 8;
1257 
1258     // access mode for WorkQueue
1259     static final int LIFO_QUEUE          =  0;
1260     static final int FIFO_QUEUE          =  1;
1261     static final int SHARED_QUEUE        = -1;
1262 
1263     // Instance fields
1264     volatile long stealCount;                  // collects worker counts
1265     volatile long ctl;                         // main pool control
1266     volatile int plock;                        // shutdown status and seqLock
1267     volatile int indexSeed;                    // worker/submitter index seed
1268     final short parallelism;                   // parallelism level
1269     final short mode;                          // LIFO/FIFO
1270     WorkQueue[] workQueues;                    // main registry
1271     final ForkJoinWorkerThreadFactory factory;
1272     final UncaughtExceptionHandler ueh;        // per-worker UEH
1273     final String workerNamePrefix;             // to create worker name string
1274 
1275     /**
1276      * Acquires the plock lock to protect worker array and related
1277      * updates. This method is called only if an initial CAS on plock
1278      * fails. This acts as a spinlock for normal cases, but falls back
1279      * to builtin monitor to block when (rarely) needed. This would be
1280      * a terrible idea for a highly contended lock, but works fine as
1281      * a more conservative alternative to a pure spinlock.
1282      */
1283     private int acquirePlock() {
1284         int spins = PL_SPINS, ps, nps;
1285         for (;;) {
1286             if (((ps = plock) & PL_LOCK) == 0 &&
1287                 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1288                 return nps;
1289             else if (spins >= 0) {
1290                 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1291                     --spins;
1292             }
1293             else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
1294                 synchronized (this) {
1295                     if ((plock & PL_SIGNAL) != 0) {
1296                         try {
1297                             wait();
1298                         } catch (InterruptedException ie) {
1299                             try {
1300                                 Thread.currentThread().interrupt();
1301                             } catch (SecurityException ignore) {
1302                             }
1303                         }
1304                     }
1305                     else
1306                         notifyAll();
1307                 }
1308             }
1309         }
1310     }
1311 
1312     /**
1313      * Unlocks and signals any thread waiting for plock. Called only
1314      * when CAS of seq value for unlock fails.
1315      */
1316     private void releasePlock(int ps) {
1317         plock = ps;
1318         synchronized (this) { notifyAll(); }
1319     }
1320 
1321     /**
1322      * Tries to create and start one worker if fewer than target
1323      * parallelism level exist. Adjusts counts etc on failure.
1324      */
1325     private void tryAddWorker() {
1326         long c; int u, e;
1327         while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1328                (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
1329             long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
1330                               ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
1331             if (U.compareAndSwapLong(this, CTL, c, nc)) {
1332                 ForkJoinWorkerThreadFactory fac;
1333                 Throwable ex = null;
1334                 ForkJoinWorkerThread wt = null;
1335                 try {
1336                     if ((fac = factory) != null &&
1337                         (wt = fac.newThread(this)) != null) {
1338                         wt.start();
1339                         break;
1340                     }
1341                 } catch (Throwable rex) {
1342                     ex = rex;
1343                 }
1344                 deregisterWorker(wt, ex);
1345                 break;
1346             }
1347         }
1348     }
1349 
1350     //  Registering and deregistering workers
1351 
1352     /**
1353      * Callback from ForkJoinWorkerThread to establish and record its
1354      * WorkQueue. To avoid scanning bias due to packing entries in
1355      * front of the workQueues array, we treat the array as a simple
1356      * power-of-two hash table using per-thread seed as hash,
1357      * expanding as needed.
1358      *
1359      * @param wt the worker thread
1360      * @return the worker's queue
1361      */
1362     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1363         UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1364         wt.setDaemon(true);
1365         if ((handler = ueh) != null)
1366             wt.setUncaughtExceptionHandler(handler);
1367         do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1368                                           s += SEED_INCREMENT) ||
1369                      s == 0); // skip 0
1370         WorkQueue w = new WorkQueue(this, wt, mode, s);
1371         if (((ps = plock) & PL_LOCK) != 0 ||
1372             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1373             ps = acquirePlock();
1374         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1375         try {
1376             if ((ws = workQueues) != null) {    // skip if shutting down
1377                 int n = ws.length, m = n - 1;
1378                 int r = (s << 1) | 1;           // use odd-numbered indices
1379                 if (ws[r &= m] != null) {       // collision
1380                     int probes = 0;             // step by approx half size
1381                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1382                     while (ws[r = (r + step) & m] != null) {
1383                         if (++probes >= n) {
1384                             workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1385                             m = n - 1;
1386                             probes = 0;
1387                         }
1388                     }
1389                 }
1390                 w.poolIndex = (short)r;
1391                 w.eventCount = r; // volatile write orders
1392                 ws[r] = w;
1393             }
1394         } finally {
1395             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1396                 releasePlock(nps);
1397         }
1398         wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
1399         return w;
1400     }
1401 
1402     /**
1403      * Final callback from terminating worker, as well as upon failure
1404      * to construct or start a worker.  Removes record of worker from
1405      * array, and adjusts counts. If pool is shutting down, tries to
1406      * complete termination.
1407      *
1408      * @param wt the worker thread, or null if construction failed
1409      * @param ex the exception causing failure, or null if none
1410      */
1411     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1412         WorkQueue w = null;
1413         if (wt != null && (w = wt.workQueue) != null) {
1414             int ps;
1415             w.qlock = -1;                // ensure set
1416             U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
1417             if (((ps = plock) & PL_LOCK) != 0 ||
1418                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1419                 ps = acquirePlock();
1420             int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1421             try {
1422                 int idx = w.poolIndex;
1423                 WorkQueue[] ws = workQueues;
1424                 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1425                     ws[idx] = null;
1426             } finally {
1427                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1428                     releasePlock(nps);
1429             }
1430         }
1431 
1432         long c;                          // adjust ctl counts
1433         do {} while (!U.compareAndSwapLong
1434                      (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1435                                            ((c - TC_UNIT) & TC_MASK) |
1436                                            (c & ~(AC_MASK|TC_MASK)))));
1437 
1438         if (!tryTerminate(false, false) && w != null && w.array != null) {
1439             w.cancelAll();               // cancel remaining tasks
1440             WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
1441             while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1442                 if (e > 0) {             // activate or create replacement
1443                     if ((ws = workQueues) == null ||
1444                         (i = e & SMASK) >= ws.length ||
1445                         (v = ws[i]) == null)
1446                         break;
1447                     long nc = (((long)(v.nextWait & E_MASK)) |
1448                                ((long)(u + UAC_UNIT) << 32));
1449                     if (v.eventCount != (e | INT_SIGN))
1450                         break;
1451                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1452                         v.eventCount = (e + E_SEQ) & E_MASK;
1453                         if ((p = v.parker) != null)
1454                             U.unpark(p);
1455                         break;
1456                     }
1457                 }
1458                 else {
1459                     if ((short)u < 0)
1460                         tryAddWorker();
1461                     break;
1462                 }
1463             }
1464         }
1465         if (ex == null)                     // help clean refs on way out
1466             ForkJoinTask.helpExpungeStaleExceptions();
1467         else                                // rethrow
1468             ForkJoinTask.rethrow(ex);
1469     }
1470 
1471     // Submissions
1472 
1473     /**
1474      * Unless shutting down, adds the given task to a submission queue
1475      * at submitter's current queue index (modulo submission
1476      * range). Only the most common path is directly handled in this
1477      * method. All others are relayed to fullExternalPush.
1478      *
1479      * @param task the task. Caller must ensure non-null.
1480      */
1481     final void externalPush(ForkJoinTask<?> task) {
1482         WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
1483         int r = ThreadLocalRandom.getProbe();
1484         int ps = plock;
1485         WorkQueue[] ws = workQueues;
1486         if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
1487             (q = ws[m & r & SQMASK]) != null && r != 0 &&
1488             U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1489             if ((a = q.array) != null &&
1490                 (am = a.length - 1) > (n = (s = q.top) - q.base)) {
1491                 int j = ((am & s) << ASHIFT) + ABASE;
1492                 U.putOrderedObject(a, j, task);
1493                 q.top = s + 1;                     // push on to deque
1494                 q.qlock = 0;
1495                 if (n <= 1)
1496                     signalWork(ws, q);
1497                 return;
1498             }
1499             q.qlock = 0;
1500         }
1501         fullExternalPush(task);
1502     }
1503 
1504     /**
1505      * Full version of externalPush. This method is called, among
1506      * other times, upon the first submission of the first task to the
1507      * pool, so must perform secondary initialization.  It also
1508      * detects first submission by an external thread by looking up
1509      * its ThreadLocal, and creates a new shared queue if the one at
1510      * index if empty or contended. The plock lock body must be
1511      * exception-free (so no try/finally) so we optimistically
1512      * allocate new queues outside the lock and throw them away if
1513      * (very rarely) not needed.
1514      *
1515      * Secondary initialization occurs when plock is zero, to create
1516      * workQueue array and set plock to a valid value.  This lock body
1517      * must also be exception-free. Because the plock seq value can
1518      * eventually wrap around zero, this method harmlessly fails to
1519      * reinitialize if workQueues exists, while still advancing plock.
1520      */
1521     private void fullExternalPush(ForkJoinTask<?> task) {
1522         int r;
1523         if ((r = ThreadLocalRandom.getProbe()) == 0) {
1524             ThreadLocalRandom.localInit();
1525             r = ThreadLocalRandom.getProbe();
1526         }
1527         for (;;) {
1528             WorkQueue[] ws; WorkQueue q; int ps, m, k;
1529             boolean move = false;
1530             if ((ps = plock) < 0)
1531                 throw new RejectedExecutionException();
1532             else if (ps == 0 || (ws = workQueues) == null ||
1533                      (m = ws.length - 1) < 0) { // initialize workQueues
1534                 int p = parallelism;            // find power of two table size
1535                 int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
1536                 n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1537                 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
1538                 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
1539                                    new WorkQueue[n] : null);
1540                 if (((ps = plock) & PL_LOCK) != 0 ||
1541                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1542                     ps = acquirePlock();
1543                 if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1544                     workQueues = nws;
1545                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1546                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1547                     releasePlock(nps);
1548             }
1549             else if ((q = ws[k = r & m & SQMASK]) != null) {
1550                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1551                     ForkJoinTask<?>[] a = q.array;
1552                     int s = q.top;
1553                     boolean submitted = false;
1554                     try {                      // locked version of push
1555                         if ((a != null && a.length > s + 1 - q.base) ||
1556                             (a = q.growArray()) != null) {   // must presize
1557                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1558                             U.putOrderedObject(a, j, task);
1559                             q.top = s + 1;
1560                             submitted = true;
1561                         }
1562                     } finally {
1563                         q.qlock = 0;  // unlock
1564                     }
1565                     if (submitted) {
1566                         signalWork(ws, q);
1567                         return;
1568                     }
1569                 }
1570                 move = true; // move on failure
1571             }
1572             else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1573                 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1574                 q.poolIndex = (short)k;
1575                 if (((ps = plock) & PL_LOCK) != 0 ||
1576                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1577                     ps = acquirePlock();
1578                 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1579                     ws[k] = q;
1580                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1581                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1582                     releasePlock(nps);
1583             }
1584             else
1585                 move = true; // move if busy
1586             if (move)
1587                 r = ThreadLocalRandom.advanceProbe(r);
1588         }
1589     }
1590 
1591     // Maintaining ctl counts
1592 
1593     /**
1594      * Increments active count; mainly called upon return from blocking.
1595      */
1596     final void incrementActiveCount() {
1597         long c;
1598         do {} while (!U.compareAndSwapLong
1599                      (this, CTL, c = ctl, ((c & ~AC_MASK) |
1600                                            ((c & AC_MASK) + AC_UNIT))));
1601     }
1602 
1603     /**
1604      * Tries to create or activate a worker if too few are active.
1605      *
1606      * @param ws the worker array to use to find signallees
1607      * @param q if non-null, the queue holding tasks to be processed
1608      */
1609     final void signalWork(WorkQueue[] ws, WorkQueue q) {
1610         for (;;) {
1611             long c; int e, u, i; WorkQueue w; Thread p;
1612             if ((u = (int)((c = ctl) >>> 32)) >= 0)
1613                 break;
1614             if ((e = (int)c) <= 0) {
1615                 if ((short)u < 0)
1616                     tryAddWorker();
1617                 break;
1618             }
1619             if (ws == null || ws.length <= (i = e & SMASK) ||
1620                 (w = ws[i]) == null)
1621                 break;
1622             long nc = (((long)(w.nextWait & E_MASK)) |
1623                        ((long)(u + UAC_UNIT)) << 32);
1624             int ne = (e + E_SEQ) & E_MASK;
1625             if (w.eventCount == (e | INT_SIGN) &&
1626                 U.compareAndSwapLong(this, CTL, c, nc)) {
1627                 w.eventCount = ne;
1628                 if ((p = w.parker) != null)
1629                     U.unpark(p);
1630                 break;
1631             }
1632             if (q != null && q.base >= q.top)
1633                 break;
1634         }
1635     }
1636 
1637     // Scanning for tasks
1638 
1639     /**
1640      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1641      */
1642     final void runWorker(WorkQueue w) {
1643         w.growArray(); // allocate queue
1644         for (int r = w.hint; scan(w, r) == 0; ) {
1645             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1646         }
1647     }
1648 
1649     /**
1650      * Scans for and, if found, runs one task, else possibly
1651      * inactivates the worker. This method operates on single reads of
1652      * volatile state and is designed to be re-invoked continuously,
1653      * in part because it returns upon detecting inconsistencies,
1654      * contention, or state changes that indicate possible success on
1655      * re-invocation.
1656      *
1657      * The scan searches for tasks across queues starting at a random
1658      * index, checking each at least twice.  The scan terminates upon
1659      * either finding a non-empty queue, or completing the sweep. If
1660      * the worker is not inactivated, it takes and runs a task from
1661      * this queue. Otherwise, if not activated, it tries to activate
1662      * itself or some other worker by signalling. On failure to find a
1663      * task, returns (for retry) if pool state may have changed during
1664      * an empty scan, or tries to inactivate if active, else possibly
1665      * blocks or terminates via method awaitWork.
1666      *
1667      * @param w the worker (via its WorkQueue)
1668      * @param r a random seed
1669      * @return worker qlock status if would have waited, else 0
1670      */
1671     private final int scan(WorkQueue w, int r) {
1672         WorkQueue[] ws; int m;
1673         long c = ctl;                            // for consistency check
1674         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
1675             for (int j = m + m + 1, ec = w.eventCount;;) {
1676                 WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1677                 if ((q = ws[(r - j) & m]) != null &&
1678                     (b = q.base) - q.top < 0 && (a = q.array) != null) {
1679                     long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1680                     if ((t = ((ForkJoinTask<?>)
1681                               U.getObjectVolatile(a, i))) != null) {
1682                         if (ec < 0)
1683                             helpRelease(c, ws, w, q, b);
1684                         else if (q.base == b &&
1685                                  U.compareAndSwapObject(a, i, t, null)) {
1686                             U.putOrderedInt(q, QBASE, b + 1);
1687                             if ((b + 1) - q.top < 0)
1688                                 signalWork(ws, q);
1689                             w.runTask(t);
1690                         }
1691                     }
1692                     break;
1693                 }
1694                 else if (--j < 0) {
1695                     if ((ec | (e = (int)c)) < 0) // inactive or terminating
1696                         return awaitWork(w, c, ec);
1697                     else if (ctl == c) {         // try to inactivate and enqueue
1698                         long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
1699                         w.nextWait = e;
1700                         w.eventCount = ec | INT_SIGN;
1701                         if (!U.compareAndSwapLong(this, CTL, c, nc))
1702                             w.eventCount = ec;   // back out
1703                     }
1704                     break;
1705                 }
1706             }
1707         }
1708         return 0;
1709     }
1710 
1711     /**
1712      * A continuation of scan(), possibly blocking or terminating
1713      * worker w. Returns without blocking if pool state has apparently
1714      * changed since last invocation.  Also, if inactivating w has
1715      * caused the pool to become quiescent, checks for pool
1716      * termination, and, so long as this is not the only worker, waits
1717      * for event for up to a given duration.  On timeout, if ctl has
1718      * not changed, terminates the worker, which will in turn wake up
1719      * another worker to possibly repeat this process.
1720      *
1721      * @param w the calling worker
1722      * @param c the ctl value on entry to scan
1723      * @param ec the worker's eventCount on entry to scan
1724      */
1725     private final int awaitWork(WorkQueue w, long c, int ec) {
1726         int stat, ns; long parkTime, deadline;
1727         if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
1728             !Thread.interrupted()) {
1729             int e = (int)c;
1730             int u = (int)(c >>> 32);
1731             int d = (u >> UAC_SHIFT) + parallelism; // active count
1732 
1733             if (e < 0 || (d <= 0 && tryTerminate(false, false)))
1734                 stat = w.qlock = -1;          // pool is terminating
1735             else if ((ns = w.nsteals) != 0) { // collect steals and retry
1736                 w.nsteals = 0;
1737                 U.getAndAddLong(this, STEALCOUNT, (long)ns);
1738             }
1739             else {
1740                 long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
1741                            ((long)(w.nextWait & E_MASK)) | // ctl to restore
1742                            ((long)(u + UAC_UNIT)) << 32);
1743                 if (pc != 0L) {               // timed wait if last waiter
1744                     int dc = -(short)(c >>> TC_SHIFT);
1745                     parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT:
1746                                 (dc + 1) * IDLE_TIMEOUT);
1747                     deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1748                 }
1749                 else
1750                     parkTime = deadline = 0L;
1751                 if (w.eventCount == ec && ctl == c) {
1752                     Thread wt = Thread.currentThread();
1753                     U.putObject(wt, PARKBLOCKER, this);
1754                     w.parker = wt;            // emulate LockSupport.park
1755                     if (w.eventCount == ec && ctl == c)
1756                         U.park(false, parkTime);  // must recheck before park
1757                     w.parker = null;
1758                     U.putObject(wt, PARKBLOCKER, null);
1759                     if (parkTime != 0L && ctl == c &&
1760                         deadline - System.nanoTime() <= 0L &&
1761                         U.compareAndSwapLong(this, CTL, c, pc))
1762                         stat = w.qlock = -1;  // shrink pool
1763                 }
1764             }
1765         }
1766         return stat;
1767     }
1768 
1769     /**
1770      * Possibly releases (signals) a worker. Called only from scan()
1771      * when a worker with apparently inactive status finds a non-empty
1772      * queue. This requires revalidating all of the associated state
1773      * from caller.
1774      */
1775     private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
1776                                    WorkQueue q, int b) {
1777         WorkQueue v; int e, i; Thread p;
1778         if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
1779             ws != null && ws.length > (i = e & SMASK) &&
1780             (v = ws[i]) != null && ctl == c) {
1781             long nc = (((long)(v.nextWait & E_MASK)) |
1782                        ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
1783             int ne = (e + E_SEQ) & E_MASK;
1784             if (q != null && q.base == b && w.eventCount < 0 &&
1785                 v.eventCount == (e | INT_SIGN) &&
1786                 U.compareAndSwapLong(this, CTL, c, nc)) {
1787                 v.eventCount = ne;
1788                 if ((p = v.parker) != null)
1789                     U.unpark(p);
1790             }
1791         }
1792     }
1793 
1794     /**
1795      * Tries to locate and execute tasks for a stealer of the given
1796      * task, or in turn one of its stealers, Traces currentSteal ->
1797      * currentJoin links looking for a thread working on a descendant
1798      * of the given task and with a non-empty queue to steal back and
1799      * execute tasks from. The first call to this method upon a
1800      * waiting join will often entail scanning/search, (which is OK
1801      * because the joiner has nothing better to do), but this method
1802      * leaves hints in workers to speed up subsequent calls. The
1803      * implementation is very branchy to cope with potential
1804      * inconsistencies or loops encountering chains that are stale,
1805      * unknown, or so long that they are likely cyclic.
1806      *
1807      * @param joiner the joining worker
1808      * @param task the task to join
1809      * @return 0 if no progress can be made, negative if task
1810      * known complete, else positive
1811      */
1812     private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1813         int stat = 0, steps = 0;                    // bound to avoid cycles
1814         if (task != null && joiner != null &&
1815             joiner.base - joiner.top >= 0) {        // hoist checks
1816             restart: for (;;) {
1817                 ForkJoinTask<?> subtask = task;     // current target
1818                 for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1819                     WorkQueue[] ws; int m, s, h;
1820                     if ((s = task.status) < 0) {
1821                         stat = s;
1822                         break restart;
1823                     }
1824                     if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1825                         break restart;              // shutting down
1826                     if ((v = ws[h = (j.hint | 1) & m]) == null ||
1827                         v.currentSteal != subtask) {
1828                         for (int origin = h;;) {    // find stealer
1829                             if (((h = (h + 2) & m) & 15) == 1 &&
1830                                 (subtask.status < 0 || j.currentJoin != subtask))
1831                                 continue restart;   // occasional staleness check
1832                             if ((v = ws[h]) != null &&
1833                                 v.currentSteal == subtask) {
1834                                 j.hint = h;        // save hint
1835                                 break;
1836                             }
1837                             if (h == origin)
1838                                 break restart;      // cannot find stealer
1839                         }
1840                     }
1841                     for (;;) { // help stealer or descend to its stealer
1842                         ForkJoinTask<?>[] a; int b;
1843                         if (subtask.status < 0)     // surround probes with
1844                             continue restart;       //   consistency checks
1845                         if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1846                             int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1847                             ForkJoinTask<?> t =
1848                                 (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1849                             if (subtask.status < 0 || j.currentJoin != subtask ||
1850                                 v.currentSteal != subtask)
1851                                 continue restart;   // stale
1852                             stat = 1;               // apparent progress
1853                             if (v.base == b) {
1854                                 if (t == null)
1855                                     break restart;
1856                                 if (U.compareAndSwapObject(a, i, t, null)) {
1857                                     U.putOrderedInt(v, QBASE, b + 1);
1858                                     ForkJoinTask<?> ps = joiner.currentSteal;
1859                                     int jt = joiner.top;
1860                                     do {
1861                                         joiner.currentSteal = t;
1862                                         t.doExec(); // clear local tasks too
1863                                     } while (task.status >= 0 &&
1864                                              joiner.top != jt &&
1865                                              (t = joiner.pop()) != null);
1866                                     joiner.currentSteal = ps;
1867                                     break restart;
1868                                 }
1869                             }
1870                         }
1871                         else {                      // empty -- try to descend
1872                             ForkJoinTask<?> next = v.currentJoin;
1873                             if (subtask.status < 0 || j.currentJoin != subtask ||
1874                                 v.currentSteal != subtask)
1875                                 continue restart;   // stale
1876                             else if (next == null || ++steps == MAX_HELP)
1877                                 break restart;      // dead-end or maybe cyclic
1878                             else {
1879                                 subtask = next;
1880                                 j = v;
1881                                 break;
1882                             }
1883                         }
1884                     }
1885                 }
1886             }
1887         }
1888         return stat;
1889     }
1890 
1891     /**
1892      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1893      * and run tasks within the target's computation.
1894      *
1895      * @param task the task to join
1896      * @param maxTasks the maximum number of other tasks to run
1897      */
1898     final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
1899                            int maxTasks) {
1900         WorkQueue[] ws; int m;
1901         int s = 0;
1902         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1903             joiner != null && task != null) {
1904             int j = joiner.poolIndex;
1905             int scans = m + m + 1;
1906             long c = 0L;              // for stability check
1907             for (int k = scans; ; j += 2) {
1908                 WorkQueue q;
1909                 if ((s = task.status) < 0)
1910                     break;
1911                 else if (joiner.internalPopAndExecCC(task)) {
1912                     if (--maxTasks <= 0) {
1913                         s = task.status;
1914                         break;
1915                     }
1916                     k = scans;
1917                 }
1918                 else if ((s = task.status) < 0)
1919                     break;
1920                 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1921                     if (--maxTasks <= 0) {
1922                         s = task.status;
1923                         break;
1924                     }
1925                     k = scans;
1926                 }
1927                 else if (--k < 0) {
1928                     if (c == (c = ctl))
1929                         break;
1930                     k = scans;
1931                 }
1932             }
1933         }
1934         return s;
1935     }
1936 
1937     /**
1938      * Tries to decrement active count (sometimes implicitly) and
1939      * possibly release or create a compensating worker in preparation
1940      * for blocking. Fails on contention or termination. Otherwise,
1941      * adds a new thread if no idle workers are available and pool
1942      * may become starved.
1943      *
1944      * @param c the assumed ctl value
1945      */
1946     final boolean tryCompensate(long c) {
1947         WorkQueue[] ws = workQueues;
1948         int pc = parallelism, e = (int)c, m, tc;
1949         if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
1950             WorkQueue w = ws[e & m];
1951             if (e != 0 && w != null) {
1952                 Thread p;
1953                 long nc = ((long)(w.nextWait & E_MASK) |
1954                            (c & (AC_MASK|TC_MASK)));
1955                 int ne = (e + E_SEQ) & E_MASK;
1956                 if (w.eventCount == (e | INT_SIGN) &&
1957                     U.compareAndSwapLong(this, CTL, c, nc)) {
1958                     w.eventCount = ne;
1959                     if ((p = w.parker) != null)
1960                         U.unpark(p);
1961                     return true;   // replace with idle worker
1962                 }
1963             }
1964             else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
1965                      (int)(c >> AC_SHIFT) + pc > 1) {
1966                 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
1967                 if (U.compareAndSwapLong(this, CTL, c, nc))
1968                     return true;   // no compensation
1969             }
1970             else if (tc + pc < MAX_CAP) {
1971                 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1972                 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1973                     ForkJoinWorkerThreadFactory fac;
1974                     Throwable ex = null;
1975                     ForkJoinWorkerThread wt = null;
1976                     try {
1977                         if ((fac = factory) != null &&
1978                             (wt = fac.newThread(this)) != null) {
1979                             wt.start();
1980                             return true;
1981                         }
1982                     } catch (Throwable rex) {
1983                         ex = rex;
1984                     }
1985                     deregisterWorker(wt, ex); // clean up and return false
1986                 }
1987             }
1988         }
1989         return false;
1990     }
1991 
1992     /**
1993      * Helps and/or blocks until the given task is done.
1994      *
1995      * @param joiner the joining worker
1996      * @param task the task
1997      * @return task status on exit
1998      */
1999     final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
2000         int s = 0;
2001         if (task != null && (s = task.status) >= 0 && joiner != null) {
2002             ForkJoinTask<?> prevJoin = joiner.currentJoin;
2003             joiner.currentJoin = task;
2004             do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
2005                          (s = task.status) >= 0);
2006             if (s >= 0 && (task instanceof CountedCompleter))
2007                 s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
2008             long cc = 0;        // for stability checks
2009             while (s >= 0 && (s = task.status) >= 0) {
2010                 if ((s = tryHelpStealer(joiner, task)) == 0 &&
2011                     (s = task.status) >= 0) {
2012                     if (!tryCompensate(cc))
2013                         cc = ctl;
2014                     else {
2015                         if (task.trySetSignal() && (s = task.status) >= 0) {
2016                             synchronized (task) {
2017                                 if (task.status >= 0) {
2018                                     try {                // see ForkJoinTask
2019                                         task.wait();     //  for explanation
2020                                     } catch (InterruptedException ie) {
2021                                     }
2022                                 }
2023                                 else
2024                                     task.notifyAll();
2025                             }
2026                         }
2027                         long c; // reactivate
2028                         do {} while (!U.compareAndSwapLong
2029                                      (this, CTL, c = ctl,
2030                                       ((c & ~AC_MASK) |
2031                                        ((c & AC_MASK) + AC_UNIT))));
2032                     }
2033                 }
2034             }
2035             joiner.currentJoin = prevJoin;
2036         }
2037         return s;
2038     }
2039 
2040     /**
2041      * Stripped-down variant of awaitJoin used by timed joins. Tries
2042      * to help join only while there is continuous progress. (Caller
2043      * will then enter a timed wait.)
2044      *
2045      * @param joiner the joining worker
2046      * @param task the task
2047      */
2048     final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
2049         int s;
2050         if (joiner != null && task != null && (s = task.status) >= 0) {
2051             ForkJoinTask<?> prevJoin = joiner.currentJoin;
2052             joiner.currentJoin = task;
2053             do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
2054                          (s = task.status) >= 0);
2055             if (s >= 0) {
2056                 if (task instanceof CountedCompleter)
2057                     helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
2058                 do {} while (task.status >= 0 &&
2059                              tryHelpStealer(joiner, task) > 0);
2060             }
2061             joiner.currentJoin = prevJoin;
2062         }
2063     }
2064 
2065     /**
2066      * Returns a (probably) non-empty steal queue, if one is found
2067      * during a scan, else null.  This method must be retried by
2068      * caller if, by the time it tries to use the queue, it is empty.
2069      */
2070     private WorkQueue findNonEmptyStealQueue() {
2071         int r = ThreadLocalRandom.nextSecondarySeed();
2072         for (;;) {
2073             int ps = plock, m; WorkQueue[] ws; WorkQueue q;
2074             if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
2075                 for (int j = (m + 1) << 2; j >= 0; --j) {
2076                     if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
2077                         q.base - q.top < 0)
2078                         return q;
2079                 }
2080             }
2081             if (plock == ps)
2082                 return null;
2083         }
2084     }
2085 
2086     /**
2087      * Runs tasks until {@code isQuiescent()}. We piggyback on
2088      * active count ctl maintenance, but rather than blocking
2089      * when tasks cannot be found, we rescan until all others cannot
2090      * find tasks either.
2091      */
2092     final void helpQuiescePool(WorkQueue w) {
2093         ForkJoinTask<?> ps = w.currentSteal;
2094         for (boolean active = true;;) {
2095             long c; WorkQueue q; ForkJoinTask<?> t; int b;
2096             while ((t = w.nextLocalTask()) != null)
2097                 t.doExec();
2098             if ((q = findNonEmptyStealQueue()) != null) {
2099                 if (!active) {      // re-establish active count
2100                     active = true;
2101                     do {} while (!U.compareAndSwapLong
2102                                  (this, CTL, c = ctl,
2103                                   ((c & ~AC_MASK) |
2104                                    ((c & AC_MASK) + AC_UNIT))));
2105                 }
2106                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2107                     w.runTask(t);
2108             }
2109             else if (active) {      // decrement active count without queuing
2110                 long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
2111                 if ((int)(nc >> AC_SHIFT) + parallelism == 0)
2112                     break;          // bypass decrement-then-increment
2113                 if (U.compareAndSwapLong(this, CTL, c, nc))
2114                     active = false;
2115             }
2116             else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
2117                      U.compareAndSwapLong
2118                      (this, CTL, c, ((c & ~AC_MASK) |
2119                                      ((c & AC_MASK) + AC_UNIT))))
2120                 break;
2121         }
2122     }
2123 
2124     /**
2125      * Gets and removes a local or stolen task for the given worker.
2126      *
2127      * @return a task, if available
2128      */
2129     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2130         for (ForkJoinTask<?> t;;) {
2131             WorkQueue q; int b;
2132             if ((t = w.nextLocalTask()) != null)
2133                 return t;
2134             if ((q = findNonEmptyStealQueue()) == null)
2135                 return null;
2136             if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2137                 return t;
2138         }
2139     }
2140 
2141     /**
2142      * Returns a cheap heuristic guide for task partitioning when
2143      * programmers, frameworks, tools, or languages have little or no
2144      * idea about task granularity.  In essence by offering this
2145      * method, we ask users only about tradeoffs in overhead vs
2146      * expected throughput and its variance, rather than how finely to
2147      * partition tasks.
2148      *
2149      * In a steady state strict (tree-structured) computation, each
2150      * thread makes available for stealing enough tasks for other
2151      * threads to remain active. Inductively, if all threads play by
2152      * the same rules, each thread should make available only a
2153      * constant number of tasks.
2154      *
2155      * The minimum useful constant is just 1. But using a value of 1
2156      * would require immediate replenishment upon each steal to
2157      * maintain enough tasks, which is infeasible.  Further,
2158      * partitionings/granularities of offered tasks should minimize
2159      * steal rates, which in general means that threads nearer the top
2160      * of computation tree should generate more than those nearer the
2161      * bottom. In perfect steady state, each thread is at
2162      * approximately the same level of computation tree. However,
2163      * producing extra tasks amortizes the uncertainty of progress and
2164      * diffusion assumptions.
2165      *
2166      * So, users will want to use values larger (but not much larger)
2167      * than 1 to both smooth over transient shortages and hedge
2168      * against uneven progress; as traded off against the cost of
2169      * extra task overhead. We leave the user to pick a threshold
2170      * value to compare with the results of this call to guide
2171      * decisions, but recommend values such as 3.
2172      *
2173      * When all threads are active, it is on average OK to estimate
2174      * surplus strictly locally. In steady-state, if one thread is
2175      * maintaining say 2 surplus tasks, then so are others. So we can
2176      * just use estimated queue length.  However, this strategy alone
2177      * leads to serious mis-estimates in some non-steady-state
2178      * conditions (ramp-up, ramp-down, other stalls). We can detect
2179      * many of these by further considering the number of "idle"
2180      * threads, that are known to have zero queued tasks, so
2181      * compensate by a factor of (#idle/#active) threads.
2182      *
2183      * Note: The approximation of #busy workers as #active workers is
2184      * not very good under current signalling scheme, and should be
2185      * improved.
2186      */
2187     static int getSurplusQueuedTaskCount() {
2188         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2189         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2190             int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
2191             int n = (q = wt.workQueue).top - q.base;
2192             int a = (int)(pool.ctl >> AC_SHIFT) + p;
2193             return n - (a > (p >>>= 1) ? 0 :
2194                         a > (p >>>= 1) ? 1 :
2195                         a > (p >>>= 1) ? 2 :
2196                         a > (p >>>= 1) ? 4 :
2197                         8);
2198         }
2199         return 0;
2200     }
2201 
2202     //  Termination
2203 
2204     /**
2205      * Possibly initiates and/or completes termination.  The caller
2206      * triggering termination runs three passes through workQueues:
2207      * (0) Setting termination status, followed by wakeups of queued
2208      * workers; (1) cancelling all tasks; (2) interrupting lagging
2209      * threads (likely in external tasks, but possibly also blocked in
2210      * joins).  Each pass repeats previous steps because of potential
2211      * lagging thread creation.
2212      *
2213      * @param now if true, unconditionally terminate, else only
2214      * if no work and no active workers
2215      * @param enable if true, enable shutdown when next possible
2216      * @return true if now terminating or terminated
2217      */
2218     private boolean tryTerminate(boolean now, boolean enable) {
2219         int ps;
2220         if (this == common)                        // cannot shut down
2221             return false;
2222         if ((ps = plock) >= 0) {                   // enable by setting plock
2223             if (!enable)
2224                 return false;
2225             if ((ps & PL_LOCK) != 0 ||
2226                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2227                 ps = acquirePlock();
2228             int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
2229             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
2230                 releasePlock(nps);
2231         }
2232         for (long c;;) {
2233             if (((c = ctl) & STOP_BIT) != 0) {     // already terminating
2234                 if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
2235                     synchronized (this) {
2236                         notifyAll();               // signal when 0 workers
2237                     }
2238                 }
2239                 return true;
2240             }
2241             if (!now) {                            // check if idle & no tasks
2242                 WorkQueue[] ws; WorkQueue w;
2243                 if ((int)(c >> AC_SHIFT) + parallelism > 0)
2244                     return false;
2245                 if ((ws = workQueues) != null) {
2246                     for (int i = 0; i < ws.length; ++i) {
2247                         if ((w = ws[i]) != null &&
2248                             (!w.isEmpty() ||
2249                              ((i & 1) != 0 && w.eventCount >= 0))) {
2250                             signalWork(ws, w);
2251                             return false;
2252                         }
2253                     }
2254                 }
2255             }
2256             if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
2257                 for (int pass = 0; pass < 3; ++pass) {
2258                     WorkQueue[] ws; WorkQueue w; Thread wt;
2259                     if ((ws = workQueues) != null) {
2260                         int n = ws.length;
2261                         for (int i = 0; i < n; ++i) {
2262                             if ((w = ws[i]) != null) {
2263                                 w.qlock = -1;
2264                                 if (pass > 0) {
2265                                     w.cancelAll();
2266                                     if (pass > 1 && (wt = w.owner) != null) {
2267                                         if (!wt.isInterrupted()) {
2268                                             try {
2269                                                 wt.interrupt();
2270                                             } catch (Throwable ignore) {
2271                                             }
2272                                         }
2273                                         U.unpark(wt);
2274                                     }
2275                                 }
2276                             }
2277                         }
2278                         // Wake up workers parked on event queue
2279                         int i, e; long cc; Thread p;
2280                         while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
2281                                (i = e & SMASK) < n && i >= 0 &&
2282                                (w = ws[i]) != null) {
2283                             long nc = ((long)(w.nextWait & E_MASK) |
2284                                        ((cc + AC_UNIT) & AC_MASK) |
2285                                        (cc & (TC_MASK|STOP_BIT)));
2286                             if (w.eventCount == (e | INT_SIGN) &&
2287                                 U.compareAndSwapLong(this, CTL, cc, nc)) {
2288                                 w.eventCount = (e + E_SEQ) & E_MASK;
2289                                 w.qlock = -1;
2290                                 if ((p = w.parker) != null)
2291                                     U.unpark(p);
2292                             }
2293                         }
2294                     }
2295                 }
2296             }
2297         }
2298     }
2299 
2300     // external operations on common pool
2301 
2302     /**
2303      * Returns common pool queue for a thread that has submitted at
2304      * least one task.
2305      */
2306     static WorkQueue commonSubmitterQueue() {
2307         ForkJoinPool p; WorkQueue[] ws; int m, z;
2308         return ((z = ThreadLocalRandom.getProbe()) != 0 &&
2309                 (p = common) != null &&
2310                 (ws = p.workQueues) != null &&
2311                 (m = ws.length - 1) >= 0) ?
2312             ws[m & z & SQMASK] : null;
2313     }
2314 
2315     /**
2316      * Tries to pop the given task from submitter's queue in common pool.
2317      */
2318     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
2319         WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
2320         WorkQueue[] ws = workQueues;
2321         int z = ThreadLocalRandom.getProbe();
2322         boolean popped = false;
2323         if (ws != null && (m = ws.length - 1) >= 0 &&
2324             (joiner = ws[z & m & SQMASK]) != null &&
2325             joiner.base != (s = joiner.top) &&
2326             (a = joiner.array) != null) {
2327             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
2328             if (U.getObject(a, j) == task &&
2329                 U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
2330                 if (joiner.top == s && joiner.array == a &&
2331                     U.compareAndSwapObject(a, j, task, null)) {
2332                     joiner.top = s - 1;
2333                     popped = true;
2334                 }
2335                 joiner.qlock = 0;
2336             }
2337         }
2338         return popped;
2339     }
2340 
2341     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
2342         WorkQueue joiner; int m;
2343         WorkQueue[] ws = workQueues;
2344         int j = ThreadLocalRandom.getProbe();
2345         int s = 0;
2346         if (ws != null && (m = ws.length - 1) >= 0 &&
2347             (joiner = ws[j & m & SQMASK]) != null && task != null) {
2348             int scans = m + m + 1;
2349             long c = 0L;             // for stability check
2350             j |= 1;                  // poll odd queues
2351             for (int k = scans; ; j += 2) {
2352                 WorkQueue q;
2353                 if ((s = task.status) < 0)
2354                     break;
2355                 else if (joiner.externalPopAndExecCC(task)) {
2356                     if (--maxTasks <= 0) {
2357                         s = task.status;
2358                         break;
2359                     }
2360                     k = scans;
2361                 }
2362                 else if ((s = task.status) < 0)
2363                     break;
2364                 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
2365                     if (--maxTasks <= 0) {
2366                         s = task.status;
2367                         break;
2368                     }
2369                     k = scans;
2370                 }
2371                 else if (--k < 0) {
2372                     if (c == (c = ctl))
2373                         break;
2374                     k = scans;
2375                 }
2376             }
2377         }
2378         return s;
2379     }
2380 
2381     // Exported methods
2382 
2383     // Constructors
2384 
2385     /**
2386      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2387      * java.lang.Runtime#availableProcessors}, using the {@linkplain
2388      * #defaultForkJoinWorkerThreadFactory default thread factory},
2389      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2390      *
2391      * @throws SecurityException if a security manager exists and
2392      *         the caller is not permitted to modify threads
2393      *         because it does not hold {@link
2394      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2395      */
2396     public ForkJoinPool() {
2397         this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2398              defaultForkJoinWorkerThreadFactory, null, false);
2399     }
2400 
2401     /**
2402      * Creates a {@code ForkJoinPool} with the indicated parallelism
2403      * level, the {@linkplain
2404      * #defaultForkJoinWorkerThreadFactory default thread factory},
2405      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2406      *
2407      * @param parallelism the parallelism level
2408      * @throws IllegalArgumentException if parallelism less than or
2409      *         equal to zero, or greater than implementation limit
2410      * @throws SecurityException if a security manager exists and
2411      *         the caller is not permitted to modify threads
2412      *         because it does not hold {@link
2413      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2414      */
2415     public ForkJoinPool(int parallelism) {
2416         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2417     }
2418 
2419     /**
2420      * Creates a {@code ForkJoinPool} with the given parameters.
2421      *
2422      * @param parallelism the parallelism level. For default value,
2423      * use {@link java.lang.Runtime#availableProcessors}.
2424      * @param factory the factory for creating new threads. For default value,
2425      * use {@link #defaultForkJoinWorkerThreadFactory}.
2426      * @param handler the handler for internal worker threads that
2427      * terminate due to unrecoverable errors encountered while executing
2428      * tasks. For default value, use {@code null}.
2429      * @param asyncMode if true,
2430      * establishes local first-in-first-out scheduling mode for forked
2431      * tasks that are never joined. This mode may be more appropriate
2432      * than default locally stack-based mode in applications in which
2433      * worker threads only process event-style asynchronous tasks.
2434      * For default value, use {@code false}.
2435      * @throws IllegalArgumentException if parallelism less than or
2436      *         equal to zero, or greater than implementation limit
2437      * @throws NullPointerException if the factory is null
2438      * @throws SecurityException if a security manager exists and
2439      *         the caller is not permitted to modify threads
2440      *         because it does not hold {@link
2441      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2442      */
2443     public ForkJoinPool(int parallelism,
2444                         ForkJoinWorkerThreadFactory factory,
2445                         UncaughtExceptionHandler handler,
2446                         boolean asyncMode) {
2447         this(checkParallelism(parallelism),
2448              checkFactory(factory),
2449              handler,
2450              (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
2451              "ForkJoinPool-" + nextPoolId() + "-worker-");
2452         checkPermission();
2453     }
2454 
2455     private static int checkParallelism(int parallelism) {
2456         if (parallelism <= 0 || parallelism > MAX_CAP)
2457             throw new IllegalArgumentException();
2458         return parallelism;
2459     }
2460 
2461     private static ForkJoinWorkerThreadFactory checkFactory
2462         (ForkJoinWorkerThreadFactory factory) {
2463         if (factory == null)
2464             throw new NullPointerException();
2465         return factory;
2466     }
2467 
2468     /**
2469      * Creates a {@code ForkJoinPool} with the given parameters, without
2470      * any security checks or parameter validation.  Invoked directly by
2471      * makeCommonPool.
2472      */
2473     private ForkJoinPool(int parallelism,
2474                          ForkJoinWorkerThreadFactory factory,
2475                          UncaughtExceptionHandler handler,
2476                          int mode,
2477                          String workerNamePrefix) {
2478         this.workerNamePrefix = workerNamePrefix;
2479         this.factory = factory;
2480         this.ueh = handler;
2481         this.mode = (short)mode;
2482         this.parallelism = (short)parallelism;
2483         long np = (long)(-parallelism); // offset ctl counts
2484         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2485     }
2486 
2487     /**
2488      * Returns the common pool instance. This pool is statically
2489      * constructed; its run state is unaffected by attempts to {@link
2490      * #shutdown} or {@link #shutdownNow}. However this pool and any
2491      * ongoing processing are automatically terminated upon program
2492      * {@link System#exit}.  Any program that relies on asynchronous
2493      * task processing to complete before program termination should
2494      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2495      * before exit.
2496      *
2497      * @return the common pool instance
2498      * @since 1.8
2499      */
2500     public static ForkJoinPool commonPool() {
2501         // assert common != null : "static init error";
2502         return common;
2503     }
2504 
2505     // Execution methods
2506 
2507     /**
2508      * Performs the given task, returning its result upon completion.
2509      * If the computation encounters an unchecked Exception or Error,
2510      * it is rethrown as the outcome of this invocation.  Rethrown
2511      * exceptions behave in the same way as regular exceptions, but,
2512      * when possible, contain stack traces (as displayed for example
2513      * using {@code ex.printStackTrace()}) of both the current thread
2514      * as well as the thread actually encountering the exception;
2515      * minimally only the latter.
2516      *
2517      * @param task the task
2518      * @param <T> the type of the task's result
2519      * @return the task's result
2520      * @throws NullPointerException if the task is null
2521      * @throws RejectedExecutionException if the task cannot be
2522      *         scheduled for execution
2523      */
2524     public <T> T invoke(ForkJoinTask<T> task) {
2525         if (task == null)
2526             throw new NullPointerException();
2527         externalPush(task);
2528         return task.join();
2529     }
2530 
2531     /**
2532      * Arranges for (asynchronous) execution of the given task.
2533      *
2534      * @param task the task
2535      * @throws NullPointerException if the task is null
2536      * @throws RejectedExecutionException if the task cannot be
2537      *         scheduled for execution
2538      */
2539     public void execute(ForkJoinTask<?> task) {
2540         if (task == null)
2541             throw new NullPointerException();
2542         externalPush(task);
2543     }
2544 
2545     // AbstractExecutorService methods
2546 
2547     /**
2548      * @throws NullPointerException if the task is null
2549      * @throws RejectedExecutionException if the task cannot be
2550      *         scheduled for execution
2551      */
2552     public void execute(Runnable task) {
2553         if (task == null)
2554             throw new NullPointerException();
2555         ForkJoinTask<?> job;
2556         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2557             job = (ForkJoinTask<?>) task;
2558         else
2559             job = new ForkJoinTask.RunnableExecuteAction(task);
2560         externalPush(job);
2561     }
2562 
2563     /**
2564      * Submits a ForkJoinTask for execution.
2565      *
2566      * @param task the task to submit
2567      * @param <T> the type of the task's result
2568      * @return the task
2569      * @throws NullPointerException if the task is null
2570      * @throws RejectedExecutionException if the task cannot be
2571      *         scheduled for execution
2572      */
2573     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2574         if (task == null)
2575             throw new NullPointerException();
2576         externalPush(task);
2577         return task;
2578     }
2579 
2580     /**
2581      * @throws NullPointerException if the task is null
2582      * @throws RejectedExecutionException if the task cannot be
2583      *         scheduled for execution
2584      */
2585     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2586         ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
2587         externalPush(job);
2588         return job;
2589     }
2590 
2591     /**
2592      * @throws NullPointerException if the task is null
2593      * @throws RejectedExecutionException if the task cannot be
2594      *         scheduled for execution
2595      */
2596     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2597         ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
2598         externalPush(job);
2599         return job;
2600     }
2601 
2602     /**
2603      * @throws NullPointerException if the task is null
2604      * @throws RejectedExecutionException if the task cannot be
2605      *         scheduled for execution
2606      */
2607     public ForkJoinTask<?> submit(Runnable task) {
2608         if (task == null)
2609             throw new NullPointerException();
2610         ForkJoinTask<?> job;
2611         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2612             job = (ForkJoinTask<?>) task;
2613         else
2614             job = new ForkJoinTask.AdaptedRunnableAction(task);
2615         externalPush(job);
2616         return job;
2617     }
2618 
2619     /**
2620      * @throws NullPointerException       {@inheritDoc}
2621      * @throws RejectedExecutionException {@inheritDoc}
2622      */
2623     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2624         // In previous versions of this class, this method constructed
2625         // a task to run ForkJoinTask.invokeAll, but now external
2626         // invocation of multiple tasks is at least as efficient.
2627         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
2628 
2629         boolean done = false;
2630         try {
2631             for (Callable<T> t : tasks) {
2632                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2633                 futures.add(f);
2634                 externalPush(f);
2635             }
2636             for (int i = 0, size = futures.size(); i < size; i++)
2637                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2638             done = true;
2639             return futures;
2640         } finally {
2641             if (!done)
2642                 for (int i = 0, size = futures.size(); i < size; i++)
2643                     futures.get(i).cancel(false);
2644         }
2645     }
2646 
2647     /**
2648      * Returns the factory used for constructing new workers.
2649      *
2650      * @return the factory used for constructing new workers
2651      */
2652     public ForkJoinWorkerThreadFactory getFactory() {
2653         return factory;
2654     }
2655 
2656     /**
2657      * Returns the handler for internal worker threads that terminate
2658      * due to unrecoverable errors encountered while executing tasks.
2659      *
2660      * @return the handler, or {@code null} if none
2661      */
2662     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2663         return ueh;
2664     }
2665 
2666     /**
2667      * Returns the targeted parallelism level of this pool.
2668      *
2669      * @return the targeted parallelism level of this pool
2670      */
2671     public int getParallelism() {
2672         int par;
2673         return ((par = parallelism) > 0) ? par : 1;
2674     }
2675 
2676     /**
2677      * Returns the targeted parallelism level of the common pool.
2678      *
2679      * @return the targeted parallelism level of the common pool
2680      * @since 1.8
2681      */
2682     public static int getCommonPoolParallelism() {
2683         return commonParallelism;
2684     }
2685 
2686     /**
2687      * Returns the number of worker threads that have started but not
2688      * yet terminated.  The result returned by this method may differ
2689      * from {@link #getParallelism} when threads are created to
2690      * maintain parallelism when others are cooperatively blocked.
2691      *
2692      * @return the number of worker threads
2693      */
2694     public int getPoolSize() {
2695         return parallelism + (short)(ctl >>> TC_SHIFT);
2696     }
2697 
2698     /**
2699      * Returns {@code true} if this pool uses local first-in-first-out
2700      * scheduling mode for forked tasks that are never joined.
2701      *
2702      * @return {@code true} if this pool uses async mode
2703      */
2704     public boolean getAsyncMode() {
2705         return mode == FIFO_QUEUE;
2706     }
2707 
2708     /**
2709      * Returns an estimate of the number of worker threads that are
2710      * not blocked waiting to join tasks or for other managed
2711      * synchronization. This method may overestimate the
2712      * number of running threads.
2713      *
2714      * @return the number of worker threads
2715      */
2716     public int getRunningThreadCount() {
2717         int rc = 0;
2718         WorkQueue[] ws; WorkQueue w;
2719         if ((ws = workQueues) != null) {
2720             for (int i = 1; i < ws.length; i += 2) {
2721                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2722                     ++rc;
2723             }
2724         }
2725         return rc;
2726     }
2727 
2728     /**
2729      * Returns an estimate of the number of threads that are currently
2730      * stealing or executing tasks. This method may overestimate the
2731      * number of active threads.
2732      *
2733      * @return the number of active threads
2734      */
2735     public int getActiveThreadCount() {
2736         int r = parallelism + (int)(ctl >> AC_SHIFT);
2737         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2738     }
2739 
2740     /**
2741      * Returns {@code true} if all worker threads are currently idle.
2742      * An idle worker is one that cannot obtain a task to execute
2743      * because none are available to steal from other threads, and
2744      * there are no pending submissions to the pool. This method is
2745      * conservative; it might not return {@code true} immediately upon
2746      * idleness of all threads, but will eventually become true if
2747      * threads remain inactive.
2748      *
2749      * @return {@code true} if all threads are currently idle
2750      */
2751     public boolean isQuiescent() {
2752         return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
2753     }
2754 
2755     /**
2756      * Returns an estimate of the total number of tasks stolen from
2757      * one thread's work queue by another. The reported value
2758      * underestimates the actual total number of steals when the pool
2759      * is not quiescent. This value may be useful for monitoring and
2760      * tuning fork/join programs: in general, steal counts should be
2761      * high enough to keep threads busy, but low enough to avoid
2762      * overhead and contention across threads.
2763      *
2764      * @return the number of steals
2765      */
2766     public long getStealCount() {
2767         long count = stealCount;
2768         WorkQueue[] ws; WorkQueue w;
2769         if ((ws = workQueues) != null) {
2770             for (int i = 1; i < ws.length; i += 2) {
2771                 if ((w = ws[i]) != null)
2772                     count += w.nsteals;
2773             }
2774         }
2775         return count;
2776     }
2777 
2778     /**
2779      * Returns an estimate of the total number of tasks currently held
2780      * in queues by worker threads (but not including tasks submitted
2781      * to the pool that have not begun executing). This value is only
2782      * an approximation, obtained by iterating across all threads in
2783      * the pool. This method may be useful for tuning task
2784      * granularities.
2785      *
2786      * @return the number of queued tasks
2787      */
2788     public long getQueuedTaskCount() {
2789         long count = 0;
2790         WorkQueue[] ws; WorkQueue w;
2791         if ((ws = workQueues) != null) {
2792             for (int i = 1; i < ws.length; i += 2) {
2793                 if ((w = ws[i]) != null)
2794                     count += w.queueSize();
2795             }
2796         }
2797         return count;
2798     }
2799 
2800     /**
2801      * Returns an estimate of the number of tasks submitted to this
2802      * pool that have not yet begun executing.  This method may take
2803      * time proportional to the number of submissions.
2804      *
2805      * @return the number of queued submissions
2806      */
2807     public int getQueuedSubmissionCount() {
2808         int count = 0;
2809         WorkQueue[] ws; WorkQueue w;
2810         if ((ws = workQueues) != null) {
2811             for (int i = 0; i < ws.length; i += 2) {
2812                 if ((w = ws[i]) != null)
2813                     count += w.queueSize();
2814             }
2815         }
2816         return count;
2817     }
2818 
2819     /**
2820      * Returns {@code true} if there are any tasks submitted to this
2821      * pool that have not yet begun executing.
2822      *
2823      * @return {@code true} if there are any queued submissions
2824      */
2825     public boolean hasQueuedSubmissions() {
2826         WorkQueue[] ws; WorkQueue w;
2827         if ((ws = workQueues) != null) {
2828             for (int i = 0; i < ws.length; i += 2) {
2829                 if ((w = ws[i]) != null && !w.isEmpty())
2830                     return true;
2831             }
2832         }
2833         return false;
2834     }
2835 
2836     /**
2837      * Removes and returns the next unexecuted submission if one is
2838      * available.  This method may be useful in extensions to this
2839      * class that re-assign work in systems with multiple pools.
2840      *
2841      * @return the next submission, or {@code null} if none
2842      */
2843     protected ForkJoinTask<?> pollSubmission() {
2844         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2845         if ((ws = workQueues) != null) {
2846             for (int i = 0; i < ws.length; i += 2) {
2847                 if ((w = ws[i]) != null && (t = w.poll()) != null)
2848                     return t;
2849             }
2850         }
2851         return null;
2852     }
2853 
2854     /**
2855      * Removes all available unexecuted submitted and forked tasks
2856      * from scheduling queues and adds them to the given collection,
2857      * without altering their execution status. These may include
2858      * artificially generated or wrapped tasks. This method is
2859      * designed to be invoked only when the pool is known to be
2860      * quiescent. Invocations at other times may not remove all
2861      * tasks. A failure encountered while attempting to add elements
2862      * to collection {@code c} may result in elements being in
2863      * neither, either or both collections when the associated
2864      * exception is thrown.  The behavior of this operation is
2865      * undefined if the specified collection is modified while the
2866      * operation is in progress.
2867      *
2868      * @param c the collection to transfer elements into
2869      * @return the number of elements transferred
2870      */
2871     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2872         int count = 0;
2873         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2874         if ((ws = workQueues) != null) {
2875             for (int i = 0; i < ws.length; ++i) {
2876                 if ((w = ws[i]) != null) {
2877                     while ((t = w.poll()) != null) {
2878                         c.add(t);
2879                         ++count;
2880                     }
2881                 }
2882             }
2883         }
2884         return count;
2885     }
2886 
2887     /**
2888      * Returns a string identifying this pool, as well as its state,
2889      * including indications of run state, parallelism level, and
2890      * worker and task counts.
2891      *
2892      * @return a string identifying this pool, as well as its state
2893      */
2894     public String toString() {
2895         // Use a single pass through workQueues to collect counts
2896         long qt = 0L, qs = 0L; int rc = 0;
2897         long st = stealCount;
2898         long c = ctl;
2899         WorkQueue[] ws; WorkQueue w;
2900         if ((ws = workQueues) != null) {
2901             for (int i = 0; i < ws.length; ++i) {
2902                 if ((w = ws[i]) != null) {
2903                     int size = w.queueSize();
2904                     if ((i & 1) == 0)
2905                         qs += size;
2906                     else {
2907                         qt += size;
2908                         st += w.nsteals;
2909                         if (w.isApparentlyUnblocked())
2910                             ++rc;
2911                     }
2912                 }
2913             }
2914         }
2915         int pc = parallelism;
2916         int tc = pc + (short)(c >>> TC_SHIFT);
2917         int ac = pc + (int)(c >> AC_SHIFT);
2918         if (ac < 0) // ignore transient negative
2919             ac = 0;
2920         String level;
2921         if ((c & STOP_BIT) != 0)
2922             level = (tc == 0) ? "Terminated" : "Terminating";
2923         else
2924             level = plock < 0 ? "Shutting down" : "Running";
2925         return super.toString() +
2926             "[" + level +
2927             ", parallelism = " + pc +
2928             ", size = " + tc +
2929             ", active = " + ac +
2930             ", running = " + rc +
2931             ", steals = " + st +
2932             ", tasks = " + qt +
2933             ", submissions = " + qs +
2934             "]";
2935     }
2936 
2937     /**
2938      * Possibly initiates an orderly shutdown in which previously
2939      * submitted tasks are executed, but no new tasks will be
2940      * accepted. Invocation has no effect on execution state if this
2941      * is the {@link #commonPool()}, and no additional effect if
2942      * already shut down.  Tasks that are in the process of being
2943      * submitted concurrently during the course of this method may or
2944      * may not be rejected.
2945      *
2946      * @throws SecurityException if a security manager exists and
2947      *         the caller is not permitted to modify threads
2948      *         because it does not hold {@link
2949      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2950      */
2951     public void shutdown() {
2952         checkPermission();
2953         tryTerminate(false, true);
2954     }
2955 
2956     /**
2957      * Possibly attempts to cancel and/or stop all tasks, and reject
2958      * all subsequently submitted tasks.  Invocation has no effect on
2959      * execution state if this is the {@link #commonPool()}, and no
2960      * additional effect if already shut down. Otherwise, tasks that
2961      * are in the process of being submitted or executed concurrently
2962      * during the course of this method may or may not be
2963      * rejected. This method cancels both existing and unexecuted
2964      * tasks, in order to permit termination in the presence of task
2965      * dependencies. So the method always returns an empty list
2966      * (unlike the case for some other Executors).
2967      *
2968      * @return an empty list
2969      * @throws SecurityException if a security manager exists and
2970      *         the caller is not permitted to modify threads
2971      *         because it does not hold {@link
2972      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2973      */
2974     public List<Runnable> shutdownNow() {
2975         checkPermission();
2976         tryTerminate(true, true);
2977         return Collections.emptyList();
2978     }
2979 
2980     /**
2981      * Returns {@code true} if all tasks have completed following shut down.
2982      *
2983      * @return {@code true} if all tasks have completed following shut down
2984      */
2985     public boolean isTerminated() {
2986         long c = ctl;
2987         return ((c & STOP_BIT) != 0L &&
2988                 (short)(c >>> TC_SHIFT) + parallelism <= 0);
2989     }
2990 
2991     /**
2992      * Returns {@code true} if the process of termination has
2993      * commenced but not yet completed.  This method may be useful for
2994      * debugging. A return of {@code true} reported a sufficient
2995      * period after shutdown may indicate that submitted tasks have
2996      * ignored or suppressed interruption, or are waiting for I/O,
2997      * causing this executor not to properly terminate. (See the
2998      * advisory notes for class {@link ForkJoinTask} stating that
2999      * tasks should not normally entail blocking operations.  But if
3000      * they do, they must abort them on interrupt.)
3001      *
3002      * @return {@code true} if terminating but not yet terminated
3003      */
3004     public boolean isTerminating() {
3005         long c = ctl;
3006         return ((c & STOP_BIT) != 0L &&
3007                 (short)(c >>> TC_SHIFT) + parallelism > 0);
3008     }
3009 
3010     /**
3011      * Returns {@code true} if this pool has been shut down.
3012      *
3013      * @return {@code true} if this pool has been shut down
3014      */
3015     public boolean isShutdown() {
3016         return plock < 0;
3017     }
3018 
3019     /**
3020      * Blocks until all tasks have completed execution after a
3021      * shutdown request, or the timeout occurs, or the current thread
3022      * is interrupted, whichever happens first. Because the {@link
3023      * #commonPool()} never terminates until program shutdown, when
3024      * applied to the common pool, this method is equivalent to {@link
3025      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3026      *
3027      * @param timeout the maximum time to wait
3028      * @param unit the time unit of the timeout argument
3029      * @return {@code true} if this executor terminated and
3030      *         {@code false} if the timeout elapsed before termination
3031      * @throws InterruptedException if interrupted while waiting
3032      */
3033     public boolean awaitTermination(long timeout, TimeUnit unit)
3034         throws InterruptedException {
3035         if (Thread.interrupted())
3036             throw new InterruptedException();
3037         if (this == common) {
3038             awaitQuiescence(timeout, unit);
3039             return false;
3040         }
3041         long nanos = unit.toNanos(timeout);
3042         if (isTerminated())
3043             return true;
3044         if (nanos <= 0L)
3045             return false;
3046         long deadline = System.nanoTime() + nanos;
3047         synchronized (this) {
3048             for (;;) {
3049                 if (isTerminated())
3050                     return true;
3051                 if (nanos <= 0L)
3052                     return false;
3053                 long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
3054                 wait(millis > 0L ? millis : 1L);
3055                 nanos = deadline - System.nanoTime();
3056             }
3057         }
3058     }
3059 
3060     /**
3061      * If called by a ForkJoinTask operating in this pool, equivalent
3062      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3063      * waits and/or attempts to assist performing tasks until this
3064      * pool {@link #isQuiescent} or the indicated timeout elapses.
3065      *
3066      * @param timeout the maximum time to wait
3067      * @param unit the time unit of the timeout argument
3068      * @return {@code true} if quiescent; {@code false} if the
3069      * timeout elapsed.
3070      */
3071     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3072         long nanos = unit.toNanos(timeout);
3073         ForkJoinWorkerThread wt;
3074         Thread thread = Thread.currentThread();
3075         if ((thread instanceof ForkJoinWorkerThread) &&
3076             (wt = (ForkJoinWorkerThread)thread).pool == this) {
3077             helpQuiescePool(wt.workQueue);
3078             return true;
3079         }
3080         long startTime = System.nanoTime();
3081         WorkQueue[] ws;
3082         int r = 0, m;
3083         boolean found = true;
3084         while (!isQuiescent() && (ws = workQueues) != null &&
3085                (m = ws.length - 1) >= 0) {
3086             if (!found) {
3087                 if ((System.nanoTime() - startTime) > nanos)
3088                     return false;
3089                 Thread.yield(); // cannot block
3090             }
3091             found = false;
3092             for (int j = (m + 1) << 2; j >= 0; --j) {
3093                 ForkJoinTask<?> t; WorkQueue q; int b;
3094                 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3095                     found = true;
3096                     if ((t = q.pollAt(b)) != null)
3097                         t.doExec();
3098                     break;
3099                 }
3100             }
3101         }
3102         return true;
3103     }
3104 
3105     /**
3106      * Waits and/or attempts to assist performing tasks indefinitely
3107      * until the {@link #commonPool()} {@link #isQuiescent}.
3108      */
3109     static void quiesceCommonPool() {
3110         common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3111     }
3112 
3113     /**
3114      * Interface for extending managed parallelism for tasks running
3115      * in {@link ForkJoinPool}s.
3116      *
3117      * <p>A {@code ManagedBlocker} provides two methods.  Method
3118      * {@code isReleasable} must return {@code true} if blocking is
3119      * not necessary. Method {@code block} blocks the current thread
3120      * if necessary (perhaps internally invoking {@code isReleasable}
3121      * before actually blocking). These actions are performed by any
3122      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3123      * The unusual methods in this API accommodate synchronizers that
3124      * may, but don't usually, block for long periods. Similarly, they
3125      * allow more efficient internal handling of cases in which
3126      * additional workers may be, but usually are not, needed to
3127      * ensure sufficient parallelism.  Toward this end,
3128      * implementations of method {@code isReleasable} must be amenable
3129      * to repeated invocation.
3130      *
3131      * <p>For example, here is a ManagedBlocker based on a
3132      * ReentrantLock:
3133      *  <pre> {@code
3134      * class ManagedLocker implements ManagedBlocker {
3135      *   final ReentrantLock lock;
3136      *   boolean hasLock = false;
3137      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3138      *   public boolean block() {
3139      *     if (!hasLock)
3140      *       lock.lock();
3141      *     return true;
3142      *   }
3143      *   public boolean isReleasable() {
3144      *     return hasLock || (hasLock = lock.tryLock());
3145      *   }
3146      * }}</pre>
3147      *
3148      * <p>Here is a class that possibly blocks waiting for an
3149      * item on a given queue:
3150      *  <pre> {@code
3151      * class QueueTaker<E> implements ManagedBlocker {
3152      *   final BlockingQueue<E> queue;
3153      *   volatile E item = null;
3154      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3155      *   public boolean block() throws InterruptedException {
3156      *     if (item == null)
3157      *       item = queue.take();
3158      *     return true;
3159      *   }
3160      *   public boolean isReleasable() {
3161      *     return item != null || (item = queue.poll()) != null;
3162      *   }
3163      *   public E getItem() { // call after pool.managedBlock completes
3164      *     return item;
3165      *   }
3166      * }}</pre>
3167      */
3168     public static interface ManagedBlocker {
3169         /**
3170          * Possibly blocks the current thread, for example waiting for
3171          * a lock or condition.
3172          *
3173          * @return {@code true} if no additional blocking is necessary
3174          * (i.e., if isReleasable would return true)
3175          * @throws InterruptedException if interrupted while waiting
3176          * (the method is not required to do so, but is allowed to)
3177          */
3178         boolean block() throws InterruptedException;
3179 
3180         /**
3181          * Returns {@code true} if blocking is unnecessary.
3182          * @return {@code true} if blocking is unnecessary
3183          */
3184         boolean isReleasable();
3185     }
3186 
3187     /**
3188      * Blocks in accord with the given blocker.  If the current thread
3189      * is a {@link ForkJoinWorkerThread}, this method possibly
3190      * arranges for a spare thread to be activated if necessary to
3191      * ensure sufficient parallelism while the current thread is blocked.
3192      *
3193      * <p>If the caller is not a {@link ForkJoinTask}, this method is
3194      * behaviorally equivalent to
3195      *  <pre> {@code
3196      * while (!blocker.isReleasable())
3197      *   if (blocker.block())
3198      *     return;
3199      * }</pre>
3200      *
3201      * If the caller is a {@code ForkJoinTask}, then the pool may
3202      * first be expanded to ensure parallelism, and later adjusted.
3203      *
3204      * @param blocker the blocker
3205      * @throws InterruptedException if blocker.block did so
3206      */
3207     public static void managedBlock(ManagedBlocker blocker)
3208         throws InterruptedException {
3209         Thread t = Thread.currentThread();
3210         if (t instanceof ForkJoinWorkerThread) {
3211             ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
3212             while (!blocker.isReleasable()) {
3213                 if (p.tryCompensate(p.ctl)) {
3214                     try {
3215                         do {} while (!blocker.isReleasable() &&
3216                                      !blocker.block());
3217                     } finally {
3218                         p.incrementActiveCount();
3219                     }
3220                     break;
3221                 }
3222             }
3223         }
3224         else {
3225             do {} while (!blocker.isReleasable() &&
3226                          !blocker.block());
3227         }
3228     }
3229 
3230     // AbstractExecutorService overrides.  These rely on undocumented
3231     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3232     // implement RunnableFuture.
3233 
3234     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3235         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3236     }
3237 
3238     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3239         return new ForkJoinTask.AdaptedCallable<T>(callable);
3240     }
3241 
3242     // Unsafe mechanics
3243     private static final sun.misc.Unsafe U;
3244     private static final long CTL;
3245     private static final long PARKBLOCKER;
3246     private static final int ABASE;
3247     private static final int ASHIFT;
3248     private static final long STEALCOUNT;
3249     private static final long PLOCK;
3250     private static final long INDEXSEED;
3251     private static final long QBASE;
3252     private static final long QLOCK;
3253 
3254     static {
3255         // initialize field offsets for CAS etc
3256         try {
3257             U = sun.misc.Unsafe.getUnsafe();
3258             Class<?> k = ForkJoinPool.class;
3259             CTL = U.objectFieldOffset
3260                 (k.getDeclaredField("ctl"));
3261             STEALCOUNT = U.objectFieldOffset
3262                 (k.getDeclaredField("stealCount"));
3263             PLOCK = U.objectFieldOffset
3264                 (k.getDeclaredField("plock"));
3265             INDEXSEED = U.objectFieldOffset
3266                 (k.getDeclaredField("indexSeed"));
3267             Class<?> tk = Thread.class;
3268             PARKBLOCKER = U.objectFieldOffset
3269                 (tk.getDeclaredField("parkBlocker"));
3270             Class<?> wk = WorkQueue.class;
3271             QBASE = U.objectFieldOffset
3272                 (wk.getDeclaredField("base"));
3273             QLOCK = U.objectFieldOffset
3274                 (wk.getDeclaredField("qlock"));
3275             Class<?> ak = ForkJoinTask[].class;
3276             ABASE = U.arrayBaseOffset(ak);
3277             int scale = U.arrayIndexScale(ak);
3278             if ((scale & (scale - 1)) != 0)
3279                 throw new Error("data type scale not a power of two");
3280             ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
3281         } catch (Exception e) {
3282             throw new Error(e);
3283         }
3284 
3285         defaultForkJoinWorkerThreadFactory =
3286             new DefaultForkJoinWorkerThreadFactory();
3287         modifyThreadPermission = new RuntimePermission("modifyThread");
3288 
3289         common = java.security.AccessController.doPrivileged
3290             (new java.security.PrivilegedAction<ForkJoinPool>() {
3291                 public ForkJoinPool run() { return makeCommonPool(); }});
3292         int par = common.parallelism; // report 1 even if threads disabled
3293         commonParallelism = par > 0 ? par : 1;
3294     }
3295 
3296     /**
3297      * Creates and returns the common pool, respecting user settings
3298      * specified via system properties.
3299      */
3300     private static ForkJoinPool makeCommonPool() {
3301         int parallelism = -1;
3302         ForkJoinWorkerThreadFactory factory = null;
3303         UncaughtExceptionHandler handler = null;
3304         try {  // ignore exceptions in accessing/parsing properties
3305             String pp = System.getProperty
3306                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3307             String fp = System.getProperty
3308                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3309             String hp = System.getProperty
3310                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3311             if (pp != null)
3312                 parallelism = Integer.parseInt(pp);
3313             if (fp != null)
3314                 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3315                            getSystemClassLoader().loadClass(fp).newInstance());
3316             if (hp != null)
3317                 handler = ((UncaughtExceptionHandler)ClassLoader.
3318                            getSystemClassLoader().loadClass(hp).newInstance());
3319         } catch (Exception ignore) {
3320         }
3321         if (factory == null) {
3322             if (System.getSecurityManager() == null)
3323                 factory = defaultForkJoinWorkerThreadFactory;
3324             else // use security-managed default
3325                 factory = new InnocuousForkJoinWorkerThreadFactory();
3326         }
3327         if (parallelism < 0 && // default 1 less than #cores
3328             (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
3329             parallelism = 1;
3330         if (parallelism > MAX_CAP)
3331             parallelism = MAX_CAP;
3332         return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3333                                 "ForkJoinPool.commonPool-worker-");
3334     }
3335 
3336     /**
3337      * Factory for innocuous worker threads
3338      */
3339     static final class InnocuousForkJoinWorkerThreadFactory
3340         implements ForkJoinWorkerThreadFactory {
3341 
3342         /**
3343          * An ACC to restrict permissions for the factory itself.
3344          * The constructed workers have no permissions set.
3345          */
3346         private static final AccessControlContext innocuousAcc;
3347         static {
3348             Permissions innocuousPerms = new Permissions();
3349             innocuousPerms.add(modifyThreadPermission);
3350             innocuousPerms.add(new RuntimePermission(
3351                                    "enableContextClassLoaderOverride"));
3352             innocuousPerms.add(new RuntimePermission(
3353                                    "modifyThreadGroup"));
3354             innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
3355                     new ProtectionDomain(null, innocuousPerms)
3356                 });
3357         }
3358 
3359         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3360             return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
3361                 java.security.AccessController.doPrivileged(
3362                     new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
3363                     public ForkJoinWorkerThread run() {
3364                         return new ForkJoinWorkerThread.
3365                             InnocuousForkJoinWorkerThread(pool);
3366                     }}, innocuousAcc);
3367         }
3368     }
3369 
3370 }