View Javadoc
1   /*
2    * Copyright (C) 2007 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.base.Preconditions.checkArgument;
18  import static com.google.common.base.Preconditions.checkNotNull;
19  
20  import com.google.common.annotations.Beta;
21  import com.google.common.annotations.GwtCompatible;
22  import com.google.common.annotations.GwtIncompatible;
23  import com.google.common.annotations.VisibleForTesting;
24  import com.google.common.base.Supplier;
25  import com.google.common.base.Throwables;
26  import com.google.common.collect.Lists;
27  import com.google.common.collect.Queues;
28  import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
29  import com.google.errorprone.annotations.CanIgnoreReturnValue;
30  import java.lang.reflect.InvocationTargetException;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.concurrent.BlockingQueue;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.Delayed;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Executor;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.RejectedExecutionException;
44  import java.util.concurrent.ScheduledExecutorService;
45  import java.util.concurrent.ScheduledFuture;
46  import java.util.concurrent.ScheduledThreadPoolExecutor;
47  import java.util.concurrent.ThreadFactory;
48  import java.util.concurrent.ThreadPoolExecutor;
49  import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
50  import java.util.concurrent.TimeUnit;
51  import java.util.concurrent.TimeoutException;
52  import javax.annotation.concurrent.GuardedBy;
53  
54  /**
55   * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link ExecutorService},
56   * and {@link ThreadFactory}.
57   *
58   * @author Eric Fellheimer
59   * @author Kyle Littlefield
60   * @author Justin Mahoney
61   * @since 3.0
62   */
63  @GwtCompatible(emulated = true)
64  public final class MoreExecutors {
65    private MoreExecutors() {}
66  
67    /**
68     * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
69     * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
70     * completion.
71     *
72     * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
73     *
74     * @param executor the executor to modify to make sure it exits when the application is finished
75     * @param terminationTimeout how long to wait for the executor to finish before terminating the
76     *     JVM
77     * @param timeUnit unit of time for the time parameter
78     * @return an unmodifiable version of the input which will not hang the JVM
79     */
80    @Beta
81    @GwtIncompatible // TODO
82    public static ExecutorService getExitingExecutorService(
83        ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
84      return new Application().getExitingExecutorService(executor, terminationTimeout, timeUnit);
85    }
86  
87    /**
88     * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
89     * the application is complete. It does so by using daemon threads and adding a shutdown hook to
90     * wait for their completion.
91     *
92     * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
93     *
94     * @param executor the executor to modify to make sure it exits when the application is finished
95     * @param terminationTimeout how long to wait for the executor to finish before terminating the
96     *     JVM
97     * @param timeUnit unit of time for the time parameter
98     * @return an unmodifiable version of the input which will not hang the JVM
99     */
100   @Beta
101   @GwtIncompatible // TODO
102   public static ScheduledExecutorService getExitingScheduledExecutorService(
103       ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
104     return new Application()
105         .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
106   }
107 
108   /**
109    * Add a shutdown hook to wait for thread completion in the given {@link ExecutorService service}.
110    * This is useful if the given service uses daemon threads, and we want to keep the JVM from
111    * exiting immediately on shutdown, instead giving these daemon threads a chance to terminate
112    * normally.
113    *
114    * @param service ExecutorService which uses daemon threads
115    * @param terminationTimeout how long to wait for the executor to finish before terminating the
116    *     JVM
117    * @param timeUnit unit of time for the time parameter
118    */
119   @Beta
120   @GwtIncompatible // TODO
121   public static void addDelayedShutdownHook(
122       ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
123     new Application().addDelayedShutdownHook(service, terminationTimeout, timeUnit);
124   }
125 
126   /**
127    * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
128    * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
129    * completion.
130    *
131    * <p>This method waits 120 seconds before continuing with JVM termination, even if the executor
132    * has not finished its work.
133    *
134    * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
135    *
136    * @param executor the executor to modify to make sure it exits when the application is finished
137    * @return an unmodifiable version of the input which will not hang the JVM
138    */
139   @Beta
140   @GwtIncompatible // concurrency
141   public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
142     return new Application().getExitingExecutorService(executor);
143   }
144 
145   /**
146    * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
147    * the application is complete. It does so by using daemon threads and adding a shutdown hook to
148    * wait for their completion.
149    *
150    * <p>This method waits 120 seconds before continuing with JVM termination, even if the executor
151    * has not finished its work.
152    *
153    * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
154    *
155    * @param executor the executor to modify to make sure it exits when the application is finished
156    * @return an unmodifiable version of the input which will not hang the JVM
157    */
158   @Beta
159   @GwtIncompatible // TODO
160   public static ScheduledExecutorService getExitingScheduledExecutorService(
161       ScheduledThreadPoolExecutor executor) {
162     return new Application().getExitingScheduledExecutorService(executor);
163   }
164 
165   /** Represents the current application to register shutdown hooks. */
166   @GwtIncompatible // TODO
167   @VisibleForTesting
168   static class Application {
169 
170     final ExecutorService getExitingExecutorService(
171         ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
172       useDaemonThreadFactory(executor);
173       ExecutorService service = Executors.unconfigurableExecutorService(executor);
174       addDelayedShutdownHook(executor, terminationTimeout, timeUnit);
175       return service;
176     }
177 
178     final ScheduledExecutorService getExitingScheduledExecutorService(
179         ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
180       useDaemonThreadFactory(executor);
181       ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
182       addDelayedShutdownHook(executor, terminationTimeout, timeUnit);
183       return service;
184     }
185 
186     final void addDelayedShutdownHook(
187         final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
188       checkNotNull(service);
189       checkNotNull(timeUnit);
190       addShutdownHook(
191           MoreExecutors.newThread(
192               "DelayedShutdownHook-for-" + service,
193               new Runnable() {
194                 @Override
195                 public void run() {
196                   try {
197                     // We'd like to log progress and failures that may arise in the
198                     // following code, but unfortunately the behavior of logging
199                     // is undefined in shutdown hooks.
200                     // This is because the logging code installs a shutdown hook of its
201                     // own. See Cleaner class inside {@link LogManager}.
202                     service.shutdown();
203                     service.awaitTermination(terminationTimeout, timeUnit);
204                   } catch (InterruptedException ignored) {
205                     // We're shutting down anyway, so just ignore.
206                   }
207                 }
208               }));
209     }
210 
211     final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
212       return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
213     }
214 
215     final ScheduledExecutorService getExitingScheduledExecutorService(
216         ScheduledThreadPoolExecutor executor) {
217       return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
218     }
219 
220     @VisibleForTesting
221     void addShutdownHook(Thread hook) {
222       Runtime.getRuntime().addShutdownHook(hook);
223     }
224   }
225 
226   @GwtIncompatible // TODO
227   private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
228     executor.setThreadFactory(
229         new ThreadFactoryBuilder()
230             .setDaemon(true)
231             .setThreadFactory(executor.getThreadFactory())
232             .build());
233   }
234 
235   // See newDirectExecutorService javadoc for behavioral notes.
236   @GwtIncompatible // TODO
237   private static final class DirectExecutorService extends AbstractListeningExecutorService {
238     /**
239      * Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor
240      */
241     private final Object lock = new Object();
242 
243     /*
244      * Conceptually, these two variables describe the executor being in
245      * one of three states:
246      *   - Active: shutdown == false
247      *   - Shutdown: runningTasks > 0 and shutdown == true
248      *   - Terminated: runningTasks == 0 and shutdown == true
249      */
250     @GuardedBy("lock")
251     private int runningTasks = 0;
252 
253     @GuardedBy("lock")
254     private boolean shutdown = false;
255 
256     @Override
257     public void execute(Runnable command) {
258       startTask();
259       try {
260         command.run();
261       } finally {
262         endTask();
263       }
264     }
265 
266     @Override
267     public boolean isShutdown() {
268       synchronized (lock) {
269         return shutdown;
270       }
271     }
272 
273     @Override
274     public void shutdown() {
275       synchronized (lock) {
276         shutdown = true;
277         if (runningTasks == 0) {
278           lock.notifyAll();
279         }
280       }
281     }
282 
283     // See newDirectExecutorService javadoc for unusual behavior of this method.
284     @Override
285     public List<Runnable> shutdownNow() {
286       shutdown();
287       return Collections.emptyList();
288     }
289 
290     @Override
291     public boolean isTerminated() {
292       synchronized (lock) {
293         return shutdown && runningTasks == 0;
294       }
295     }
296 
297     @Override
298     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
299       long nanos = unit.toNanos(timeout);
300       synchronized (lock) {
301         while (true) {
302           if (shutdown && runningTasks == 0) {
303             return true;
304           } else if (nanos <= 0) {
305             return false;
306           } else {
307             long now = System.nanoTime();
308             TimeUnit.NANOSECONDS.timedWait(lock, nanos);
309             nanos -= System.nanoTime() - now; // subtract the actual time we waited
310           }
311         }
312       }
313     }
314 
315     /**
316      * Checks if the executor has been shut down and increments the running task count.
317      *
318      * @throws RejectedExecutionException if the executor has been previously shutdown
319      */
320     private void startTask() {
321       synchronized (lock) {
322         if (shutdown) {
323           throw new RejectedExecutionException("Executor already shutdown");
324         }
325         runningTasks++;
326       }
327     }
328 
329     /**
330      * Decrements the running task count.
331      */
332     private void endTask() {
333       synchronized (lock) {
334         int numRunning = --runningTasks;
335         if (numRunning == 0) {
336           lock.notifyAll();
337         }
338       }
339     }
340   }
341 
342   /**
343    * Creates an executor service that runs each task in the thread that invokes
344    * {@code execute/submit}, as in {@link CallerRunsPolicy} This applies both to individually
345    * submitted tasks and to collections of tasks submitted via {@code invokeAll} or
346    * {@code invokeAny}. In the latter case, tasks will run serially on the calling thread. Tasks are
347    * run to completion before a {@code Future} is returned to the caller (unless the executor has
348    * been shutdown).
349    *
350    * <p>Although all tasks are immediately executed in the thread that submitted the task, this
351    * {@code ExecutorService} imposes a small locking overhead on each task submission in order to
352    * implement shutdown and termination behavior.
353    *
354    * <p>The implementation deviates from the {@code ExecutorService} specification with regards to
355    * the {@code shutdownNow} method. First, "best-effort" with regards to canceling running tasks is
356    * implemented as "no-effort". No interrupts or other attempts are made to stop threads executing
357    * tasks. Second, the returned list will always be empty, as any submitted task is considered to
358    * have started execution. This applies also to tasks given to {@code invokeAll} or
359    * {@code invokeAny} which are pending serial execution, even the subset of the tasks that have
360    * not yet started execution. It is unclear from the {@code ExecutorService} specification if
361    * these should be included, and it's much easier to implement the interpretation that they not
362    * be. Finally, a call to {@code shutdown} or {@code shutdownNow} may result in concurrent calls
363    * to {@code invokeAll/invokeAny} throwing RejectedExecutionException, although a subset of the
364    * tasks may already have been executed.
365    *
366    * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
367    */
368   @GwtIncompatible // TODO
369   public static ListeningExecutorService newDirectExecutorService() {
370     return new DirectExecutorService();
371   }
372 
373   /**
374    * Returns an {@link Executor} that runs each task in the thread that invokes
375    * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
376    *
377    * <p>This instance is equivalent to: <pre>   {@code
378    *   final class DirectExecutor implements Executor {
379    *     public void execute(Runnable r) {
380    *       r.run();
381    *     }
382    *   }}</pre>
383    *
384    * <p>This should be preferred to {@link #newDirectExecutorService()} because implementing the
385    * {@link ExecutorService} subinterface necessitates significant performance overhead.
386    *
387    * @since 18.0
388    */
389   public static Executor directExecutor() {
390     return DirectExecutor.INSTANCE;
391   }
392 
393   /** See {@link #directExecutor} for behavioral notes. */
394   private enum DirectExecutor implements Executor {
395     INSTANCE;
396 
397     @Override
398     public void execute(Runnable command) {
399       command.run();
400     }
401 
402     @Override
403     public String toString() {
404       return "MoreExecutors.directExecutor()";
405     }
406   }
407 
408   /**
409    * Returns an {@link Executor} that runs each task executed sequentially, such that no
410    * two tasks are running concurrently.
411    *
412    * <p>The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in
413    * turn, and does not create any threads of its own.
414    *
415    * <p>After execution starts on the {@code delegate} {@link Executor}, tasks are polled and
416    * executed from the queue until there are no more tasks. The thread will not be released until
417    * there are no more tasks to run.
418    *
419    * <p>If a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
420    * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps
421    * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until
422    * the next time a task is submitted.
423    *
424    * @since 23.1
425    */
426   @Beta
427   @GwtIncompatible
428   public static Executor sequentialExecutor(Executor delegate) {
429     return new SequentialExecutor(delegate);
430   }
431 
432   /**
433    * Creates an {@link ExecutorService} whose {@code submit} and {@code
434    * invokeAll} methods submit {@link ListenableFutureTask} instances to the given delegate
435    * executor. Those methods, as well as {@code execute} and {@code invokeAny}, are implemented in
436    * terms of calls to {@code
437    * delegate.execute}. All other methods are forwarded unchanged to the delegate. This implies that
438    * the returned {@code ListeningExecutorService} never calls the delegate's {@code submit},
439    * {@code invokeAll}, and {@code
440    * invokeAny} methods, so any special handling of tasks must be implemented in the delegate's
441    * {@code execute} method or by wrapping the returned {@code
442    * ListeningExecutorService}.
443    *
444    * <p>If the delegate executor was already an instance of {@code
445    * ListeningExecutorService}, it is returned untouched, and the rest of this documentation does
446    * not apply.
447    *
448    * @since 10.0
449    */
450   @GwtIncompatible // TODO
451   public static ListeningExecutorService listeningDecorator(ExecutorService delegate) {
452     return (delegate instanceof ListeningExecutorService)
453         ? (ListeningExecutorService) delegate
454         : (delegate instanceof ScheduledExecutorService)
455             ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
456             : new ListeningDecorator(delegate);
457   }
458 
459   /**
460    * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code invokeAll} methods
461    * submit {@link ListenableFutureTask} instances to the given delegate executor. Those methods, as
462    * well as {@code execute} and {@code invokeAny}, are implemented in terms of calls to {@code
463    * delegate.execute}. All other methods are forwarded unchanged to the delegate. This implies that
464    * the returned {@code ListeningScheduledExecutorService} never calls the delegate's {@code
465    * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special handling of tasks
466    * must be implemented in the delegate's {@code execute} method or by wrapping the returned {@code
467    * ListeningScheduledExecutorService}.
468    *
469    * <p>If the delegate executor was already an instance of {@code
470    * ListeningScheduledExecutorService}, it is returned untouched, and the rest of this
471    * documentation does not apply.
472    *
473    * @since 10.0
474    */
475   @GwtIncompatible // TODO
476   public static ListeningScheduledExecutorService listeningDecorator(
477       ScheduledExecutorService delegate) {
478     return (delegate instanceof ListeningScheduledExecutorService)
479         ? (ListeningScheduledExecutorService) delegate
480         : new ScheduledListeningDecorator(delegate);
481   }
482 
483   @GwtIncompatible // TODO
484   private static class ListeningDecorator extends AbstractListeningExecutorService {
485     private final ExecutorService delegate;
486 
487     ListeningDecorator(ExecutorService delegate) {
488       this.delegate = checkNotNull(delegate);
489     }
490 
491     @Override
492     public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
493       return delegate.awaitTermination(timeout, unit);
494     }
495 
496     @Override
497     public final boolean isShutdown() {
498       return delegate.isShutdown();
499     }
500 
501     @Override
502     public final boolean isTerminated() {
503       return delegate.isTerminated();
504     }
505 
506     @Override
507     public final void shutdown() {
508       delegate.shutdown();
509     }
510 
511     @Override
512     public final List<Runnable> shutdownNow() {
513       return delegate.shutdownNow();
514     }
515 
516     @Override
517     public final void execute(Runnable command) {
518       delegate.execute(command);
519     }
520   }
521 
522   @GwtIncompatible // TODO
523   private static final class ScheduledListeningDecorator extends ListeningDecorator
524       implements ListeningScheduledExecutorService {
525     @SuppressWarnings("hiding")
526     final ScheduledExecutorService delegate;
527 
528     ScheduledListeningDecorator(ScheduledExecutorService delegate) {
529       super(delegate);
530       this.delegate = checkNotNull(delegate);
531     }
532 
533     @Override
534     public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
535       TrustedListenableFutureTask<Void> task = TrustedListenableFutureTask.create(command, null);
536       ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
537       return new ListenableScheduledTask<>(task, scheduled);
538     }
539 
540     @Override
541     public <V> ListenableScheduledFuture<V> schedule(
542         Callable<V> callable, long delay, TimeUnit unit) {
543       TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(callable);
544       ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
545       return new ListenableScheduledTask<V>(task, scheduled);
546     }
547 
548     @Override
549     public ListenableScheduledFuture<?> scheduleAtFixedRate(
550         Runnable command, long initialDelay, long period, TimeUnit unit) {
551       NeverSuccessfulListenableFutureTask task = new NeverSuccessfulListenableFutureTask(command);
552       ScheduledFuture<?> scheduled = delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
553       return new ListenableScheduledTask<>(task, scheduled);
554     }
555 
556     @Override
557     public ListenableScheduledFuture<?> scheduleWithFixedDelay(
558         Runnable command, long initialDelay, long delay, TimeUnit unit) {
559       NeverSuccessfulListenableFutureTask task = new NeverSuccessfulListenableFutureTask(command);
560       ScheduledFuture<?> scheduled =
561           delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
562       return new ListenableScheduledTask<>(task, scheduled);
563     }
564 
565     private static final class ListenableScheduledTask<V>
566         extends SimpleForwardingListenableFuture<V> implements ListenableScheduledFuture<V> {
567 
568       private final ScheduledFuture<?> scheduledDelegate;
569 
570       public ListenableScheduledTask(
571           ListenableFuture<V> listenableDelegate, ScheduledFuture<?> scheduledDelegate) {
572         super(listenableDelegate);
573         this.scheduledDelegate = scheduledDelegate;
574       }
575 
576       @Override
577       public boolean cancel(boolean mayInterruptIfRunning) {
578         boolean cancelled = super.cancel(mayInterruptIfRunning);
579         if (cancelled) {
580           // Unless it is cancelled, the delegate may continue being scheduled
581           scheduledDelegate.cancel(mayInterruptIfRunning);
582 
583           // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
584         }
585         return cancelled;
586       }
587 
588       @Override
589       public long getDelay(TimeUnit unit) {
590         return scheduledDelegate.getDelay(unit);
591       }
592 
593       @Override
594       public int compareTo(Delayed other) {
595         return scheduledDelegate.compareTo(other);
596       }
597     }
598 
599     @GwtIncompatible // TODO
600     private static final class NeverSuccessfulListenableFutureTask extends AbstractFuture<Void>
601         implements Runnable {
602       private final Runnable delegate;
603 
604       public NeverSuccessfulListenableFutureTask(Runnable delegate) {
605         this.delegate = checkNotNull(delegate);
606       }
607 
608       @Override
609       public void run() {
610         try {
611           delegate.run();
612         } catch (Throwable t) {
613           setException(t);
614           throw Throwables.propagate(t);
615         }
616       }
617     }
618   }
619 
620   /*
621    * This following method is a modified version of one found in
622    * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
623    * which contained the following notice:
624    *
625    * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to
626    * the public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/
627    *
628    * Other contributors include Andrew Wright, Jeffrey Hayes, Pat Fisher, Mike Judd.
629    */
630 
631   /**
632    * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
633    * implementations.
634    */
635   @GwtIncompatible static <T> T invokeAnyImpl(
636       ListeningExecutorService executorService,
637       Collection<? extends Callable<T>> tasks,
638       boolean timed,
639       long timeout,
640       TimeUnit unit)
641       throws InterruptedException, ExecutionException, TimeoutException {
642     checkNotNull(executorService);
643     checkNotNull(unit);
644     int ntasks = tasks.size();
645     checkArgument(ntasks > 0);
646     List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
647     BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
648     long timeoutNanos = unit.toNanos(timeout);
649 
650     // For efficiency, especially in executors with limited
651     // parallelism, check to see if previously submitted tasks are
652     // done before submitting more of them. This interleaving
653     // plus the exception mechanics account for messiness of main
654     // loop.
655 
656     try {
657       // Record exceptions so that if we fail to obtain any
658       // result, we can throw the last exception we got.
659       ExecutionException ee = null;
660       long lastTime = timed ? System.nanoTime() : 0;
661       Iterator<? extends Callable<T>> it = tasks.iterator();
662 
663       futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
664       --ntasks;
665       int active = 1;
666 
667       while (true) {
668         Future<T> f = futureQueue.poll();
669         if (f == null) {
670           if (ntasks > 0) {
671             --ntasks;
672             futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
673             ++active;
674           } else if (active == 0) {
675             break;
676           } else if (timed) {
677             f = futureQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
678             if (f == null) {
679               throw new TimeoutException();
680             }
681             long now = System.nanoTime();
682             timeoutNanos -= now - lastTime;
683             lastTime = now;
684           } else {
685             f = futureQueue.take();
686           }
687         }
688         if (f != null) {
689           --active;
690           try {
691             return f.get();
692           } catch (ExecutionException eex) {
693             ee = eex;
694           } catch (RuntimeException rex) {
695             ee = new ExecutionException(rex);
696           }
697         }
698       }
699 
700       if (ee == null) {
701         ee = new ExecutionException(null);
702       }
703       throw ee;
704     } finally {
705       for (Future<T> f : futures) {
706         f.cancel(true);
707       }
708     }
709   }
710 
711   /**
712    * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
713    */
714   @GwtIncompatible // TODO
715   private static <T> ListenableFuture<T> submitAndAddQueueListener(
716       ListeningExecutorService executorService,
717       Callable<T> task,
718       final BlockingQueue<Future<T>> queue) {
719     final ListenableFuture<T> future = executorService.submit(task);
720     future.addListener(
721         new Runnable() {
722           @Override
723           public void run() {
724             queue.add(future);
725           }
726         },
727         directExecutor());
728     return future;
729   }
730 
731   /**
732    * Returns a default thread factory used to create new threads.
733    *
734    * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}. Otherwise,
735    * returns {@link Executors#defaultThreadFactory()}.
736    *
737    * @since 14.0
738    */
739   @Beta
740   @GwtIncompatible // concurrency
741   public static ThreadFactory platformThreadFactory() {
742     if (!isAppEngine()) {
743       return Executors.defaultThreadFactory();
744     }
745     try {
746       return (ThreadFactory)
747           Class.forName("com.google.appengine.api.ThreadManager")
748               .getMethod("currentRequestThreadFactory")
749               .invoke(null);
750     } catch (IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
751       throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
752     } catch (InvocationTargetException e) {
753       throw Throwables.propagate(e.getCause());
754     }
755   }
756 
757   @GwtIncompatible // TODO
758   private static boolean isAppEngine() {
759     if (System.getProperty("com.google.appengine.runtime.environment") == null) {
760       return false;
761     }
762     try {
763       // If the current environment is null, we're not inside AppEngine.
764       return Class.forName("com.google.apphosting.api.ApiProxy")
765               .getMethod("getCurrentEnvironment")
766               .invoke(null)
767           != null;
768     } catch (ClassNotFoundException e) {
769       // If ApiProxy doesn't exist, we're not on AppEngine at all.
770       return false;
771     } catch (InvocationTargetException e) {
772       // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
773       return false;
774     } catch (IllegalAccessException e) {
775       // If the method isn't accessible, we're not on a supported version of AppEngine;
776       return false;
777     } catch (NoSuchMethodException e) {
778       // If the method doesn't exist, we're not on a supported version of AppEngine;
779       return false;
780     }
781   }
782 
783   /**
784    * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name} unless
785    * changing the name is forbidden by the security manager.
786    */
787   @GwtIncompatible // concurrency
788   static Thread newThread(String name, Runnable runnable) {
789     checkNotNull(name);
790     checkNotNull(runnable);
791     Thread result = platformThreadFactory().newThread(runnable);
792     try {
793       result.setName(name);
794     } catch (SecurityException e) {
795       // OK if we can't set the name in this environment.
796     }
797     return result;
798   }
799 
800   // TODO(lukes): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
801   // TODO(lukes): provide overloads that take constant strings? Function<Runnable, String>s to
802   // calculate names?
803 
804   /**
805    * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
806    *
807    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
808    * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
809    * prevents the renaming then it will be skipped but the tasks will still execute.
810    *
811    *
812    * @param executor The executor to decorate
813    * @param nameSupplier The source of names for each task
814    */
815   @GwtIncompatible // concurrency
816   static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
817     checkNotNull(executor);
818     checkNotNull(nameSupplier);
819     if (isAppEngine()) {
820       // AppEngine doesn't support thread renaming, so don't even try
821       return executor;
822     }
823     return new Executor() {
824       @Override
825       public void execute(Runnable command) {
826         executor.execute(Callables.threadRenaming(command, nameSupplier));
827       }
828     };
829   }
830 
831   /**
832    * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
833    * in.
834    *
835    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
836    * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
837    * prevents the renaming then it will be skipped but the tasks will still execute.
838    *
839    *
840    * @param service The executor to decorate
841    * @param nameSupplier The source of names for each task
842    */
843   @GwtIncompatible // concurrency
844   static ExecutorService renamingDecorator(
845       final ExecutorService service, final Supplier<String> nameSupplier) {
846     checkNotNull(service);
847     checkNotNull(nameSupplier);
848     if (isAppEngine()) {
849       // AppEngine doesn't support thread renaming, so don't even try.
850       return service;
851     }
852     return new WrappingExecutorService(service) {
853       @Override
854       protected <T> Callable<T> wrapTask(Callable<T> callable) {
855         return Callables.threadRenaming(callable, nameSupplier);
856       }
857 
858       @Override
859       protected Runnable wrapTask(Runnable command) {
860         return Callables.threadRenaming(command, nameSupplier);
861       }
862     };
863   }
864 
865   /**
866    * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
867    * tasks run in.
868    *
869    * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
870    * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
871    * prevents the renaming then it will be skipped but the tasks will still execute.
872    *
873    *
874    * @param service The executor to decorate
875    * @param nameSupplier The source of names for each task
876    */
877   @GwtIncompatible // concurrency
878   static ScheduledExecutorService renamingDecorator(
879       final ScheduledExecutorService service, final Supplier<String> nameSupplier) {
880     checkNotNull(service);
881     checkNotNull(nameSupplier);
882     if (isAppEngine()) {
883       // AppEngine doesn't support thread renaming, so don't even try.
884       return service;
885     }
886     return new WrappingScheduledExecutorService(service) {
887       @Override
888       protected <T> Callable<T> wrapTask(Callable<T> callable) {
889         return Callables.threadRenaming(callable, nameSupplier);
890       }
891 
892       @Override
893       protected Runnable wrapTask(Runnable command) {
894         return Callables.threadRenaming(command, nameSupplier);
895       }
896     };
897   }
898 
899   /**
900    * Shuts down the given executor service gradually, first disabling new submissions and later, if
901    * necessary, cancelling remaining tasks.
902    *
903    * <p>The method takes the following steps:
904    * <ol>
905    * <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
906    * <li>awaits executor service termination for half of the specified timeout.
907    * <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling pending
908    * tasks and interrupting running tasks.
909    * <li>awaits executor service termination for the other half of the specified timeout.
910    * </ol>
911    *
912    * <p>If, at any step of the process, the calling thread is interrupted, the method calls
913    * {@link ExecutorService#shutdownNow()} and returns.
914    *
915    * @param service the {@code ExecutorService} to shut down
916    * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
917    * @param unit the time unit of the timeout argument
918    * @return {@code true} if the {@code ExecutorService} was terminated successfully, {@code false}
919    *     if the call timed out or was interrupted
920    * @since 17.0
921    */
922   @Beta
923   @CanIgnoreReturnValue
924   @GwtIncompatible // concurrency
925   public static boolean shutdownAndAwaitTermination(
926       ExecutorService service, long timeout, TimeUnit unit) {
927     long halfTimeoutNanos = unit.toNanos(timeout) / 2;
928     // Disable new tasks from being submitted
929     service.shutdown();
930     try {
931       // Wait for half the duration of the timeout for existing tasks to terminate
932       if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
933         // Cancel currently executing tasks
934         service.shutdownNow();
935         // Wait the other half of the timeout for tasks to respond to being cancelled
936         service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
937       }
938     } catch (InterruptedException ie) {
939       // Preserve interrupt status
940       Thread.currentThread().interrupt();
941       // (Re-)Cancel if current thread also interrupted
942       service.shutdownNow();
943     }
944     return service.isTerminated();
945   }
946 
947   /**
948    * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate
949    * executor to the given {@code future}.
950    *
951    * <p>Note, the returned executor can only be used once.
952    */
953   static Executor rejectionPropagatingExecutor(
954       final Executor delegate, final AbstractFuture<?> future) {
955     checkNotNull(delegate);
956     checkNotNull(future);
957     if (delegate == directExecutor()) {
958       // directExecutor() cannot throw RejectedExecutionException
959       return delegate;
960     }
961     return new Executor() {
962       volatile boolean thrownFromDelegate = true;
963 
964       @Override
965       public void execute(final Runnable command) {
966         try {
967           delegate.execute(
968               new Runnable() {
969                 @Override
970                 public void run() {
971                   thrownFromDelegate = false;
972                   command.run();
973                 }
974               });
975         } catch (RejectedExecutionException e) {
976           if (thrownFromDelegate) {
977             // wrap exception?
978             future.setException(e);
979           }
980           // otherwise it must have been thrown from a transitive call and the delegate runnable
981           // should have handled it.
982         }
983       }
984     };
985   }
986 }