View Javadoc
1   /*
2    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3    *
4    * This code is free software; you can redistribute it and/or modify it
5    * under the terms of the GNU General Public License version 2 only, as
6    * published by the Free Software Foundation.  Oracle designates this
7    * particular file as subject to the "Classpath" exception as provided
8    * by Oracle in the LICENSE file that accompanied this code.
9    *
10   * This code is distributed in the hope that it will be useful, but WITHOUT
11   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13   * version 2 for more details (a copy is included in the LICENSE file that
14   * accompanied this code).
15   *
16   * You should have received a copy of the GNU General Public License version
17   * 2 along with this work; if not, write to the Free Software Foundation,
18   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19   *
20   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21   * or visit www.oracle.com if you need additional information or have any
22   * questions.
23   */
24  
25  /*
26   * This file is available under and governed by the GNU General Public
27   * License version 2 only, as published by the Free Software Foundation.
28   * However, the following notice accompanied the original version of this
29   * file:
30   *
31   * Written by Doug Lea with assistance from members of JCP JSR-166
32   * Expert Group and released to the public domain, as explained at
33   * http://creativecommons.org/publicdomain/zero/1.0/
34   */
35  
36  package java.util.concurrent;
37  import java.util.*;
38  
39  /**
40   * Provides default implementations of {@link ExecutorService}
41   * execution methods. This class implements the {@code submit},
42   * {@code invokeAny} and {@code invokeAll} methods using a
43   * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
44   * to the {@link FutureTask} class provided in this package.  For example,
45   * the implementation of {@code submit(Runnable)} creates an
46   * associated {@code RunnableFuture} that is executed and
47   * returned. Subclasses may override the {@code newTaskFor} methods
48   * to return {@code RunnableFuture} implementations other than
49   * {@code FutureTask}.
50   *
51   * <p><b>Extension example</b>. Here is a sketch of a class
52   * that customizes {@link ThreadPoolExecutor} to use
53   * a {@code CustomTask} class instead of the default {@code FutureTask}:
54   *  <pre> {@code
55   * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
56   *
57   *   static class CustomTask<V> implements RunnableFuture<V> {...}
58   *
59   *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
60   *       return new CustomTask<V>(c);
61   *   }
62   *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
63   *       return new CustomTask<V>(r, v);
64   *   }
65   *   // ... add constructors, etc.
66   * }}</pre>
67   *
68   * @since 1.5
69   * @author Doug Lea
70   */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * Wraps the task with {@code newTaskFor(task, null)}, hands the
     * wrapper to {@code execute} and returns it.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task with {@code newTaskFor(task, result)}, hands the
     * wrapper to {@code execute} and returns it.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task with {@code newTaskFor(task)}, hands the wrapper
     * to {@code execute} and returns it.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Attempts to cancel every future in the list, interrupting
     * tasks that are already running.
     *
     * @param futures the futures to cancel
     * @param <T> the result type of the futures
     */
    private static <T> void cancelAll(List<Future<T>> futures) {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }

    /**
     * The main mechanics of the timed and untimed {@code invokeAny}
     * methods. Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService}; before each further
     * submission the completion queue is polled, and the result of
     * the first future whose {@code get} returns normally is returned.
     * On every exit path all futures submitted so far are cancelled.
     *
     * @param tasks the candidate tasks
     * @param timed {@code true} if the wait is bounded by {@code nanos}
     * @param nanos the maximum time to wait, in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @param <T> the result type of the tasks
     * @return the result of a task that completed normally
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if no task completed normally; this
     *         is the exception recorded for the last failed task
     * @throws TimeoutException if {@code timed} and the timed poll
     *         returned no completed task
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1; // submitted tasks not yet taken off the queue

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start another task.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break; // everything submitted has failed
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            // Defensive only: at least one task is always submitted and
            // every future taken off the queue either returns above or
            // records its failure in ee, so ee is non-null here.
            if (ee == null)
                ee = new ExecutionException((Throwable) null);
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation delegates to the shared untimed/timed
     * helper with timing disabled.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The helper only times out when asked to run timed.
            assert false;
            return null;
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation converts the timeout to nanoseconds and
     * delegates to the shared untimed/timed helper with timing enabled.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation wraps each task with {@code newTaskFor},
     * passes it to {@code execute}, and then waits for each future in
     * list order. If the method exits abnormally (for example through
     * an exception from {@code execute} or an interrupt while
     * waiting), all futures created so far are cancelled.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                        // Outcome is reported through the future itself.
                    } catch (ExecutionException ignore) {
                        // Outcome is reported through the future itself.
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                cancelAll(futures);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation starts measuring the timeout before the
     * tasks are wrapped, and checks the remaining time <em>before</em>
     * each call to {@code execute}. Consequently a non-positive
     * timeout starts no task at all. Whenever the method returns
     * before every future has been waited for, all futures in the
     * returned list are cancelled (which has no effect on those that
     * have already completed).
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        final long nanos = unit.toNanos(timeout);
        // Start the clock now so that wrapping the tasks below counts
        // against the caller's timeout.
        final long deadline = System.nanoTime() + nanos;
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                // The first check uses the raw timeout, so a positive
                // timeout always gets to start at least one task.
                long remaining = (i == 0) ? nanos : deadline - System.nanoTime();
                if (remaining <= 0L)
                    return futures;
                // Every element was created by newTaskFor above and is
                // therefore a RunnableFuture.
                execute((Runnable)futures.get(i));
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0L)
                        return futures;
                    try {
                        f.get(remaining, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                        // Outcome is reported through the future itself.
                    } catch (ExecutionException ignore) {
                        // Outcome is reported through the future itself.
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            // Reached with done == false on timeout or abnormal exit.
            if (!done)
                cancelAll(futures);
        }
    }

}