View Javadoc
1   /*
2    * Copyright (C) 2007 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.base.Preconditions.checkNotNull;
18  import static com.google.common.base.Strings.isNullOrEmpty;
19  import static com.google.common.base.Throwables.throwIfUnchecked;
20  import static com.google.common.util.concurrent.Futures.getDone;
21  import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22  import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
23  
24  import com.google.common.annotations.Beta;
25  import com.google.common.annotations.GwtCompatible;
26  import com.google.common.base.Ascii;
27  import com.google.errorprone.annotations.CanIgnoreReturnValue;
28  import com.google.errorprone.annotations.DoNotMock;
29  import com.google.errorprone.annotations.ForOverride;
30  import com.google.j2objc.annotations.ReflectionSupport;
31  import java.security.AccessController;
32  import java.security.PrivilegedActionException;
33  import java.security.PrivilegedExceptionAction;
34  import java.util.concurrent.CancellationException;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Executor;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.ScheduledFuture;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
42  import java.util.concurrent.locks.LockSupport;
43  import java.util.logging.Level;
44  import java.util.logging.Logger;
45  import javax.annotation.Nullable;
46  
47  /**
48   * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More
49   * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture},
50   * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an
51   * existing one, typically using methods like {@link Futures#transform(ListenableFuture,
52   * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link
53   * Futures#catching(ListenableFuture, Class, com.google.common.base.Function,
54   * java.util.concurrent.Executor) Futures.catching}.
55   *
56   * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way
57   * to set the result of the computation through the protected methods {@link #set(Object)}, {@link
58   * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override
59   * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses
60   * should rarely override other methods.
61   *
62   * @author Sven Mawson
63   * @author Luke Sandberg
64   * @since 1.0
65   */
66  @SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally
67  @DoNotMock("Use Futures.immediate*Future or SettableFuture")
68  @GwtCompatible(emulated = true)
69  @ReflectionSupport(value = ReflectionSupport.Level.FULL)
70  public abstract class AbstractFuture<V> extends FluentFuture<V> {
71    // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, ||
72  
73    private static final boolean GENERATE_CANCELLATION_CAUSES =
74        Boolean.parseBoolean(
75            System.getProperty("guava.concurrent.generate_cancellation_cause", "false"));
76  
77    /**
78     * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
79     * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
80     */
81    abstract static class TrustedFuture<V> extends AbstractFuture<V> {
82      @CanIgnoreReturnValue
83      @Override
84      public final V get() throws InterruptedException, ExecutionException {
85        return super.get();
86      }
87  
88      @CanIgnoreReturnValue
89      @Override
90      public final V get(long timeout, TimeUnit unit)
91          throws InterruptedException, ExecutionException, TimeoutException {
92        return super.get(timeout, unit);
93      }
94  
95      @Override
96      public final boolean isDone() {
97        return super.isDone();
98      }
99  
100     @Override
101     public final boolean isCancelled() {
102       return super.isCancelled();
103     }
104 
105     @Override
106     public final void addListener(Runnable listener, Executor executor) {
107       super.addListener(listener, executor);
108     }
109 
110     @CanIgnoreReturnValue
111     @Override
112     public final boolean cancel(boolean mayInterruptIfRunning) {
113       return super.cancel(mayInterruptIfRunning);
114     }
115   }
116 
117   // Logger to log exceptions caught when running listeners.
118   private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());
119 
120   // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of
121   // blocking. This value is what AbstractQueuedSynchronizer uses.
122   private static final long SPIN_THRESHOLD_NANOS = 1000L;
123 
124   private static final AtomicHelper ATOMIC_HELPER;
125 
126   static {
127     AtomicHelper helper;
128 
129     try {
130       helper = new UnsafeAtomicHelper();
131     } catch (Throwable unsafeFailure) {
132       // catch absolutely everything and fall through to our 'SafeAtomicHelper'
133       // The access control checks that ARFU does means the caller class has to be AbstractFuture
134       // instead of SafeAtomicHelper, so we annoyingly define these here
135       try {
136         helper =
137             new SafeAtomicHelper(
138                 newUpdater(Waiter.class, Thread.class, "thread"),
139                 newUpdater(Waiter.class, Waiter.class, "next"),
140                 newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
141                 newUpdater(AbstractFuture.class, Listener.class, "listeners"),
142                 newUpdater(AbstractFuture.class, Object.class, "value"));
143       } catch (Throwable atomicReferenceFieldUpdaterFailure) {
144         // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
145         // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
146         // For these users fallback to a suboptimal implementation, based on synchronized. This will
147         // be a definite performance hit to those users.
148         log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
149         log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure);
150         helper = new SynchronizedHelper();
151       }
152     }
153     ATOMIC_HELPER = helper;
154 
155     // Prevent rare disastrous classloading in first call to LockSupport.park.
156     // See: https://bugs.openjdk.java.net/browse/JDK-8074773
157     @SuppressWarnings("unused")
158     Class<?> ensureLoaded = LockSupport.class;
159   }
160 
161   /**
162    * Waiter links form a Treiber stack, in the {@link #waiters} field.
163    */
164   private static final class Waiter {
165     static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
166 
167     @Nullable volatile Thread thread;
168     @Nullable volatile Waiter next;
169 
170     /**
171      * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
172      * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
173      */
174     Waiter(boolean unused) {}
175 
176     Waiter() {
177       // avoid volatile write, write is made visible by subsequent CAS on waiters field
178       ATOMIC_HELPER.putThread(this, Thread.currentThread());
179     }
180 
181     // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
182     // field.
183     void setNext(Waiter next) {
184       ATOMIC_HELPER.putNext(this, next);
185     }
186 
187     void unpark() {
188       // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
189       // unpark even though the thread has already removed itself from the list. But even if we did
190       // use a CAS, that race would still exist (it would just be ever so slightly smaller).
191       Thread w = thread;
192       if (w != null) {
193         thread = null;
194         LockSupport.unpark(w);
195       }
196     }
197   }
198 
199   /**
200    * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted
201    * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved
202    * by two things.
203    * <ul>
204    * <li>This is only called when a waiting thread times out or is interrupted. Both of which should
205    *     be rare.
206    * <li>The waiters list should be very short.
207    * </ul>
208    */
209   private void removeWaiter(Waiter node) {
210     node.thread = null; // mark as 'deleted'
211     restart:
212     while (true) {
213       Waiter pred = null;
214       Waiter curr = waiters;
215       if (curr == Waiter.TOMBSTONE) {
216         return; // give up if someone is calling complete
217       }
218       Waiter succ;
219       while (curr != null) {
220         succ = curr.next;
221         if (curr.thread != null) { // we aren't unlinking this node, update pred.
222           pred = curr;
223         } else if (pred != null) { // We are unlinking this node and it has a predecessor.
224           pred.next = succ;
225           if (pred.thread == null) { // We raced with another node that unlinked pred. Restart.
226             continue restart;
227           }
228         } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
229           continue restart; // We raced with an add or complete
230         }
231         curr = succ;
232       }
233       break;
234     }
235   }
236 
237   /** Listeners also form a stack through the {@link #listeners} field. */
238   private static final class Listener {
239     static final Listener TOMBSTONE = new Listener(null, null);
240     final Runnable task;
241     final Executor executor;
242 
243     // writes to next are made visible by subsequent CAS's on the listeners field
244     @Nullable Listener next;
245 
246     Listener(Runnable task, Executor executor) {
247       this.task = task;
248       this.executor = executor;
249     }
250   }
251 
252   /** A special value to represent {@code null}. */
253   private static final Object NULL = new Object();
254 
255   /** A special value to represent failure, when {@link #setException} is called successfully. */
256   private static final class Failure {
257     static final Failure FALLBACK_INSTANCE =
258         new Failure(
259             new Throwable("Failure occurred while trying to finish a future.") {
260               @Override
261               public synchronized Throwable fillInStackTrace() {
262                 return this; // no stack trace
263               }
264             });
265     final Throwable exception;
266 
267     Failure(Throwable exception) {
268       this.exception = checkNotNull(exception);
269     }
270   }
271 
272   /** A special value to represent cancellation and the 'wasInterrupted' bit. */
273   private static final class Cancellation {
274     // constants to use when GENERATE_CANCELLATION_CAUSES = false
275     static final Cancellation CAUSELESS_INTERRUPTED;
276     static final Cancellation CAUSELESS_CANCELLED;
277 
278     static {
279       if (GENERATE_CANCELLATION_CAUSES) {
280         CAUSELESS_CANCELLED = null;
281         CAUSELESS_INTERRUPTED = null;
282       } else {
283         CAUSELESS_CANCELLED = new Cancellation(false, null);
284         CAUSELESS_INTERRUPTED = new Cancellation(true, null);
285       }
286     }
287 
288     final boolean wasInterrupted;
289     @Nullable final Throwable cause;
290 
291     Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
292       this.wasInterrupted = wasInterrupted;
293       this.cause = cause;
294     }
295   }
296 
297   /** A special value that encodes the 'setFuture' state. */
298   private static final class SetFuture<V> implements Runnable {
299     final AbstractFuture<V> owner;
300     final ListenableFuture<? extends V> future;
301 
302     SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
303       this.owner = owner;
304       this.future = future;
305     }
306 
307     @Override
308     public void run() {
309       if (owner.value != this) {
310         // nothing to do, we must have been cancelled, don't bother inspecting the future.
311         return;
312       }
313       Object valueToSet = getFutureValue(future);
314       if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
315         complete(owner);
316       }
317     }
318   }
319 
320   // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is
321   // available.
322   /**
323    * This field encodes the current state of the future.
324    *
325    * <p>The valid values are:
326    * <ul>
327    * <li>{@code null} initial state, nothing has happened.
328    * <li>{@link Cancellation} terminal state, {@code cancel} was called.
329    * <li>{@link Failure} terminal state, {@code setException} was called.
330    * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
331    * <li>{@link #NULL} terminal state, {@code set(null)} was called.
332    * <li>Any other non-null value, terminal state, {@code set} was called with a non-null argument.
333    * </ul>
334    */
335   private volatile Object value;
336 
337   /** All listeners. */
338   private volatile Listener listeners;
339 
340   /** All waiting threads. */
341   private volatile Waiter waiters;
342 
343   /**
344    * Constructor for use by subclasses.
345    */
346   protected AbstractFuture() {}
347 
348   // Gets and Timed Gets
349   //
350   // * Be responsive to interruption
351   // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
352   //   waiters field.
353   // * Future completion is defined by when #value becomes non-null/non SetFuture
354   // * Future completion can be observed if the waiters field contains a TOMBSTONE
355 
356   // Timed Get
357   // There are a few design constraints to consider
358   // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
359   //   have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the
360   //   timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of
361   //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
362   //   similar purposes.
363   // * We want to behave reasonably for timeouts of 0
364   // * We are more responsive to completion than timeouts. This is because parkNanos depends on
365   //   system scheduling and as such we could either miss our deadline, or unpark() could be delayed
366   //   so that it looks like we timed out even though we didn't. For comparison FutureTask respects
367   //   completion preferably and AQS is non-deterministic (depends on where in the queue the waiter
368   //   is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node
369   //   and we could use that to make a decision about whether or not we timed out prior to being
370   //   unparked.
371 
372   /**
373    * {@inheritDoc}
374    *
375    * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
376    * current thread is interrupted during the call, even if the value is already available.
377    *
378    * @throws CancellationException {@inheritDoc}
379    */
380   @CanIgnoreReturnValue
381   @Override
382   public V get(long timeout, TimeUnit unit)
383       throws InterruptedException, TimeoutException, ExecutionException {
384     // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop
385     // at the bottom and throw a timeoutexception.
386     long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit.
387     if (Thread.interrupted()) {
388       throw new InterruptedException();
389     }
390     Object localValue = value;
391     if (localValue != null & !(localValue instanceof SetFuture)) {
392       return getDoneValue(localValue);
393     }
394     // we delay calling nanoTime until we know we will need to either park or spin
395     final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
396     long_wait_loop:
397     if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
398       Waiter oldHead = waiters;
399       if (oldHead != Waiter.TOMBSTONE) {
400         Waiter node = new Waiter();
401         do {
402           node.setNext(oldHead);
403           if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
404             while (true) {
405               LockSupport.parkNanos(this, remainingNanos);
406               // Check interruption first, if we woke up due to interruption we need to honor that.
407               if (Thread.interrupted()) {
408                 removeWaiter(node);
409                 throw new InterruptedException();
410               }
411 
412               // Otherwise re-read and check doneness. If we loop then it must have been a spurious
413               // wakeup
414               localValue = value;
415               if (localValue != null & !(localValue instanceof SetFuture)) {
416                 return getDoneValue(localValue);
417               }
418 
419               // timed out?
420               remainingNanos = endNanos - System.nanoTime();
421               if (remainingNanos < SPIN_THRESHOLD_NANOS) {
422                 // Remove the waiter, one way or another we are done parking this thread.
423                 removeWaiter(node);
424                 break long_wait_loop; // jump down to the busy wait loop
425               }
426             }
427           }
428           oldHead = waiters; // re-read and loop.
429         } while (oldHead != Waiter.TOMBSTONE);
430       }
431       // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
432       // waiter.
433       return getDoneValue(value);
434     }
435     // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the
436     // waiters list
437     while (remainingNanos > 0) {
438       localValue = value;
439       if (localValue != null & !(localValue instanceof SetFuture)) {
440         return getDoneValue(localValue);
441       }
442       if (Thread.interrupted()) {
443         throw new InterruptedException();
444       }
445       remainingNanos = endNanos - System.nanoTime();
446     }
447 
448     String futureToString = toString();
449     // It's confusing to see a completed future in a timeout message; if isDone() returns false,
450     // then we know it must have given a pending toString value earlier. If not, then the future
451     // completed after the timeout expired, and the message might be success.
452     if (isDone()) {
453       throw new TimeoutException(
454           "Waited " + timeout + " " + Ascii.toLowerCase(unit.toString())
455               + " but future completed as timeout expired");
456     }
457     throw new TimeoutException(
458         "Waited " + timeout + " " + Ascii.toLowerCase(unit.toString()) + " for " + futureToString);
459   }
460 
461   /**
462    * {@inheritDoc}
463    *
464    * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
465    * current thread is interrupted during the call, even if the value is already available.
466    *
467    * @throws CancellationException {@inheritDoc}
468    */
469   @CanIgnoreReturnValue
470   @Override
471   public V get() throws InterruptedException, ExecutionException {
472     if (Thread.interrupted()) {
473       throw new InterruptedException();
474     }
475     Object localValue = value;
476     if (localValue != null & !(localValue instanceof SetFuture)) {
477       return getDoneValue(localValue);
478     }
479     Waiter oldHead = waiters;
480     if (oldHead != Waiter.TOMBSTONE) {
481       Waiter node = new Waiter();
482       do {
483         node.setNext(oldHead);
484         if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
485           // we are on the stack, now wait for completion.
486           while (true) {
487             LockSupport.park(this);
488             // Check interruption first, if we woke up due to interruption we need to honor that.
489             if (Thread.interrupted()) {
490               removeWaiter(node);
491               throw new InterruptedException();
492             }
493             // Otherwise re-read and check doneness. If we loop then it must have been a spurious
494             // wakeup
495             localValue = value;
496             if (localValue != null & !(localValue instanceof SetFuture)) {
497               return getDoneValue(localValue);
498             }
499           }
500         }
501         oldHead = waiters; // re-read and loop.
502       } while (oldHead != Waiter.TOMBSTONE);
503     }
504     // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
505     // waiter.
506     return getDoneValue(value);
507   }
508 
509   /**
510    * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}.
511    */
512   private V getDoneValue(Object obj) throws ExecutionException {
513     // While this seems like it might be too branch-y, simple benchmarking proves it to be
514     // unmeasurable (comparing done AbstractFutures with immediateFuture)
515     if (obj instanceof Cancellation) {
516       throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
517     } else if (obj instanceof Failure) {
518       throw new ExecutionException(((Failure) obj).exception);
519     } else if (obj == NULL) {
520       return null;
521     } else {
522       @SuppressWarnings("unchecked") // this is the only other option
523       V asV = (V) obj;
524       return asV;
525     }
526   }
527 
528   @Override
529   public boolean isDone() {
530     final Object localValue = value;
531     return localValue != null & !(localValue instanceof SetFuture);
532   }
533 
534   @Override
535   public boolean isCancelled() {
536     final Object localValue = value;
537     return localValue instanceof Cancellation;
538   }
539 
540   /**
541    * {@inheritDoc}
542    *
543    * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain
544    * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate
545    * {@code Future} that was supplied in the {@code setFuture} call.
546    *
547    * <p>Rather than override this method to perform additional cancellation work or cleanup,
548    * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link
549    * #wasInterrupted} as necessary. This ensures that the work is done even if the future is
550    * cancelled without a call to {@code cancel}, such as by calling {@code
551    * setFuture(cancelledFuture)}.
552    */
553   @CanIgnoreReturnValue
554   @Override
555   public boolean cancel(boolean mayInterruptIfRunning) {
556     Object localValue = value;
557     boolean rValue = false;
558     if (localValue == null | localValue instanceof SetFuture) {
559       // Try to delay allocating the exception. At this point we may still lose the CAS, but it is
560       // certainly less likely.
561       Object valueToSet =
562           GENERATE_CANCELLATION_CAUSES
563               ? new Cancellation(
564                   mayInterruptIfRunning, new CancellationException("Future.cancel() was called."))
565               : (mayInterruptIfRunning
566                   ? Cancellation.CAUSELESS_INTERRUPTED
567                   : Cancellation.CAUSELESS_CANCELLED);
568       AbstractFuture<?> abstractFuture = this;
569       while (true) {
570         if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
571           rValue = true;
572           // We call interuptTask before calling complete(), which is consistent with
573           // FutureTask
574           if (mayInterruptIfRunning) {
575             abstractFuture.interruptTask();
576           }
577           complete(abstractFuture);
578           if (localValue instanceof SetFuture) {
579             // propagate cancellation to the future set in setfuture, this is racy, and we don't
580             // care if we are successful or not.
581             ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future;
582             if (futureToPropagateTo instanceof TrustedFuture) {
583               // If the future is a TrustedFuture then we specifically avoid calling cancel()
584               // this has 2 benefits
585               // 1. for long chains of futures strung together with setFuture we consume less stack
586               // 2. we avoid allocating Cancellation objects at every level of the cancellation
587               //    chain
588               // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and
589               // does nothing but delegate to this method.
590               AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo;
591               localValue = trusted.value;
592               if (localValue == null | localValue instanceof SetFuture) {
593                 abstractFuture = trusted;
594                 continue;  // loop back up and try to complete the new future
595               }
596             } else {
597               // not a TrustedFuture, call cancel directly.
598               futureToPropagateTo.cancel(mayInterruptIfRunning);
599             }
600           }
601           break;
602         }
603         // obj changed, reread
604         localValue = abstractFuture.value;
605         if (!(localValue instanceof SetFuture)) {
606           // obj cannot be null at this point, because value can only change from null to non-null.
607           // So if value changed (and it did since we lost the CAS), then it cannot be null and
608           // since it isn't a SetFuture, then the future must be done and we should exit the loop
609           break;
610         }
611       }
612     }
613     return rValue;
614   }
615 
616   /**
617    * Subclasses can override this method to implement interruption of the future's computation. The
618    * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}.
619    *
620    * <p>The default implementation does nothing.
621    *
622    * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking
623    * {@link #wasInterrupted} to decide whether to interrupt your task.
624    *
625    * @since 10.0
626    */
627   protected void interruptTask() {}
628 
629   /**
630    * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code
631    * true}.
632    *
633    * @since 14.0
634    */
635   protected final boolean wasInterrupted() {
636     final Object localValue = value;
637     return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
638   }
639 
640   /**
641    * {@inheritDoc}
642    *
643    * @since 10.0
644    */
645   @Override
646   public void addListener(Runnable listener, Executor executor) {
647     checkNotNull(listener, "Runnable was null.");
648     checkNotNull(executor, "Executor was null.");
649     Listener oldHead = listeners;
650     if (oldHead != Listener.TOMBSTONE) {
651       Listener newNode = new Listener(listener, executor);
652       do {
653         newNode.next = oldHead;
654         if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
655           return;
656         }
657         oldHead = listeners; // re-read
658       } while (oldHead != Listener.TOMBSTONE);
659     }
660     // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
661     // the listener.
662     executeListener(listener, executor);
663   }
664 
665   /**
666    * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or
667    * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns,
668    * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was
669    * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code
670    * Future} may have previously been set asynchronously, in which case its result may not be known
671    * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*}
672    * method, only by a call to {@link #cancel}.
673    *
674    * @param value the value to be used as the result
675    * @return true if the attempt was accepted, completing the {@code Future}
676    */
677   @CanIgnoreReturnValue
678   protected boolean set(@Nullable V value) {
679     Object valueToSet = value == null ? NULL : value;
680     if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
681       complete(this);
682       return true;
683     }
684     return false;
685   }
686 
687   /**
688    * Sets the failed result of this {@code Future} unless this {@code Future} has already been
689    * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this
690    * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b>
691    * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the
692    * {@code Future} may have previously been set asynchronously, in which case its result may not be
693    * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*}
694    * method, only by a call to {@link #cancel}.
695    *
696    * @param throwable the exception to be used as the failed result
697    * @return true if the attempt was accepted, completing the {@code Future}
698    */
699   @CanIgnoreReturnValue
700   protected boolean setException(Throwable throwable) {
701     Object valueToSet = new Failure(checkNotNull(throwable));
702     if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
703       complete(this);
704       return true;
705     }
706     return false;
707   }
708 
709   /**
710    * Sets the result of this {@code Future} to match the supplied input {@code Future} once the
711    * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set
712    * (including "set asynchronously," defined below).
713    *
714    * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call
715    * is accepted, then this future is guaranteed to have been completed with the supplied future by
716    * the time this method returns. If the supplied future is not done and the call is accepted, then
717    * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known,
718    * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}.
719    *
720    * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
721    * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
722    * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
723    * Future}.
724    *
725    * <p>Note that, even if the supplied future is cancelled and it causes this future to complete,
726    * it will never trigger interruption behavior. In particular, it will not cause this future to
727    * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not
728    * return {@code true}.
729    *
730    * @param future the future to delegate to
731    * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
732    *     cancelled or set.
733    * @since 19.0
734    */
735   @Beta
736   @CanIgnoreReturnValue
737   protected boolean setFuture(ListenableFuture<? extends V> future) {
738     checkNotNull(future);
739     Object localValue = value;
740     if (localValue == null) {
741       if (future.isDone()) {
742         Object value = getFutureValue(future);
743         if (ATOMIC_HELPER.casValue(this, null, value)) {
744           complete(this);
745           return true;
746         }
747         return false;
748       }
749       SetFuture valueToSet = new SetFuture<V>(this, future);
750       if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
751         // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
752         // since all we are doing is unpacking a completed future which should be fast.
753         try {
754           future.addListener(valueToSet, directExecutor());
755         } catch (Throwable t) {
756           // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this
757           // must have been caused by addListener itself. The most likely explanation is a
758           // misconfigured mock. Try to switch to Failure.
759           Failure failure;
760           try {
761             failure = new Failure(t);
762           } catch (Throwable oomMostLikely) {
763             failure = Failure.FALLBACK_INSTANCE;
764           }
765           // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
766           boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
767         }
768         return true;
769       }
770       localValue = value; // we lost the cas, fall through and maybe cancel
771     }
772     // The future has already been set to something. If it is cancellation we should cancel the
773     // incoming future.
774     if (localValue instanceof Cancellation) {
775       // we don't care if it fails, this is best-effort.
776       future.cancel(((Cancellation) localValue).wasInterrupted);
777     }
778     return false;
779   }
780 
781   /**
782    * Returns a value that satisfies the contract of the {@link #value} field based on the state of
783    * given future.
784    *
785    * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
786    */
787   private static Object getFutureValue(ListenableFuture<?> future) {
788     Object valueToSet;
789     if (future instanceof TrustedFuture) {
790       // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
791       // override .get() (since it is final) and therefore this is equivalent to calling .get()
792       // and unpacking the exceptions like we do below (just much faster because it is a single
793       // field read instead of a read, several branches and possibly creating exceptions).
794       Object v = ((AbstractFuture<?>) future).value;
795       if (v instanceof Cancellation) {
796         // If the other future was interrupted, clear the interrupted bit while preserving the cause
797         // this will make it consistent with how non-trustedfutures work which cannot propagate the
798         // wasInterrupted bit
799         Cancellation c = (Cancellation) v;
800         if (c.wasInterrupted) {
801           v =
802               c.cause != null
803                   ? new Cancellation(/* wasInterrupted= */ false, c.cause)
804                   : Cancellation.CAUSELESS_CANCELLED;
805         }
806       }
807       return v;
808     } else {
809       // Otherwise calculate valueToSet by calling .get()
810       try {
811         Object v = getDone(future);
812         valueToSet = v == null ? NULL : v;
813       } catch (ExecutionException exception) {
814         valueToSet = new Failure(exception.getCause());
815       } catch (CancellationException cancellation) {
816         valueToSet = new Cancellation(false, cancellation);
817       } catch (Throwable t) {
818         valueToSet = new Failure(t);
819       }
820     }
821     return valueToSet;
822   }
823 
824   /** Unblocks all threads and runs all listeners. */
825   private static void complete(AbstractFuture<?> future) {
826     Listener next = null;
827     outer: while (true) {
828       future.releaseWaiters();
829       // We call this before the listeners in order to avoid needing to manage a separate stack data
830       // structure for them.
831       // afterDone() should be generally fast and only used for cleanup work... but in theory can
832       // also be recursive and create StackOverflowErrors
833       future.afterDone();
834       // push the current set of listeners onto next
835       next = future.clearListeners(next);
836       future = null;
837       while (next != null) {
838         Listener curr = next;
839         next = next.next;
840         Runnable task = curr.task;
841         if (task instanceof SetFuture) {
842           SetFuture<?> setFuture = (SetFuture<?>) task;
843           // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
844           // chains of SetFutures
845           // Handling this special case is important because there is no way to pass an executor to
846           // setFuture, so a user couldn't break the chain by doing this themselves.  It is also
847           // potentially common if someone writes a recursive Futures.transformAsync transformer.
848           future = setFuture.owner;
849           if (future.value == setFuture) {
850             Object valueToSet = getFutureValue(setFuture.future);
851             if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
852               continue outer;
853             }
854           }
855           // other wise the future we were trying to set is already done.
856         } else {
857           executeListener(task, curr.executor);
858         }
859       }
860       break;
861     }
862   }
863 
864   /**
865    * Callback method that is called exactly once after the future is completed.
866    *
867    * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
868    *
869    * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is
870    * intended for very lightweight cleanup work, for example, timing statistics or clearing fields.
871    * If your task does anything heavier consider, just using a listener with an executor.
872    *
873    * @since 20.0
874    */
875   @Beta
876   @ForOverride
877   protected void afterDone() {}
878 
879   /**
880    * Returns the exception that this {@code Future} completed with. This includes completion through
881    * a call to {@link #setException} or {@link #setFuture setFuture}{@code (failedFuture)} but not
882    * cancellation.
883    *
884    * @throws RuntimeException if the {@code Future} has not failed
885    */
886   final Throwable trustedGetException() {
887     return ((Failure) value).exception;
888   }
889 
890   /**
891    * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts)
892    * the given future (if available).
893    */
894   final void maybePropagateCancellationTo(@Nullable Future<?> related) {
895     if (related != null & isCancelled()) {
896       related.cancel(wasInterrupted());
897     }
898   }
899 
900   /** Releases all threads in the {@link #waiters} list, and clears the list. */
901   private void releaseWaiters() {
902     Waiter head;
903     do {
904       head = waiters;
905     } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
906     for (Waiter currentWaiter = head;
907         currentWaiter != null;
908         currentWaiter = currentWaiter.next) {
909       currentWaiter.unpark();
910     }
911   }
912 
913   /**
914    * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently
915    * added first.
916    */
917   private Listener clearListeners(Listener onto) {
918     // We need to
919     // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to
920     //    to synchronize with us
921     // 2. reverse the linked list, because despite our rather clear contract, people depend on us
922     //    executing listeners in the order they were added
923     // 3. push all the items onto 'onto' and return the new head of the stack
924     Listener head;
925     do {
926       head = listeners;
927     } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
928     Listener reversedList = onto;
929     while (head != null) {
930       Listener tmp = head;
931       head = head.next;
932       tmp.next = reversedList;
933       reversedList = tmp;
934     }
935     return reversedList;
936   }
937 
938   // TODO(user) move this up into FluentFuture, or parts as a default method on ListenableFuture?
939   @Override
940   public String toString() {
941     StringBuilder builder = new StringBuilder().append(super.toString()).append("[status=");
942     if (isCancelled()) {
943       builder.append("CANCELLED");
944     } else if (isDone()) {
945       addDoneString(builder);
946     } else {
947       String pendingDescription;
948       try {
949         pendingDescription = pendingToString();
950       } catch (RuntimeException e) {
951         // Don't call getMessage or toString() on the exception, in case the exception thrown by the
952         // subclass is implemented with bugs similar to the subclass.
953         pendingDescription = "Exception thrown from implementation: " + e.getClass();
954       }
955       // The future may complete during or before the call to getPendingToString, so we use null
956       // as a signal that we should try checking if the future is done again.
957       if (!isNullOrEmpty(pendingDescription)) {
958         builder.append("PENDING, info=[").append(pendingDescription).append("]");
959       } else if (isDone()) {
960         addDoneString(builder);
961       } else {
962         builder.append("PENDING");
963       }
964     }
965     return builder.append("]").toString();
966   }
967 
968   /**
969    * Provide a human-readable explanation of why this future has not yet completed.
970    *
971    * @return null if an explanation cannot be provided because the future is done.
972    * @since 23.0
973    */
974   @Nullable
975   protected String pendingToString() {
976     Object localValue = value;
977     if (localValue instanceof SetFuture) {
978       return "setFuture=[" + ((SetFuture) localValue).future + "]";
979     } else if (this instanceof ScheduledFuture) {
980       return "remaining delay=["
981           + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS)
982           + " ms]";
983     }
984     return null;
985   }
986 
987   private void addDoneString(StringBuilder builder) {
988     try {
989       V value = getDone(this);
990       builder.append("SUCCESS, result=[").append(value).append("]");
991     } catch (ExecutionException e) {
992       builder.append("FAILURE, cause=[").append(e.getCause()).append("]");
993     } catch (CancellationException e) {
994       builder.append("CANCELLED"); // shouldn't be reachable
995     } catch (RuntimeException e) {
996       builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]");
997     }
998   }
999 
1000   /**
1001    * Submits the given runnable to the given {@link Executor} catching and logging all
1002    * {@linkplain RuntimeException runtime exceptions} thrown by the executor.
1003    */
1004   private static void executeListener(Runnable runnable, Executor executor) {
1005     try {
1006       executor.execute(runnable);
1007     } catch (RuntimeException e) {
1008       // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if
1009       // we're given a bad one. We only catch RuntimeException because we want Errors to propagate
1010       // up.
1011       log.log(
1012           Level.SEVERE,
1013           "RuntimeException while executing runnable " + runnable + " with executor " + executor,
1014           e);
1015     }
1016   }
1017 
1018   private abstract static class AtomicHelper {
1019     /** Non volatile write of the thread to the {@link Waiter#thread} field. */
1020     abstract void putThread(Waiter waiter, Thread newValue);
1021 
1022     /** Non volatile write of the waiter to the {@link Waiter#next} field. */
1023     abstract void putNext(Waiter waiter, Waiter newValue);
1024 
1025     /** Performs a CAS operation on the {@link #waiters} field. */
1026     abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update);
1027 
1028     /** Performs a CAS operation on the {@link #listeners} field. */
1029     abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update);
1030 
1031     /** Performs a CAS operation on the {@link #value} field. */
1032     abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update);
1033   }
1034 
1035   /**
1036    * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
1037    *
1038    * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot
1039    * be accessed.
1040    */
1041   private static final class UnsafeAtomicHelper extends AtomicHelper {
1042     static final sun.misc.Unsafe UNSAFE;
1043     static final long LISTENERS_OFFSET;
1044     static final long WAITERS_OFFSET;
1045     static final long VALUE_OFFSET;
1046     static final long WAITER_THREAD_OFFSET;
1047     static final long WAITER_NEXT_OFFSET;
1048 
1049     static {
1050       sun.misc.Unsafe unsafe = null;
1051       try {
1052         unsafe = sun.misc.Unsafe.getUnsafe();
1053       } catch (SecurityException tryReflectionInstead) {
1054         try {
1055           unsafe =
1056               AccessController.doPrivileged(
1057                   new PrivilegedExceptionAction<sun.misc.Unsafe>() {
1058                     @Override
1059                     public sun.misc.Unsafe run() throws Exception {
1060                       Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
1061                       for (java.lang.reflect.Field f : k.getDeclaredFields()) {
1062                         f.setAccessible(true);
1063                         Object x = f.get(null);
1064                         if (k.isInstance(x)) {
1065                           return k.cast(x);
1066                         }
1067                       }
1068                       throw new NoSuchFieldError("the Unsafe");
1069                     }
1070                   });
1071         } catch (PrivilegedActionException e) {
1072           throw new RuntimeException("Could not initialize intrinsics", e.getCause());
1073         }
1074       }
1075       try {
1076         Class<?> abstractFuture = AbstractFuture.class;
1077         WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
1078         LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
1079         VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
1080         WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
1081         WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
1082         UNSAFE = unsafe;
1083       } catch (Exception e) {
1084         throwIfUnchecked(e);
1085         throw new RuntimeException(e);
1086       }
1087     }
1088 
1089     @Override
1090     void putThread(Waiter waiter, Thread newValue) {
1091       UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
1092     }
1093 
1094     @Override
1095     void putNext(Waiter waiter, Waiter newValue) {
1096       UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
1097     }
1098 
1099     /** Performs a CAS operation on the {@link #waiters} field. */
1100     @Override
1101     boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1102       return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
1103     }
1104 
1105     /** Performs a CAS operation on the {@link #listeners} field. */
1106     @Override
1107     boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1108       return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
1109     }
1110 
1111     /** Performs a CAS operation on the {@link #value} field. */
1112     @Override
1113     boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1114       return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
1115     }
1116   }
1117 
1118   /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */
1119   private static final class SafeAtomicHelper extends AtomicHelper {
1120     final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
1121     final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
1122     final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
1123     final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater;
1124     final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
1125 
1126     SafeAtomicHelper(
1127         AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
1128         AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
1129         AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
1130         AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
1131         AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
1132       this.waiterThreadUpdater = waiterThreadUpdater;
1133       this.waiterNextUpdater = waiterNextUpdater;
1134       this.waitersUpdater = waitersUpdater;
1135       this.listenersUpdater = listenersUpdater;
1136       this.valueUpdater = valueUpdater;
1137     }
1138 
1139     @Override
1140     void putThread(Waiter waiter, Thread newValue) {
1141       waiterThreadUpdater.lazySet(waiter, newValue);
1142     }
1143 
1144     @Override
1145     void putNext(Waiter waiter, Waiter newValue) {
1146       waiterNextUpdater.lazySet(waiter, newValue);
1147     }
1148 
1149     @Override
1150     boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1151       return waitersUpdater.compareAndSet(future, expect, update);
1152     }
1153 
1154     @Override
1155     boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1156       return listenersUpdater.compareAndSet(future, expect, update);
1157     }
1158 
1159     @Override
1160     boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1161       return valueUpdater.compareAndSet(future, expect, update);
1162     }
1163   }
1164 
1165   /**
1166    * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
1167    *
1168    * <p>This is an implementation of last resort for when certain basic VM features are broken (like
1169    * AtomicReferenceFieldUpdater).
1170    */
1171   private static final class SynchronizedHelper extends AtomicHelper {
1172     @Override
1173     void putThread(Waiter waiter, Thread newValue) {
1174       waiter.thread = newValue;
1175     }
1176 
1177     @Override
1178     void putNext(Waiter waiter, Waiter newValue) {
1179       waiter.next = newValue;
1180     }
1181 
1182     @Override
1183     boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1184       synchronized (future) {
1185         if (future.waiters == expect) {
1186           future.waiters = update;
1187           return true;
1188         }
1189         return false;
1190       }
1191     }
1192 
1193     @Override
1194     boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1195       synchronized (future) {
1196         if (future.listeners == expect) {
1197           future.listeners = update;
1198           return true;
1199         }
1200         return false;
1201       }
1202     }
1203 
1204     @Override
1205     boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1206       synchronized (future) {
1207         if (future.value == expect) {
1208           future.value = update;
1209           return true;
1210         }
1211         return false;
1212       }
1213     }
1214   }
1215 
1216   private static CancellationException cancellationExceptionWithCause(
1217       @Nullable String message, @Nullable Throwable cause) {
1218     CancellationException exception = new CancellationException(message);
1219     exception.initCause(cause);
1220     return exception;
1221   }
1222 }