View Javadoc
1   /*
2    * Copyright (C) 2006 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.base.Preconditions.checkNotNull;
18  import static com.google.common.base.Preconditions.checkState;
19  import static com.google.common.util.concurrent.Futures.getDone;
20  import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21  
22  import com.google.common.annotations.GwtCompatible;
23  import com.google.common.collect.ImmutableCollection;
24  import java.util.Set;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.Future;
27  import java.util.logging.Level;
28  import java.util.logging.Logger;
29  import javax.annotation.Nullable;
30  
31  /**
32   * A future made up of a collection of sub-futures.
33   *
34   * @param <InputT> the type of the individual inputs
35   * @param <OutputT> the type of the output (i.e. this) future
36   */
37  @GwtCompatible
38  abstract class AggregateFuture<InputT, OutputT> extends AbstractFuture.TrustedFuture<OutputT> {
39    private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName());
40  
41    /*
42     * In certain circumstances, this field might theoretically not be visible to an afterDone() call
43     * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture.
44     */
45    private RunningState runningState;
46  
47    @Override
48    protected final void afterDone() {
        // Cleanup run from the afterDone() hook: drops the running state and, if this future was
        // cancelled, cancels every input future that the state still references.
49      super.afterDone();
        // Read the field once into a local. It may already be null here: handleOneInputDone() clears
        // it before calling cancel(false) so that the remaining inputs are left running.
50      RunningState localRunningState = runningState;
51      if (localRunningState != null) {
52        // Let go of the memory held by the running state
53        this.runningState = null;
          // This local may be null: releaseResourcesAfterFailure() nulls the state's collection.
54        ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures =
55            localRunningState.futures;
56        boolean wasInterrupted = wasInterrupted();
57  
          // Give the running state a chance to interrupt its own work (no-op by default).
58        if (wasInterrupted) {
59          localRunningState.interruptTask();
60        }
61  
          // Non-short-circuit '&' as elsewhere in this class; the null check on the right-hand side
          // cannot throw, so evaluating it unconditionally is harmless.
62        if (isCancelled() & futures != null) {
63          for (ListenableFuture<?> future : futures) {
              // Pass our own interrupt flag along to each input.
64            future.cancel(wasInterrupted);
65          }
66        }
67      }
68    }
69  
70    @Override
71    protected String pendingToString() {
        // Describes the inputs this future is still waiting on. Returns null when the running state
        // or its input collection has already been released, i.e. there is nothing left to show.
72      RunningState localRunningState = runningState;
73      if (localRunningState == null) {
74        return null;
75      }
        // Copy to a local: the state's field can be nulled by releaseResourcesAfterFailure().
76      ImmutableCollection<? extends ListenableFuture<? extends InputT>> localFutures =
77          localRunningState.futures;
78      if (localFutures != null) {
          // The collection's own toString() already encloses the elements in brackets, so wrapping
          // it in another pair would print "futures=[[a, b]]".
79        return "futures=" + localFutures;
80      }
81      return null;
82    }
83  
84    /**
85     * Must be called at the end of each sub-class's constructor.
       *
       * <p>Stores the given state in {@link #runningState} and then runs the state's own {@code
       * init()}, which either completes immediately (no inputs) or registers a listener on every
       * input future. The field is assigned first, so it is already set while those listeners are
       * being registered.
       *
       * @param runningState the state to install; it is dereferenced unconditionally, so passing
       *     {@code null} fails with a {@link NullPointerException}
86     */
87    final void init(RunningState runningState) {
88      this.runningState = runningState;
89      runningState.init();
90    }
91  
92    abstract class RunningState extends AggregateFutureState implements Runnable {
93      private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;
94      private final boolean allMustSucceed;
95      private final boolean collectsValues;
96  
97      RunningState(
98          ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
99          boolean allMustSucceed,
100         boolean collectsValues) {
          // The input count goes to AggregateFutureState; presumably it seeds the counter that
          // decrementRemainingAndGet() counts down -- TODO confirm against that class.
          // Note that futures.size() is evaluated before the checkNotNull() below, so a null
          // collection already fails here with a NullPointerException.
101       super(futures.size());
102       this.futures = checkNotNull(futures);
          // allMustSucceed: fail fast on the first failed input (see handleException()).
103       this.allMustSucceed = allMustSucceed;
          // collectsValues: whether collectOneValue() is invoked for input results.
104       this.collectsValues = collectsValues;
105     }
106 
107     /*
         * Used in the !allMustSucceed case so we don't have to instantiate a listener: init()
         * registers this RunningState itself as the listener on every input, so each invocation
         * only counts one input off the remaining total.
         */
108     @Override
109     public final void run() {
110       decrementCountAndMaybeComplete();
111     }
112 
113     /**
114      * The "real" initialization; we can't put this in the constructor because, in the case where
115      * futures are already complete, we would not initialize the subclass before calling {@link
116      * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed
117      * to have properly initialized the subclass.
         *
         * <p>With no inputs this calls {@link #handleAllCompleted} directly. Otherwise it registers
         * one listener per input: a dedicated listener that processes the input as soon as it is
         * done when {@code allMustSucceed} is true, or this {@code RunningState} itself (see {@link
         * #run}) when it is false.
118      */
119     private void init() {
120       // Corner case: List is empty.
121       if (futures.isEmpty()) {
122         handleAllCompleted();
123         return;
124       }
125 
126       // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll
127       // need to handle RejectedExecutionException
128 
129       if (allMustSucceed) {
130         // We need fail fast, so we have to keep track of which future failed so we can propagate
131         // the exception immediately
132 
133         // Register a listener on each Future in the list to update the state of this future.
134         // Note that if all the futures on the list are done prior to completing this loop, the last
135         // call to addListener() will call back into handleOneInputDone(), which may end up
136         // setting this.futures to null (see releaseResourcesAfterFailure()).
137         // This is not actually a problem, since the foreach only needs this.futures to be non-null
138         // at the beginning of the loop.
139         int i = 0;
140         for (final ListenableFuture<? extends InputT> listenable : futures) {
              // Capture this input's position so the listener can report it to collectOneValue().
141           final int index = i++;
142           listenable.addListener(
143               new Runnable() {
144                 @Override
145                 public void run() {
146                   try {
147                     handleOneInputDone(index, listenable);
148                   } finally {
                        // Always count the input off, even if handling it threw.
149                     decrementCountAndMaybeComplete();
150                   }
151                 }
152               },
153               directExecutor());
154         }
155       } else {
156         // We'll only call the callback when all futures complete, regardless of whether some failed
157         // Hold off on calling handleOneInputDone until all complete, so we can share the same
            // listener (this RunningState; see run() and processCompleted()).
158         for (ListenableFuture<? extends InputT> listenable : futures) {
159           listenable.addListener(this, directExecutor());
160         }
161       }
162     }
163 
164     /**
165      * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
166      * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
167      * throwable did not cause this future to fail, and it is the first time we've seen that
168      * particular Throwable.
         *
         * @param throwable the failure to handle; must not be {@code null} (checked on entry)
169      */
170     private void handleException(Throwable throwable) {
171       checkNotNull(throwable);
172 
173       boolean completedWithFailure = false;
174       boolean firstTimeSeeingThisException = true;
175       if (allMustSucceed) {
176         // As soon as the first one fails, throw the exception up.
177         // The result of all other inputs is then ignored.
            // NOTE(review): relies on setException() returning true only when this call is the one
            // that completed the future -- confirm against AbstractFuture.
178         completedWithFailure = setException(throwable);
179         if (completedWithFailure) {
180           releaseResourcesAfterFailure();
181         } else {
182           // Go up the causal chain to see if we've already seen this cause; if we have, even if
183           // it's wrapped by a different exception, don't log it.
184           firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable);
185         }
186       }
187 
188       // | and & used because it's faster than the branch required for || and &&
          // When allMustSucceed is false, only the Error case can reach the log call below.
189       if (throwable instanceof Error
190           | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) {
191         String message =
192             (throwable instanceof Error)
193                 ? "Input Future failed with Error"
194                 : "Got more than one input Future failure. Logging failures after the first";
195         logger.log(Level.SEVERE, message, throwable);
196       }
197     }
198 
199     @Override
200     final void addInitialException(Set<Throwable> seen) {
          // Seeds the given set with the causal chain of this future's own exception, unless the
          // future was cancelled. NOTE(review): presumably invoked by AggregateFutureState when the
          // seen-exceptions set is first created (see getOrInitSeenExceptions()) -- confirm there.
201       if (!isCancelled()) {
202         // TODO(cpovirk): Think about whether we could/should use Verify to check this.
            // The result is deliberately ignored; only the side effect on the set matters here.
203         boolean unused = addCausalChain(seen, trustedGetException());
204       }
205     }
206 
207     /**
208      * Handles the input at the given index completing.
         *
         * <p>When {@code allMustSucceed} is true, a cancelled input cancels this future (without
         * cancelling the other inputs), and any other input has its result fetched so that a failure
         * surfaces immediately. When it is false, the result is fetched only if values are collected
         * and the input was not cancelled. Failures raised inside the {@code try} block are routed to
         * {@link #handleException}.
         *
         * @param index position of the input, forwarded to {@link #collectOneValue}
         * @param future the input; it is expected to be done already
209      */
210     private void handleOneInputDone(int index, Future<? extends InputT> future) {
211       // The only cases in which this Future should already be done are (a) if it was cancelled or
212       // (b) if an input failed and we propagated that immediately because of allMustSucceed.
          // This check sits outside the try block, so a violation propagates to the caller instead
          // of being passed to handleException().
213       checkState(
214           allMustSucceed || !isDone() || isCancelled(),
215           "Future was done before all dependencies completed");
216 
217       try {
            // Unlike the check above, a failure here is caught below and goes to handleException().
218         checkState(future.isDone(), "Tried to set value from future which is not done");
219         if (allMustSucceed) {
220           if (future.isCancelled()) {
221             // clear running state prior to cancelling children, this sets our own state but lets
222             // the input futures keep running as some of them may be used elsewhere.
223             runningState = null;
224             cancel(false);
225           } else {
226             // We always get the result so that we can have fail-fast, even if we don't collect
227             InputT result = getDone(future);
228             if (collectsValues) {
229               collectOneValue(allMustSucceed, index, result);
230             }
231           }
232         } else if (collectsValues && !future.isCancelled()) {
233           collectOneValue(allMustSucceed, index, getDone(future));
234         }
235       } catch (ExecutionException e) {
            // NOTE(review): handleException() rejects null, so an ExecutionException without a cause
            // would make this call throw, and that exception is not caught by the clause below.
236         handleException(e.getCause());
237       } catch (Throwable t) {
238         handleException(t);
239       }
240     }
241 
242     private void decrementCountAndMaybeComplete() {
          // Counts one input off the remaining total and runs processCompleted() when none are
          // left. decrementRemainingAndGet() is inherited from AggregateFutureState (not shown
          // here); presumably an atomic countdown from the input count -- TODO confirm.
243       int newRemaining = decrementRemainingAndGet();
          // A negative value would mean more completions were counted than there are inputs.
244       checkState(newRemaining >= 0, "Less than 0 remaining futures");
245       if (newRemaining == 0) {
246         processCompleted();
247       }
248     }
249 
250     private void processCompleted() {
251       // Collect the values if (a) our output requires collecting them and (b) we haven't been
252       // collecting them as we go. (We've collected them as we go only if we needed to fail fast)
          // handleException() only calls releaseResourcesAfterFailure() when allMustSucceed is true,
          // so that path does not null the futures field in the mode that runs this loop.
253       if (collectsValues & !allMustSucceed) {
254         int i = 0;
255         for (ListenableFuture<? extends InputT> listenable : futures) {
256           handleOneInputDone(i++, listenable);
257         }
258       }
          // Runs regardless of mode once every input has been counted off.
259       handleAllCompleted();
260     }
261 
262     /**
263      * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we
264      * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we
265      * immediately release resources we no longer need); additionally, the future will release its
266      * reference to {@link RunningState}, which should free all associated memory when all the
267      * futures complete and the listeners are released.
         *
         * <p>In this class the only resource dropped is the reference to the input collection;
         * {@code handleException} calls this right after it has failed the future.
268      *
269      * TODO(user): Write tests for memory retention
270      */
271     void releaseResourcesAfterFailure() {
272       this.futures = null;
273     }
274 
275     /**
276      * Called only if {@code collectsValues} is true.
277      *
278      * <p>If {@code allMustSucceed} is true, called as each future completes; otherwise, called for
279      * each future when all futures complete.
         *
         * @param allMustSucceed the value of this state's {@code allMustSucceed} flag
         * @param index position of the input in the iteration order of the input collection
         * @param returnValue the input's result as obtained via {@code getDone}; may be {@code null}
280      */
281     abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue);
282 
        /**
         * Called by {@code init()} right away when there are no inputs, and otherwise by {@code
         * processCompleted()} once the remaining-input count has reached zero.
         */
