View Javadoc
1   /*
2    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3    *
4    * This code is free software; you can redistribute it and/or modify it
5    * under the terms of the GNU General Public License version 2 only, as
6    * published by the Free Software Foundation.  Oracle designates this
7    * particular file as subject to the "Classpath" exception as provided
8    * by Oracle in the LICENSE file that accompanied this code.
9    *
10   * This code is distributed in the hope that it will be useful, but WITHOUT
11   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13   * version 2 for more details (a copy is included in the LICENSE file that
14   * accompanied this code).
15   *
16   * You should have received a copy of the GNU General Public License version
17   * 2 along with this work; if not, write to the Free Software Foundation,
18   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19   *
20   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21   * or visit www.oracle.com if you need additional information or have any
22   * questions.
23   */
24  
25  /*
26   * This file is available under and governed by the GNU General Public
27   * License version 2 only, as published by the Free Software Foundation.
28   * However, the following notice accompanied the original version of this
29   * file:
30   *
31   * Written by Doug Lea with assistance from members of JCP JSR-166
32   * Expert Group and released to the public domain, as explained at
33   * http://creativecommons.org/publicdomain/zero/1.0/
34   */
35  
36  package java.util.concurrent;
37  import static java.util.concurrent.TimeUnit.NANOSECONDS;
38  import java.util.concurrent.atomic.AtomicLong;
39  import java.util.concurrent.locks.Condition;
40  import java.util.concurrent.locks.ReentrantLock;
41  import java.util.*;
42  
43  /**
44   * A {@link ThreadPoolExecutor} that can additionally schedule
45   * commands to run after a given delay, or to execute
46   * periodically. This class is preferable to {@link java.util.Timer}
47   * when multiple worker threads are needed, or when the additional
48   * flexibility or capabilities of {@link ThreadPoolExecutor} (which
49   * this class extends) are required.
50   *
51   * <p>Delayed tasks execute no sooner than they are enabled, but
52   * without any real-time guarantees about when, after they are
53   * enabled, they will commence. Tasks scheduled for exactly the same
54   * execution time are enabled in first-in-first-out (FIFO) order of
55   * submission.
56   *
57   * <p>When a submitted task is cancelled before it is run, execution
58   * is suppressed. By default, such a cancelled task is not
59   * automatically removed from the work queue until its delay
60   * elapses. While this enables further inspection and monitoring, it
61   * may also cause unbounded retention of cancelled tasks. To avoid
62   * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
63   * causes tasks to be immediately removed from the work queue at
64   * time of cancellation.
65   *
66   * <p>Successive executions of a task scheduled via
67   * {@code scheduleAtFixedRate} or
68   * {@code scheduleWithFixedDelay} do not overlap. While different
69   * executions may be performed by different threads, the effects of
70   * prior executions <a
71   * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
72   * those of subsequent ones.
73   *
74   * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
75   * of the inherited tuning methods are not useful for it. In
76   * particular, because it acts as a fixed-sized pool using
77   * {@code corePoolSize} threads and an unbounded queue, adjustments
78   * to {@code maximumPoolSize} have no useful effect. Additionally, it
79   * is almost never a good idea to set {@code corePoolSize} to zero or
80   * use {@code allowCoreThreadTimeOut} because this may leave the pool
81   * without threads to handle tasks once they become eligible to run.
82   *
83   * <p><b>Extension notes:</b> This class overrides the
84   * {@link ThreadPoolExecutor#execute(Runnable) execute} and
85   * {@link AbstractExecutorService#submit(Runnable) submit}
86   * methods to generate internal {@link ScheduledFuture} objects to
87   * control per-task delays and scheduling.  To preserve
88   * functionality, any further overrides of these methods in
89   * subclasses must invoke superclass versions, which effectively
90   * disables additional task customization.  However, this class
91   * provides alternative protected extension method
92   * {@code decorateTask} (one version each for {@code Runnable} and
93   * {@code Callable}) that can be used to customize the concrete task
94   * types used to execute commands entered via {@code execute},
95   * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
96   * and {@code scheduleWithFixedDelay}.  By default, a
97   * {@code ScheduledThreadPoolExecutor} uses a task type extending
98   * {@link FutureTask}. However, this may be modified or replaced using
99   * subclasses of the form:
100  *
101  *  <pre> {@code
102  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
103  *
104  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
105  *
106  *   protected <V> RunnableScheduledFuture<V> decorateTask(
107  *                Runnable r, RunnableScheduledFuture<V> task) {
108  *       return new CustomTask<V>(r, task);
109  *   }
110  *
111  *   protected <V> RunnableScheduledFuture<V> decorateTask(
112  *                Callable<V> c, RunnableScheduledFuture<V> task) {
113  *       return new CustomTask<V>(c, task);
114  *   }
115  *   // ... add constructors, etc.
116  * }}</pre>
117  *
118  * @since 1.5
119  * @author Doug Lea
120  */
121 public class ScheduledThreadPoolExecutor
122         extends ThreadPoolExecutor
123         implements ScheduledExecutorService {
124 
125     /*
126      * This class specializes ThreadPoolExecutor implementation by
127      *
128      * 1. Using a custom task type, ScheduledFutureTask for
129      *    tasks, even those that don't require scheduling (i.e.,
130      *    those submitted using ExecutorService execute, not
131      *    ScheduledExecutorService methods) which are treated as
132      *    delayed tasks with a delay of zero.
133      *
134      * 2. Using a custom queue (DelayedWorkQueue), a variant of
135      *    unbounded DelayQueue. The lack of capacity constraint and
136      *    the fact that corePoolSize and maximumPoolSize are
137      *    effectively identical simplifies some execution mechanics
138      *    (see delayedExecute) compared to ThreadPoolExecutor.
139      *
140      * 3. Supporting optional run-after-shutdown parameters, which
141      *    leads to overrides of shutdown methods to remove and cancel
142      *    tasks that should NOT be run after shutdown, as well as
143      *    different recheck logic when task (re)submission overlaps
144      *    with a shutdown.
145      *
146      * 4. Task decoration methods to allow interception and
147      *    instrumentation, which are needed because subclasses cannot
148      *    otherwise override submit methods to get this effect. These
149      *    don't have any impact on pool control logic though.
150      */
151 
152     /**
153      * False if should cancel/suppress periodic tasks on shutdown.
154      */
155     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
156 
157     /**
158      * False if should cancel non-periodic tasks on shutdown.
159      */
160     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
161 
162     /**
163      * True if ScheduledFutureTask.cancel should remove from queue
164      */
165     private volatile boolean removeOnCancel = false;
166 
167     /**
168      * Sequence number to break scheduling ties, and in turn to
169      * guarantee FIFO order among tied entries.
170      */
171     private static final AtomicLong sequencer = new AtomicLong();
172 
173     /**
174      * Returns current nanosecond time.
175      */
176     final long now() {
177         return System.nanoTime();
178     }
179 
180     private class ScheduledFutureTask<V>
181             extends FutureTask<V> implements RunnableScheduledFuture<V> {
182 
183         /** Sequence number to break ties FIFO */
184         private final long sequenceNumber;
185 
186         /** The time the task is enabled to execute in nanoTime units */
187         private long time;
188 
189         /**
190          * Period in nanoseconds for repeating tasks.  A positive
191          * value indicates fixed-rate execution.  A negative value
192          * indicates fixed-delay execution.  A value of 0 indicates a
193          * non-repeating task.
194          */
195         private final long period;
196 
197         /** The actual task to be re-enqueued by reExecutePeriodic */
198         RunnableScheduledFuture<V> outerTask = this;
199 
200         /**
201          * Index into delay queue, to support faster cancellation.
202          */
203         int heapIndex;
204 
205         /**
206          * Creates a one-shot action with given nanoTime-based trigger time.
207          */
208         ScheduledFutureTask(Runnable r, V result, long ns) {
209             super(r, result);
210             this.time = ns;
211             this.period = 0;
212             this.sequenceNumber = sequencer.getAndIncrement();
213         }
214 
215         /**
216          * Creates a periodic action with given nano time and period.
217          */
218         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
219             super(r, result);
220             this.time = ns;
221             this.period = period;
222             this.sequenceNumber = sequencer.getAndIncrement();
223         }
224 
225         /**
226          * Creates a one-shot action with given nanoTime-based trigger time.
227          */
228         ScheduledFutureTask(Callable<V> callable, long ns) {
229             super(callable);
230             this.time = ns;
231             this.period = 0;
232             this.sequenceNumber = sequencer.getAndIncrement();
233         }
234 
235         public long getDelay(TimeUnit unit) {
236             return unit.convert(time - now(), NANOSECONDS);
237         }
238 
239         public int compareTo(Delayed other) {
240             if (other == this) // compare zero if same object
241                 return 0;
242             if (other instanceof ScheduledFutureTask) {
243                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
244                 long diff = time - x.time;
245                 if (diff < 0)
246                     return -1;
247                 else if (diff > 0)
248                     return 1;
249                 else if (sequenceNumber < x.sequenceNumber)
250                     return -1;
251                 else
252                     return 1;
253             }
254             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
255             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
256         }
257 
258         /**
259          * Returns {@code true} if this is a periodic (not a one-shot) action.
260          *
261          * @return {@code true} if periodic
262          */
263         public boolean isPeriodic() {
264             return period != 0;
265         }
266 
267         /**
268          * Sets the next time to run for a periodic task.
269          */
270         private void setNextRunTime() {
271             long p = period;
272             if (p > 0)
273                 time += p;
274             else
275                 time = triggerTime(-p);
276         }
277 
278         public boolean cancel(boolean mayInterruptIfRunning) {
279             boolean cancelled = super.cancel(mayInterruptIfRunning);
280             if (cancelled && removeOnCancel && heapIndex >= 0)
281                 remove(this);
282             return cancelled;
283         }
284 
285         /**
286          * Overrides FutureTask version so as to reset/requeue if periodic.
287          */
288         public void run() {
289             boolean periodic = isPeriodic();
290             if (!canRunInCurrentRunState(periodic))
291                 cancel(false);
292             else if (!periodic)
293                 ScheduledFutureTask.super.run();
294             else if (ScheduledFutureTask.super.runAndReset()) {
295                 setNextRunTime();
296                 reExecutePeriodic(outerTask);
297             }
298         }
299     }
300 
301     /**
302      * Returns true if can run a task given current run state
303      * and run-after-shutdown parameters.
304      *
305      * @param periodic true if this task periodic, false if delayed
306      */
307     boolean canRunInCurrentRunState(boolean periodic) {
308         return isRunningOrShutdown(periodic ?
309                                    continueExistingPeriodicTasksAfterShutdown :
310                                    executeExistingDelayedTasksAfterShutdown);
311     }
312 
313     /**
314      * Main execution method for delayed or periodic tasks.  If pool
315      * is shut down, rejects the task. Otherwise adds task to queue
316      * and starts a thread, if necessary, to run it.  (We cannot
317      * prestart the thread to run the task because the task (probably)
318      * shouldn't be run yet.)  If the pool is shut down while the task
319      * is being added, cancel and remove it if required by state and
320      * run-after-shutdown parameters.
321      *
322      * @param task the task
323      */
324     private void delayedExecute(RunnableScheduledFuture<?> task) {
325         if (isShutdown())
326             reject(task);
327         else {
328             super.getQueue().add(task);
329             if (isShutdown() &&
330                 !canRunInCurrentRunState(task.isPeriodic()) &&
331                 remove(task))
332                 task.cancel(false);
333             else
334                 ensurePrestart();
335         }
336     }
337 
338     /**
339      * Requeues a periodic task unless current run state precludes it.
340      * Same idea as delayedExecute except drops task rather than rejecting.
341      *
342      * @param task the task
343      */
344     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
345         if (canRunInCurrentRunState(true)) {
346             super.getQueue().add(task);
347             if (!canRunInCurrentRunState(true) && remove(task))
348                 task.cancel(false);
349             else
350                 ensurePrestart();
351         }
352     }
353 
354     /**
355      * Cancels and clears the queue of all tasks that should not be run
356      * due to shutdown policy.  Invoked within super.shutdown.
357      */
358     @Override void onShutdown() {
359         BlockingQueue<Runnable> q = super.getQueue();
360         boolean keepDelayed =
361             getExecuteExistingDelayedTasksAfterShutdownPolicy();
362         boolean keepPeriodic =
363             getContinueExistingPeriodicTasksAfterShutdownPolicy();
364         if (!keepDelayed && !keepPeriodic) {
365             for (Object e : q.toArray())
366                 if (e instanceof RunnableScheduledFuture<?>)
367                     ((RunnableScheduledFuture<?>) e).cancel(false);
368             q.clear();
369         }
370         else {
371             // Traverse snapshot to avoid iterator exceptions
372             for (Object e : q.toArray()) {
373                 if (e instanceof RunnableScheduledFuture) {
374                     RunnableScheduledFuture<?> t =
375                         (RunnableScheduledFuture<?>)e;
376                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
377                         t.isCancelled()) { // also remove if already cancelled
378                         if (q.remove(t))
379                             t.cancel(false);
380                     }
381                 }
382             }
383         }
384         tryTerminate();
385     }
386 
387     /**
388      * Modifies or replaces the task used to execute a runnable.
389      * This method can be used to override the concrete
390      * class used for managing internal tasks.
391      * The default implementation simply returns the given task.
392      *
393      * @param runnable the submitted Runnable
394      * @param task the task created to execute the runnable
395      * @param <V> the type of the task's result
396      * @return a task that can execute the runnable
397      * @since 1.6
398      */
399     protected <V> RunnableScheduledFuture<V> decorateTask(
400         Runnable runnable, RunnableScheduledFuture<V> task) {
401         return task;
402     }
403 
404     /**
405      * Modifies or replaces the task used to execute a callable.
406      * This method can be used to override the concrete
407      * class used for managing internal tasks.
408      * The default implementation simply returns the given task.
409      *
410      * @param callable the submitted Callable
411      * @param task the task created to execute the callable
412      * @param <V> the type of the task's result
413      * @return a task that can execute the callable
414      * @since 1.6
415      */
416     protected <V> RunnableScheduledFuture<V> decorateTask(
417         Callable<V> callable, RunnableScheduledFuture<V> task) {
418         return task;
419     }
420 
421     /**
422      * Creates a new {@code ScheduledThreadPoolExecutor} with the
423      * given core pool size.
424      *
425      * @param corePoolSize the number of threads to keep in the pool, even
426      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
427      * @throws IllegalArgumentException if {@code corePoolSize < 0}
428      */
429     public ScheduledThreadPoolExecutor(int corePoolSize) {
430         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
431               new DelayedWorkQueue());
432     }
433 
434     /**
435      * Creates a new {@code ScheduledThreadPoolExecutor} with the
436      * given initial parameters.
437      *
438      * @param corePoolSize the number of threads to keep in the pool, even
439      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
440      * @param threadFactory the factory to use when the executor
441      *        creates a new thread
442      * @throws IllegalArgumentException if {@code corePoolSize < 0}
443      * @throws NullPointerException if {@code threadFactory} is null
444      */
445     public ScheduledThreadPoolExecutor(int corePoolSize,
446                                        ThreadFactory threadFactory) {
447         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
448               new DelayedWorkQueue(), threadFactory);
449     }
450 
451     /**
452      * Creates a new ScheduledThreadPoolExecutor with the given
453      * initial parameters.
454      *
455      * @param corePoolSize the number of threads to keep in the pool, even
456      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
457      * @param handler the handler to use when execution is blocked
458      *        because the thread bounds and queue capacities are reached
459      * @throws IllegalArgumentException if {@code corePoolSize < 0}
460      * @throws NullPointerException if {@code handler} is null
461      */
462     public ScheduledThreadPoolExecutor(int corePoolSize,
463                                        RejectedExecutionHandler handler) {
464         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
465               new DelayedWorkQueue(), handler);
466     }
467 
468     /**
469      * Creates a new ScheduledThreadPoolExecutor with the given
470      * initial parameters.
471      *
472      * @param corePoolSize the number of threads to keep in the pool, even
473      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
474      * @param threadFactory the factory to use when the executor
475      *        creates a new thread
476      * @param handler the handler to use when execution is blocked
477      *        because the thread bounds and queue capacities are reached
478      * @throws IllegalArgumentException if {@code corePoolSize < 0}
479      * @throws NullPointerException if {@code threadFactory} or
480      *         {@code handler} is null
481      */
482     public ScheduledThreadPoolExecutor(int corePoolSize,
483                                        ThreadFactory threadFactory,
484                                        RejectedExecutionHandler handler) {
485         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
486               new DelayedWorkQueue(), threadFactory, handler);
487     }
488 
489     /**
490      * Returns the trigger time of a delayed action.
491      */
492     private long triggerTime(long delay, TimeUnit unit) {
493         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
494     }
495 
496     /**
497      * Returns the trigger time of a delayed action.
498      */
499     long triggerTime(long delay) {
500         return now() +
501             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
502     }
503 
504     /**
505      * Constrains the values of all delays in the queue to be within
506      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
507      * This may occur if a task is eligible to be dequeued, but has
508      * not yet been, while some other task is added with a delay of
509      * Long.MAX_VALUE.
510      */
511     private long overflowFree(long delay) {
512         Delayed head = (Delayed) super.getQueue().peek();
513         if (head != null) {
514             long headDelay = head.getDelay(NANOSECONDS);
515             if (headDelay < 0 && (delay - headDelay < 0))
516                 delay = Long.MAX_VALUE + headDelay;
517         }
518         return delay;
519     }
520 
521     /**
522      * @throws RejectedExecutionException {@inheritDoc}
523      * @throws NullPointerException       {@inheritDoc}
524      */
525     public ScheduledFuture<?> schedule(Runnable command,
526                                        long delay,
527                                        TimeUnit unit) {
528         if (command == null || unit == null)
529             throw new NullPointerException();
530         RunnableScheduledFuture<?> t = decorateTask(command,
531             new ScheduledFutureTask<Void>(command, null,
532                                           triggerTime(delay, unit)));
533         delayedExecute(t);
534         return t;
535     }
536 
537     /**
538      * @throws RejectedExecutionException {@inheritDoc}
539      * @throws NullPointerException       {@inheritDoc}
540      */
541     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542                                            long delay,
543                                            TimeUnit unit) {
544         if (callable == null || unit == null)
545             throw new NullPointerException();
546         RunnableScheduledFuture<V> t = decorateTask(callable,
547             new ScheduledFutureTask<V>(callable,
548                                        triggerTime(delay, unit)));
549         delayedExecute(t);
550         return t;
551     }
552 
553     /**
554      * @throws RejectedExecutionException {@inheritDoc}
555      * @throws NullPointerException       {@inheritDoc}
556      * @throws IllegalArgumentException   {@inheritDoc}
557      */
558     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
559                                                   long initialDelay,
560                                                   long period,
561                                                   TimeUnit unit) {
562         if (command == null || unit == null)
563             throw new NullPointerException();
564         if (period <= 0)
565             throw new IllegalArgumentException();
566         ScheduledFutureTask<Void> sft =
567             new ScheduledFutureTask<Void>(command,
568                                           null,
569                                           triggerTime(initialDelay, unit),
570                                           unit.toNanos(period));
571         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
572         sft.outerTask = t;
573         delayedExecute(t);
574         return t;
575     }
576 
577     /**
578      * @throws RejectedExecutionException {@inheritDoc}
579      * @throws NullPointerException       {@inheritDoc}
580      * @throws IllegalArgumentException   {@inheritDoc}
581      */
582     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
583                                                      long initialDelay,
584                                                      long delay,
585                                                      TimeUnit unit) {
586         if (command == null || unit == null)
587             throw new NullPointerException();
588         if (delay <= 0)
589             throw new IllegalArgumentException();
590         ScheduledFutureTask<Void> sft =
591             new ScheduledFutureTask<Void>(command,
592                                           null,
593                                           triggerTime(initialDelay, unit),
594                                           unit.toNanos(-delay));
595         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
596         sft.outerTask = t;
597         delayedExecute(t);
598         return t;
599     }
600 
601     /**
602      * Executes {@code command} with zero required delay.
603      * This has effect equivalent to
604      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
605      * Note that inspections of the queue and of the list returned by
606      * {@code shutdownNow} will access the zero-delayed
607      * {@link ScheduledFuture}, not the {@code command} itself.
608      *
609      * <p>A consequence of the use of {@code ScheduledFuture} objects is
610      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
611      * called with a null second {@code Throwable} argument, even if the
612      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
613      * thrown by such a task can be obtained via {@link Future#get}.
614      *
615      * @throws RejectedExecutionException at discretion of
616      *         {@code RejectedExecutionHandler}, if the task
617      *         cannot be accepted for execution because the
618      *         executor has been shut down
619      * @throws NullPointerException {@inheritDoc}
620      */
621     public void execute(Runnable command) {
622         schedule(command, 0, NANOSECONDS);
623     }
624 
625     // Override AbstractExecutorService methods
626 
627     /**
628      * @throws RejectedExecutionException {@inheritDoc}
629      * @throws NullPointerException       {@inheritDoc}
630      */
631     public Future<?> submit(Runnable task) {
632         return schedule(task, 0, NANOSECONDS);
633     }
634 
635     /**
636      * @throws RejectedExecutionException {@inheritDoc}
637      * @throws NullPointerException       {@inheritDoc}
638      */
639     public <T> Future<T> submit(Runnable task, T result) {
640         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
641     }
642 
643     /**
644      * @throws RejectedExecutionException {@inheritDoc}
645      * @throws NullPointerException       {@inheritDoc}
646      */
647     public <T> Future<T> submit(Callable<T> task) {
648         return schedule(task, 0, NANOSECONDS);
649     }
650 
651     /**
652      * Sets the policy on whether to continue executing existing
653      * periodic tasks even when this executor has been {@code shutdown}.
654      * In this case, these tasks will only terminate upon
655      * {@code shutdownNow} or after setting the policy to
656      * {@code false} when already shutdown.
657      * This value is by default {@code false}.
658      *
659      * @param value if {@code true}, continue after shutdown, else don't
660      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
661      */
662     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
663         continueExistingPeriodicTasksAfterShutdown = value;
664         if (!value && isShutdown())
665             onShutdown();
666     }
667 
668     /**
669      * Gets the policy on whether to continue executing existing
670      * periodic tasks even when this executor has been {@code shutdown}.
671      * In this case, these tasks will only terminate upon
672      * {@code shutdownNow} or after setting the policy to
673      * {@code false} when already shutdown.
674      * This value is by default {@code false}.
675      *
676      * @return {@code true} if will continue after shutdown
677      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
678      */
679     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
680         return continueExistingPeriodicTasksAfterShutdown;
681     }
682 
683     /**
684      * Sets the policy on whether to execute existing delayed
685      * tasks even when this executor has been {@code shutdown}.
686      * In this case, these tasks will only terminate upon
687      * {@code shutdownNow}, or after setting the policy to
688      * {@code false} when already shutdown.
689      * This value is by default {@code true}.
690      *
691      * @param value if {@code true}, execute after shutdown, else don't
692      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
693      */
694     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
695         executeExistingDelayedTasksAfterShutdown = value;
696         if (!value && isShutdown())
697             onShutdown();
698     }
699 
700     /**
701      * Gets the policy on whether to execute existing delayed
702      * tasks even when this executor has been {@code shutdown}.
703      * In this case, these tasks will only terminate upon
704      * {@code shutdownNow}, or after setting the policy to
705      * {@code false} when already shutdown.
706      * This value is by default {@code true}.
707      *
708      * @return {@code true} if will execute after shutdown
709      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
710      */
711     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
712         return executeExistingDelayedTasksAfterShutdown;
713     }
714 
715     /**
716      * Sets the policy on whether cancelled tasks should be immediately
717      * removed from the work queue at time of cancellation.  This value is
718      * by default {@code false}.
719      *
720      * @param value if {@code true}, remove on cancellation, else don't
721      * @see #getRemoveOnCancelPolicy
722      * @since 1.7
723      */
724     public void setRemoveOnCancelPolicy(boolean value) {
725         removeOnCancel = value;
726     }
727 
728     /**
729      * Gets the policy on whether cancelled tasks should be immediately
730      * removed from the work queue at time of cancellation.  This value is
731      * by default {@code false}.
732      *
733      * @return {@code true} if cancelled tasks are immediately removed
734      *         from the queue
735      * @see #setRemoveOnCancelPolicy
736      * @since 1.7
737      */
738     public boolean getRemoveOnCancelPolicy() {
739         return removeOnCancel;
740     }
741 
742     /**
743      * Initiates an orderly shutdown in which previously submitted
744      * tasks are executed, but no new tasks will be accepted.
745      * Invocation has no additional effect if already shut down.
746      *
747      * <p>This method does not wait for previously submitted tasks to
748      * complete execution.  Use {@link #awaitTermination awaitTermination}
749      * to do that.
750      *
751      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
752      * has been set {@code false}, existing delayed tasks whose delays
753      * have not yet elapsed are cancelled.  And unless the {@code
754      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
755      * {@code true}, future executions of existing periodic tasks will
756      * be cancelled.
757      *
758      * @throws SecurityException {@inheritDoc}
759      */
760     public void shutdown() {
761         super.shutdown();
762     }
763 
764     /**
765      * Attempts to stop all actively executing tasks, halts the
766      * processing of waiting tasks, and returns a list of the tasks
767      * that were awaiting execution.
768      *
769      * <p>This method does not wait for actively executing tasks to
770      * terminate.  Use {@link #awaitTermination awaitTermination} to
771      * do that.
772      *
773      * <p>There are no guarantees beyond best-effort attempts to stop
774      * processing actively executing tasks.  This implementation
775      * cancels tasks via {@link Thread#interrupt}, so any task that
776      * fails to respond to interrupts may never terminate.
777      *
778      * @return list of tasks that never commenced execution.
779      *         Each element of this list is a {@link ScheduledFuture},
780      *         including those tasks submitted using {@code execute},
781      *         which are for scheduling purposes used as the basis of a
782      *         zero-delay {@code ScheduledFuture}.
783      * @throws SecurityException {@inheritDoc}
784      */
785     public List<Runnable> shutdownNow() {
786         return super.shutdownNow();
787     }
788 
789     /**
790      * Returns the task queue used by this executor.  Each element of
791      * this queue is a {@link ScheduledFuture}, including those
792      * tasks submitted using {@code execute} which are for scheduling
793      * purposes used as the basis of a zero-delay
794      * {@code ScheduledFuture}.  Iteration over this queue is
795      * <em>not</em> guaranteed to traverse tasks in the order in
796      * which they will execute.
797      *
798      * @return the task queue
799      */
800     public BlockingQueue<Runnable> getQueue() {
801         return super.getQueue();
802     }
803 
804     /**
805      * Specialized delay queue. To mesh with TPE declarations, this
806      * class must be declared as a BlockingQueue<Runnable> even though
807      * it can only hold RunnableScheduledFutures.
808      */
809     static class DelayedWorkQueue extends AbstractQueue<Runnable>
810         implements BlockingQueue<Runnable> {
811 
812         /*
813          * A DelayedWorkQueue is based on a heap-based data structure
814          * like those in DelayQueue and PriorityQueue, except that
815          * every ScheduledFutureTask also records its index into the
816          * heap array. This eliminates the need to find a task upon
817          * cancellation, greatly speeding up removal (down from O(n)
818          * to O(log n)), and reducing garbage retention that would
819          * otherwise occur by waiting for the element to rise to top
820          * before clearing. But because the queue may also hold
821          * RunnableScheduledFutures that are not ScheduledFutureTasks,
822          * we are not guaranteed to have such indices available, in
823          * which case we fall back to linear search. (We expect that
824          * most tasks will not be decorated, and that the faster cases
825          * will be much more common.)
826          *
827          * All heap operations must record index changes -- mainly
828          * within siftUp and siftDown. Upon removal, a task's
829          * heapIndex is set to -1. Note that ScheduledFutureTasks can
830          * appear at most once in the queue (this need not be true for
831          * other kinds of tasks or work queues), so are uniquely
832          * identified by heapIndex.
833          */
834 
835         private static final int INITIAL_CAPACITY = 16;
836         private RunnableScheduledFuture<?>[] queue =
837             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
838         private final ReentrantLock lock = new ReentrantLock();
839         private int size = 0;
840 
841         /**
842          * Thread designated to wait for the task at the head of the
843          * queue.  This variant of the Leader-Follower pattern
844          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
845          * minimize unnecessary timed waiting.  When a thread becomes
846          * the leader, it waits only for the next delay to elapse, but
847          * other threads await indefinitely.  The leader thread must
848          * signal some other thread before returning from take() or
849          * poll(...), unless some other thread becomes leader in the
850          * interim.  Whenever the head of the queue is replaced with a
851          * task with an earlier expiration time, the leader field is
852          * invalidated by being reset to null, and some waiting
853          * thread, but not necessarily the current leader, is
854          * signalled.  So waiting threads must be prepared to acquire
855          * and lose leadership while waiting.
856          */
857         private Thread leader = null;
858 
859         /**
860          * Condition signalled when a newer task becomes available at the
861          * head of the queue or a new thread may need to become leader.
862          */
863         private final Condition available = lock.newCondition();
864 
865         /**
866          * Sets f's heapIndex if it is a ScheduledFutureTask.
867          */
868         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
869             if (f instanceof ScheduledFutureTask)
870                 ((ScheduledFutureTask)f).heapIndex = idx;
871         }
872 
873         /**
874          * Sifts element added at bottom up to its heap-ordered spot.
875          * Call only when holding lock.
876          */
877         private void siftUp(int k, RunnableScheduledFuture<?> key) {
878             while (k > 0) {
879                 int parent = (k - 1) >>> 1;
880                 RunnableScheduledFuture<?> e = queue[parent];
881                 if (key.compareTo(e) >= 0)
882                     break;
883                 queue[k] = e;
884                 setIndex(e, k);
885                 k = parent;
886             }
887             queue[k] = key;
888             setIndex(key, k);
889         }
890 
891         /**
892          * Sifts element added at top down to its heap-ordered spot.
893          * Call only when holding lock.
894          */
895         private void siftDown(int k, RunnableScheduledFuture<?> key) {
896             int half = size >>> 1;
897             while (k < half) {
898                 int child = (k << 1) + 1;
899                 RunnableScheduledFuture<?> c = queue[child];
900                 int right = child + 1;
901                 if (right < size && c.compareTo(queue[right]) > 0)
902                     c = queue[child = right];
903                 if (key.compareTo(c) <= 0)
904                     break;
905                 queue[k] = c;
906                 setIndex(c, k);
907                 k = child;
908             }
909             queue[k] = key;
910             setIndex(key, k);
911         }
912 
913         /**
914          * Resizes the heap array.  Call only when holding lock.
915          */
916         private void grow() {
917             int oldCapacity = queue.length;
918             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
919             if (newCapacity < 0) // overflow
920                 newCapacity = Integer.MAX_VALUE;
921             queue = Arrays.copyOf(queue, newCapacity);
922         }
923 
924         /**
925          * Finds index of given object, or -1 if absent.
926          */
927         private int indexOf(Object x) {
928             if (x != null) {
929                 if (x instanceof ScheduledFutureTask) {
930                     int i = ((ScheduledFutureTask) x).heapIndex;
931                     // Sanity check; x could conceivably be a
932                     // ScheduledFutureTask from some other pool.
933                     if (i >= 0 && i < size && queue[i] == x)
934                         return i;
935                 } else {
936                     for (int i = 0; i < size; i++)
937                         if (x.equals(queue[i]))
938                             return i;
939                 }
940             }
941             return -1;
942         }
943 
944         public boolean contains(Object x) {
945             final ReentrantLock lock = this.lock;
946             lock.lock();
947             try {
948                 return indexOf(x) != -1;
949             } finally {
950                 lock.unlock();
951             }
952         }
953 
954         public boolean remove(Object x) {
955             final ReentrantLock lock = this.lock;
956             lock.lock();
957             try {
958                 int i = indexOf(x);
959                 if (i < 0)
960                     return false;
961 
962                 setIndex(queue[i], -1);
963                 int s = --size;
964                 RunnableScheduledFuture<?> replacement = queue[s];
965                 queue[s] = null;
966                 if (s != i) {
967                     siftDown(i, replacement);
968                     if (queue[i] == replacement)
969                         siftUp(i, replacement);
970                 }
971                 return true;
972             } finally {
973                 lock.unlock();
974             }
975         }
976 
977         public int size() {
978             final ReentrantLock lock = this.lock;
979             lock.lock();
980             try {
981                 return size;
982             } finally {
983                 lock.unlock();
984             }
985         }
986 
987         public boolean isEmpty() {
988             return size() == 0;
989         }
990 
991         public int remainingCapacity() {
992             return Integer.MAX_VALUE;
993         }
994 
995         public RunnableScheduledFuture<?> peek() {
996             final ReentrantLock lock = this.lock;
997             lock.lock();
998             try {
999                 return queue[0];
1000             } finally {
1001                 lock.unlock();
1002             }
1003         }
1004 
1005         public boolean offer(Runnable x) {
1006             if (x == null)
1007                 throw new NullPointerException();
1008             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1009             final ReentrantLock lock = this.lock;
1010             lock.lock();
1011             try {
1012                 int i = size;
1013                 if (i >= queue.length)
1014                     grow();
1015                 size = i + 1;
1016                 if (i == 0) {
1017                     queue[0] = e;
1018                     setIndex(e, 0);
1019                 } else {
1020                     siftUp(i, e);
1021                 }
1022                 if (queue[0] == e) {
1023                     leader = null;
1024                     available.signal();
1025                 }
1026             } finally {
1027                 lock.unlock();
1028             }
1029             return true;
1030         }
1031 
1032         public void put(Runnable e) {
1033             offer(e);
1034         }
1035 
1036         public boolean add(Runnable e) {
1037             return offer(e);
1038         }
1039 
1040         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1041             return offer(e);
1042         }
1043 
1044         /**
1045          * Performs common bookkeeping for poll and take: Replaces
1046          * first element with last and sifts it down.  Call only when
1047          * holding lock.
1048          * @param f the task to remove and return
1049          */
1050         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1051             int s = --size;
1052             RunnableScheduledFuture<?> x = queue[s];
1053             queue[s] = null;
1054             if (s != 0)
1055                 siftDown(0, x);
1056             setIndex(f, -1);
1057             return f;
1058         }
1059 
1060         public RunnableScheduledFuture<?> poll() {
1061             final ReentrantLock lock = this.lock;
1062             lock.lock();
1063             try {
1064                 RunnableScheduledFuture<?> first = queue[0];
1065                 if (first == null || first.getDelay(NANOSECONDS) > 0)
1066                     return null;
1067                 else
1068                     return finishPoll(first);
1069             } finally {
1070                 lock.unlock();
1071             }
1072         }
1073 
1074         public RunnableScheduledFuture<?> take() throws InterruptedException {
1075             final ReentrantLock lock = this.lock;
1076             lock.lockInterruptibly();
1077             try {
1078                 for (;;) {
1079                     RunnableScheduledFuture<?> first = queue[0];
1080                     if (first == null)
1081                         available.await();
1082                     else {
1083                         long delay = first.getDelay(NANOSECONDS);
1084                         if (delay <= 0)
1085                             return finishPoll(first);
1086                         first = null; // don't retain ref while waiting
1087                         if (leader != null)
1088                             available.await();
1089                         else {
1090                             Thread thisThread = Thread.currentThread();
1091                             leader = thisThread;
1092                             try {
1093                                 available.awaitNanos(delay);
1094                             } finally {
1095                                 if (leader == thisThread)
1096                                     leader = null;
1097                             }
1098                         }
1099                     }
1100                 }
1101             } finally {
1102                 if (leader == null && queue[0] != null)
1103                     available.signal();
1104                 lock.unlock();
1105             }
1106         }
1107 
1108         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1109             throws InterruptedException {
1110             long nanos = unit.toNanos(timeout);
1111             final ReentrantLock lock = this.lock;
1112             lock.lockInterruptibly();
1113             try {
1114                 for (;;) {
1115                     RunnableScheduledFuture<?> first = queue[0];
1116                     if (first == null) {
1117                         if (nanos <= 0)
1118                             return null;
1119                         else
1120                             nanos = available.awaitNanos(nanos);
1121                     } else {
1122                         long delay = first.getDelay(NANOSECONDS);
1123                         if (delay <= 0)
1124                             return finishPoll(first);
1125                         if (nanos <= 0)
1126                             return null;
1127                         first = null; // don't retain ref while waiting
1128                         if (nanos < delay || leader != null)
1129                             nanos = available.awaitNanos(nanos);
1130                         else {
1131                             Thread thisThread = Thread.currentThread();
1132                             leader = thisThread;
1133                             try {
1134                                 long timeLeft = available.awaitNanos(delay);
1135                                 nanos -= delay - timeLeft;
1136                             } finally {
1137                                 if (leader == thisThread)
1138                                     leader = null;
1139                             }
1140                         }
1141                     }
1142                 }
1143             } finally {
1144                 if (leader == null && queue[0] != null)
1145                     available.signal();
1146                 lock.unlock();
1147             }
1148         }
1149 
1150         public void clear() {
1151             final ReentrantLock lock = this.lock;
1152             lock.lock();
1153             try {
1154                 for (int i = 0; i < size; i++) {
1155                     RunnableScheduledFuture<?> t = queue[i];
1156                     if (t != null) {
1157                         queue[i] = null;
1158                         setIndex(t, -1);
1159                     }
1160                 }
1161                 size = 0;
1162             } finally {
1163                 lock.unlock();
1164             }
1165         }
1166 
1167         /**
1168          * Returns first element only if it is expired.
1169          * Used only by drainTo.  Call only when holding lock.
1170          */
1171         private RunnableScheduledFuture<?> peekExpired() {
1172             // assert lock.isHeldByCurrentThread();
1173             RunnableScheduledFuture<?> first = queue[0];
1174             return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1175                 null : first;
1176         }
1177 
1178         public int drainTo(Collection<? super Runnable> c) {
1179             if (c == null)
1180                 throw new NullPointerException();
1181             if (c == this)
1182                 throw new IllegalArgumentException();
1183             final ReentrantLock lock = this.lock;
1184             lock.lock();
1185             try {
1186                 RunnableScheduledFuture<?> first;
1187                 int n = 0;
1188                 while ((first = peekExpired()) != null) {
1189                     c.add(first);   // In this order, in case add() throws.
1190                     finishPoll(first);
1191                     ++n;
1192                 }
1193                 return n;
1194             } finally {
1195                 lock.unlock();
1196             }
1197         }
1198 
1199         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1200             if (c == null)
1201                 throw new NullPointerException();
1202             if (c == this)
1203                 throw new IllegalArgumentException();
1204             if (maxElements <= 0)
1205                 return 0;
1206             final ReentrantLock lock = this.lock;
1207             lock.lock();
1208             try {
1209                 RunnableScheduledFuture<?> first;
1210                 int n = 0;
1211                 while (n < maxElements && (first = peekExpired()) != null) {
1212                     c.add(first);   // In this order, in case add() throws.
1213                     finishPoll(first);
1214                     ++n;
1215                 }
1216                 return n;
1217             } finally {
1218                 lock.unlock();
1219             }
1220         }
1221 
1222         public Object[] toArray() {
1223             final ReentrantLock lock = this.lock;
1224             lock.lock();
1225             try {
1226                 return Arrays.copyOf(queue, size, Object[].class);
1227             } finally {
1228                 lock.unlock();
1229             }
1230         }
1231 
1232         @SuppressWarnings("unchecked")
1233         public <T> T[] toArray(T[] a) {
1234             final ReentrantLock lock = this.lock;
1235             lock.lock();
1236             try {
1237                 if (a.length < size)
1238                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1239                 System.arraycopy(queue, 0, a, 0, size);
1240                 if (a.length > size)
1241                     a[size] = null;
1242                 return a;
1243             } finally {
1244                 lock.unlock();
1245             }
1246         }
1247 
1248         public Iterator<Runnable> iterator() {
1249             return new Itr(Arrays.copyOf(queue, size));
1250         }
1251 
1252         /**
1253          * Snapshot iterator that works off copy of underlying q array.
1254          */
1255         private class Itr implements Iterator<Runnable> {
1256             final RunnableScheduledFuture<?>[] array;
1257             int cursor = 0;     // index of next element to return
1258             int lastRet = -1;   // index of last element, or -1 if no such
1259 
1260             Itr(RunnableScheduledFuture<?>[] array) {
1261                 this.array = array;
1262             }
1263 
1264             public boolean hasNext() {
1265                 return cursor < array.length;
1266             }
1267 
1268             public Runnable next() {
1269                 if (cursor >= array.length)
1270                     throw new NoSuchElementException();
1271                 lastRet = cursor;
1272                 return array[cursor++];
1273             }
1274 
1275             public void remove() {
1276                 if (lastRet < 0)
1277                     throw new IllegalStateException();
1278                 DelayedWorkQueue.this.remove(array[lastRet]);
1279                 lastRet = -1;
1280             }
1281         }
1282     }
1283 }