View Javadoc
1   /*
2    * Copyright (C) 2006 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import com.google.common.annotations.Beta;
18  import com.google.common.annotations.GwtCompatible;
19  import com.google.common.annotations.GwtIncompatible;
20  import com.google.common.base.Function;
21  import com.google.errorprone.annotations.DoNotMock;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ScheduledExecutorService;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.TimeoutException;
27  
28  /**
29   * A {@link ListenableFuture} that supports fluent chains of operations. For example:
30   *
31   * <pre>{@code
32   * ListenableFuture<Boolean> adminIsLoggedIn =
33   *     FluentFuture.from(usersDatabase.getAdminUser())
34   *         .transform(User::getId, directExecutor())
35   *         .transform(ActivityService::isLoggedIn, threadPool)
36   *         .catching(RpcException.class, e -> false, directExecutor());
37   * }</pre>
38   *
39   * <h3>Alternatives</h3>
40   *
41   * <h4>Frameworks</h4>
42   *
43   * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
44   * use a framework. Frameworks automate the process, often adding features like monitoring,
45   * debugging, and cancellation. Examples of frameworks include:
46   *
47   * <ul>
48   *   <li><a href="http://google.github.io/dagger/producers.html">Dagger Producers</a>
49   * </ul>
50   *
51   * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
52   * </h4>
53   *
54   * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
55   * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
56   * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
57   * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
58   * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
59   *
60   * <h3>Extension</h3>
61   *
62   * If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring your
63   * own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt an
64   * existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
65   * forwards to that future and adds the desired methods.
66   *
67   * @since 23.0
68   */
69  @Beta
70  @DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture")
71  @GwtCompatible(emulated = true)
72  public abstract class FluentFuture<V> extends GwtFluentFutureCatchingSpecialization<V> {
73    FluentFuture() {}
74  
75    /**
76     * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
77     *
78     * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
79     * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
80     * original {@code ListenableFuture}.
81     */
82    public static <V> FluentFuture<V> from(ListenableFuture<V> future) {
83      return future instanceof FluentFuture
84          ? (FluentFuture<V>) future
85          : new ForwardingFluentFuture<V>(future);
86    }
87  
88    /**
89     * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
90     * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
91     * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
92     * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
93     * exception is thrown, this exception is used as the result of the output {@code Future}.
94     *
95     * <p>Usage example:
96     *
97     * <pre>{@code
98     * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
99     * // counters.
100    * ListenableFuture<Integer> faultTolerantFuture =
101    *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
102    * }</pre>
103    *
104    * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
105    * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
106    * listeners are also applicable to heavyweight functions passed to this method.
107    *
108    * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
109    * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
110    * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
111    * #transform}.
112    *
113    * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
114    *     type is matched against the input's exception. "The input's exception" means the cause of
115    *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
116    *     different kind of exception, that exception itself. To avoid hiding bugs and other
117    *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
118    *     Throwable.class} in particular.
119    * @param fallback the {@link Function} to be called if the input fails with the expected
120    *     exception type. The function's argument is the input's exception. "The input's exception"
121    *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
122    *     {@code get()} throws a different kind of exception, that exception itself.
123    * @param executor the executor that runs {@code fallback} if the input fails
124    */
125   @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
126   public final <X extends Throwable> FluentFuture<V> catching(
127       Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
128     return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
129   }
130 
131   /**
132    * Returns a {@code Future} whose result is taken from this {@code Future} or, if the this {@code
133    * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
134    * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
135    * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
136    * an exception is thrown, this exception is used as the result of the output {@code Future}.
137    *
138    * <p>Usage examples:
139    *
140    * <pre>{@code
141    * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
142    * // counters.
143    * ListenableFuture<Integer> faultTolerantFuture =
144    *     fetchCounters().catchingAsync(
145    *         FetchException.class, x -> immediateFuture(0), directExecutor());
146    * }</pre>
147    *
148    * <p>The fallback can also choose to propagate the original exception when desired:
149    *
150    * <pre>{@code
151    * // Falling back to a zero counter only in case the exception was a
152    * // TimeoutException.
153    * ListenableFuture<Integer> faultTolerantFuture =
154    *     fetchCounters().catchingAsync(
155    *         fetchCounterFuture,
156    *         FetchException.class,
157    *         e -> {
158    *           if (omitDataOnFetchFailure) {
159    *             return immediateFuture(0);
160    *           }
161    *           throw e;
162    *         },
163    *         directExecutor());
164    * }</pre>
165    *
166    * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
167    * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
168    * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
169    * {@code directExecutor} functions should avoid heavyweight operations inside {@code
170    * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
171    * completing the returned {@code Future}.)
172    *
173    * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
174    * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
175    * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
176    * #transform}.
177    *
178    * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
179    *     type is matched against the input's exception. "The input's exception" means the cause of
180    *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
181    *     different kind of exception, that exception itself. To avoid hiding bugs and other
182    *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
183    *     Throwable.class} in particular.
184    * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
185    *     exception type. The function's argument is the input's exception. "The input's exception"
186    *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
187    *     {@code get()} throws a different kind of exception, that exception itself.
188    * @param executor the executor that runs {@code fallback} if the input fails
189    */
190   @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
191   public final <X extends Throwable> FluentFuture<V> catchingAsync(
192       Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
193     return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
194   }
195 
196   /**
197    * Returns a future that delegates to this future but will finish early (via a {@link
198    * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
199    * If the timeout expires, not only will the output future finish, but also the input future
200    * ({@code this}) will be cancelled and interrupted.
201    *
202    * @param timeout when to time out the future
203    * @param unit the time unit of the time parameter
204    * @param scheduledExecutor The executor service to enforce the timeout.
205    */
206   @GwtIncompatible // ScheduledExecutorService
207   public final FluentFuture<V> withTimeout(
208       long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
209     return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
210   }
211 
212   /**
213    * Returns a new {@code Future} whose result is asynchronously derived from the result of this
214    * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
215    * same exception (and the function is not invoked).
216    *
217    * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
218    * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
219    * Example usage:
220    *
221    * <pre>{@code
222    * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
223    * ListenableFuture<QueryResult> queryFuture =
224    *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
225    * }</pre>
226    *
227    * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
228    * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
229    * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
230    * {@code directExecutor} functions should avoid heavyweight operations inside {@code
231    * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
232    * completing the returned {@code Future}.)
233    *
234    * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
235    * input future and that of the future returned by the chain function. That is, if the returned
236    * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
237    * other two is cancelled, the returned {@code Future} will receive a callback in which it will
238    * attempt to cancel itself.
239    *
240    * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
241    * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
242    * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
243    * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
244    *
245    * @param function A function to transform the result of this future to the result of the output
246    *     future
247    * @param executor Executor to run the function in.
248    * @return A future that holds result of the function (if the input succeeded) or the original
249    *     input's failure (if not)
250    */
251   public final <T> FluentFuture<T> transformAsync(
252       AsyncFunction<? super V, T> function, Executor executor) {
253     return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
254   }
255 
256   /**
257    * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
258    * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
259    * the function is not invoked). Example usage:
260    *
261    * <pre>{@code
262    * ListenableFuture<List<Row>> rowsFuture =
263    *     queryFuture.transform(QueryResult::getRows, executor);
264    * }</pre>
265    *
266    * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
267    * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
268    * listeners are also applicable to heavyweight functions passed to this method.
269    *
270    * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
271    * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
272    * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
273    * in which it will attempt to cancel itself.
274    *
275    * <p>An example use of this method is to convert a serializable object returned from an RPC into
276    * a POJO.
277    *
278    * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
279    * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
280    * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
281    * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
282    *
283    * @param function A Function to transform the results of this future to the results of the
284    *     returned future.
285    * @param executor Executor to run the function in.
286    * @return A future that holds result of the transformation.
287    */
288   public final <T> FluentFuture<T> transform(Function<? super V, T> function, Executor executor) {
289     return (FluentFuture<T>) Futures.transform(this, function, executor);
290   }
291 
292   /**
293    * Registers separate success and failure callbacks to be run when this {@code Future}'s
294    * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
295    * computation is already complete, immediately.
296    *
297    * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
298    * callbacks, but any callback added through this method is guaranteed to be called once the
299    * computation is complete.
300    *
301    * <p>Example:
302    *
303    * <pre>{@code
304    * future.addCallback(
305    *     new FutureCallback<QueryResult>() {
306    *       public void onSuccess(QueryResult result) {
307    *         storeInCache(result);
308    *       }
309    *       public void onFailure(Throwable t) {
310    *         reportError(t);
311    *       }
312    *     }, executor);
313    * }</pre>
314    *
315    * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
316    * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
317    * listeners are also applicable to heavyweight callbacks passed to this method.
318    *
319    * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
320    *
321    * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
322    * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
323    * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
324    * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
325    *
326    * @param callback The callback to invoke when this {@code Future} is completed.
327    * @param executor The executor to run {@code callback} when the future completes.
328    */
329   public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
330     Futures.addCallback(this, callback, executor);
331   }
332 }