283     abstract void handleAllCompleted();
284 
        /**
         * Hook invoked from {@code afterDone()} when {@code wasInterrupted()} reports true. This
         * default implementation does nothing.
         */
285     void interruptTask() {}
286   }
287 
288   /**
       * Adds the chain to the seen set, and returns whether all the chain was new to us.
       *
       * <p>Walks from {@code t} through successive {@code getCause()} links and stops at the first
       * throwable that is already in the set.
       *
       * @param seen the set of throwables recorded so far; modified by this call
       * @param t the head of the causal chain; {@code null} adds nothing and yields {@code true}
       * @return {@code true} if every throwable in the chain was newly added, {@code false} as soon
       *     as one was already present
       */
289   private static boolean addCausalChain(Set<Throwable> seen, Throwable t) {
290     for (; t != null; t = t.getCause()) {
291       boolean firstTimeSeen = seen.add(t);
292       if (!firstTimeSeen) {
293         /*
294          * We've seen this, so we've seen its causes, too. No need to re-add them. (There's one case
295          * where this isn't true, but we ignore it: If we record an exception, then someone calls
296          * initCause() on it, and then we examine it again, we'll conclude that we've seen the whole
297          * chain before when in fact we haven't. But this should be rare.)
298          */
299         return false;
300       }
301     }
302     return true;
303   }
304 }