View Javadoc
1   /*
2    * Copyright (C) 2011 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.base.Preconditions.checkArgument;
18  import static com.google.common.base.Preconditions.checkNotNull;
19  import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
20  
21  import com.google.common.annotations.Beta;
22  import com.google.common.annotations.GwtIncompatible;
23  import com.google.common.base.Supplier;
24  import com.google.errorprone.annotations.CanIgnoreReturnValue;
25  import com.google.j2objc.annotations.WeakOuter;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.locks.ReentrantLock;
35  import java.util.logging.Level;
36  import java.util.logging.Logger;
37  import javax.annotation.concurrent.GuardedBy;
38  
39  /**
40   * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in
41   * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp},
42   * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
43   *
44   * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
45   * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the
46   * {@link #runOneIteration} that will be executed periodically as specified by its {@link
47   * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic
48   * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method.
49   *
50   * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link
51   * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
52   * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start
53   * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify
54   * shared state without additional synchronization necessary for visibility to later executions of
55   * the life cycle methods.
56   *
57   * <h3>Usage Example</h3>
58   *
59   * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to
60   * rate limit itself. <pre> {@code
61   * class CrawlingService extends AbstractScheduledService {
62   *   private Set<Uri> visited;
63   *   private Queue<Uri> toCrawl;
64   *   protected void startUp() throws Exception {
65   *     toCrawl = readStartingUris();
66   *   }
67   *
68   *   protected void runOneIteration() throws Exception {
69   *     Uri uri = toCrawl.remove();
70   *     Collection<Uri> newUris = crawl(uri);
71   *     visited.add(uri);
72   *     for (Uri newUri : newUris) {
73   *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
74   *     }
75   *   }
76   *
77   *   protected void shutDown() throws Exception {
78   *     saveUris(toCrawl);
79   *   }
80   *
81   *   protected Scheduler scheduler() {
82   *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
83   *   }
84   * }}</pre>
85   *
86   * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of
87   * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to
88   * rate limit the number of queries we perform.
89   *
90   * @author Luke Sandberg
91   * @since 11.0
92   */
93  @Beta
94  @GwtIncompatible
95  public abstract class AbstractScheduledService implements Service {
96    private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
97  
98    /**
99     * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its
100    * task.
101    *
102    * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory
103    * methods, these provide {@link Scheduler} instances for the common use case of running the
104    * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link
105    * CustomScheduler}.
106    *
107    * @author Luke Sandberg
108    * @since 11.0
109    */
110   public abstract static class Scheduler {
111     /**
112      * Returns a {@link Scheduler} that schedules the task using the {@link
113      * ScheduledExecutorService#scheduleWithFixedDelay} method.
114      *
115      * @param initialDelay the time to delay first execution
116      * @param delay the delay between the termination of one execution and the commencement of the
117      *     next
118      * @param unit the time unit of the initialDelay and delay parameters
119      */
120     public static Scheduler newFixedDelaySchedule(
121         final long initialDelay, final long delay, final TimeUnit unit) {
122       checkNotNull(unit);
123       checkArgument(delay > 0, "delay must be > 0, found %s", delay);
124       return new Scheduler() {
125         @Override
126         public Future<?> schedule(
127             AbstractService service, ScheduledExecutorService executor, Runnable task) {
128           return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
129         }
130       };
131     }
132 
133     /**
134      * Returns a {@link Scheduler} that schedules the task using the {@link
135      * ScheduledExecutorService#scheduleAtFixedRate} method.
136      *
137      * @param initialDelay the time to delay first execution
138      * @param period the period between successive executions of the task
139      * @param unit the time unit of the initialDelay and period parameters
140      */
141     public static Scheduler newFixedRateSchedule(
142         final long initialDelay, final long period, final TimeUnit unit) {
143       checkNotNull(unit);
144       checkArgument(period > 0, "period must be > 0, found %s", period);
145       return new Scheduler() {
146         @Override
147         public Future<?> schedule(
148             AbstractService service, ScheduledExecutorService executor, Runnable task) {
149           return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
150         }
151       };
152     }
153 
154     /** Schedules the task to run on the provided executor on behalf of the service. */
155     abstract Future<?> schedule(
156         AbstractService service, ScheduledExecutorService executor, Runnable runnable);
157 
158     private Scheduler() {}
159   }
160 
161   /* use AbstractService for state management */
162   private final AbstractService delegate = new ServiceDelegate();
163 
164   @WeakOuter
165   private final class ServiceDelegate extends AbstractService {
166 
167     // A handle to the running task so that we can stop it when a shutdown has been requested.
168     // These two fields are volatile because their values will be accessed from multiple threads.
169     private volatile Future<?> runningTask;
170     private volatile ScheduledExecutorService executorService;
171 
172     // This lock protects the task so we can ensure that none of the template methods (startUp,
173     // shutDown or runOneIteration) run concurrently with one another.
174     // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the
175     // lock.
176     private final ReentrantLock lock = new ReentrantLock();
177 
178     @WeakOuter
179     class Task implements Runnable {
180       @Override
181       public void run() {
182         lock.lock();
183         try {
184           if (runningTask.isCancelled()) {
185             // task may have been cancelled while blocked on the lock.
186             return;
187           }
188           AbstractScheduledService.this.runOneIteration();
189         } catch (Throwable t) {
190           try {
191             shutDown();
192           } catch (Exception ignored) {
193             logger.log(
194                 Level.WARNING,
195                 "Error while attempting to shut down the service after failure.",
196                 ignored);
197           }
198           notifyFailed(t);
199           runningTask.cancel(false); // prevent future invocations.
200         } finally {
201           lock.unlock();
202         }
203       }
204     }
205 
206     private final Runnable task = new Task();
207 
208     @Override
209     protected final void doStart() {
210       executorService =
211           MoreExecutors.renamingDecorator(
212               executor(),
213               new Supplier<String>() {
214                 @Override
215                 public String get() {
216                   return serviceName() + " " + state();
217                 }
218               });
219       executorService.execute(
220           new Runnable() {
221             @Override
222             public void run() {
223               lock.lock();
224               try {
225                 startUp();
226                 runningTask = scheduler().schedule(delegate, executorService, task);
227                 notifyStarted();
228               } catch (Throwable t) {
229                 notifyFailed(t);
230                 if (runningTask != null) {
231                   // prevent the task from running if possible
232                   runningTask.cancel(false);
233                 }
234               } finally {
235                 lock.unlock();
236               }
237             }
238           });
239     }
240 
241     @Override
242     protected final void doStop() {
243       runningTask.cancel(false);
244       executorService.execute(
245           new Runnable() {
246             @Override
247             public void run() {
248               try {
249                 lock.lock();
250                 try {
251                   if (state() != State.STOPPING) {
252                     // This means that the state has changed since we were scheduled. This implies
253                     // that an execution of runOneIteration has thrown an exception and we have
254                     // transitioned to a failed state, also this means that shutDown has already
255                     // been called, so we do not want to call it again.
256                     return;
257                   }
258                   shutDown();
259                 } finally {
260                   lock.unlock();
261                 }
262                 notifyStopped();
263               } catch (Throwable t) {
264                 notifyFailed(t);
265               }
266             }
267           });
268     }
269 
270     @Override
271     public String toString() {
272       return AbstractScheduledService.this.toString();
273     }
274   }
275 
/** Sole constructor; protected because only subclasses are meant to invoke it. */
protected AbstractScheduledService() {}
278 
279   /**
280    * Run one iteration of the scheduled task. If any invocation of this method throws an exception,
281    * the service will transition to the {@link Service.State#FAILED} state and this method will no
282    * longer be called.
283    */
284   protected abstract void runOneIteration() throws Exception;
285 
286   /**
287    * Start the service.
288    *
289    * <p>By default this method does nothing.
290    */
291   protected void startUp() throws Exception {}
292 
293   /**
294    * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
295    *
296    * <p>By default this method does nothing.
297    */
298   protected void shutDown() throws Exception {}
299 
300   /**
301    * Returns the {@link Scheduler} object used to configure this service. This method will only be
302    * called once.
303    */
304   // TODO(cpovirk): @ForOverride
305   protected abstract Scheduler scheduler();
306 
307   /**
308    * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
309    * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the
310    * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service
311    * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED
312    * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService}
313    * instance. This method is guaranteed to only be called once.
314    *
315    * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
316    * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also,
317    * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service
318    * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED
319    * fails}.
320    */
321   protected ScheduledExecutorService executor() {
322     @WeakOuter
323     class ThreadFactoryImpl implements ThreadFactory {
324       @Override
325       public Thread newThread(Runnable runnable) {
326         return MoreExecutors.newThread(serviceName(), runnable);
327       }
328     }
329     final ScheduledExecutorService executor =
330         Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl());
331     // Add a listener to shutdown the executor after the service is stopped. This ensures that the
332     // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
333     // Technically this listener is added after start() was called so it is a little gross, but it
334     // is called within doStart() so we know that the service cannot terminate or fail concurrently
335     // with adding this listener so it is impossible to miss an event that we are interested in.
336     addListener(
337         new Listener() {
338           @Override
339           public void terminated(State from) {
340             executor.shutdown();
341           }
342 
343           @Override
344           public void failed(State from, Throwable failure) {
345             executor.shutdown();
346           }
347         },
348         directExecutor());
349     return executor;
350   }
351 
352   /**
353    * Returns the name of this service. {@link AbstractScheduledService} may include the name in
354    * debugging output.
355    *
356    * @since 14.0
357    */
358   protected String serviceName() {
359     return getClass().getSimpleName();
360   }
361 
362   @Override
363   public String toString() {
364     return serviceName() + " [" + state() + "]";
365   }
366 
367   @Override
368   public final boolean isRunning() {
369     return delegate.isRunning();
370   }
371 
372   @Override
373   public final State state() {
374     return delegate.state();
375   }
376 
377   /**
378    * @since 13.0
379    */
380   @Override
381   public final void addListener(Listener listener, Executor executor) {
382     delegate.addListener(listener, executor);
383   }
384 
385   /**
386    * @since 14.0
387    */
388   @Override
389   public final Throwable failureCause() {
390     return delegate.failureCause();
391   }
392 
393   /**
394    * @since 15.0
395    */
396   @CanIgnoreReturnValue
397   @Override
398   public final Service startAsync() {
399     delegate.startAsync();
400     return this;
401   }
402 
403   /**
404    * @since 15.0
405    */
406   @CanIgnoreReturnValue
407   @Override
408   public final Service stopAsync() {
409     delegate.stopAsync();
410     return this;
411   }
412 
413   /**
414    * @since 15.0
415    */
416   @Override
417   public final void awaitRunning() {
418     delegate.awaitRunning();
419   }
420 
421   /**
422    * @since 15.0
423    */
424   @Override
425   public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
426     delegate.awaitRunning(timeout, unit);
427   }
428 
429   /**
430    * @since 15.0
431    */
432   @Override
433   public final void awaitTerminated() {
434     delegate.awaitTerminated();
435   }
436 
437   /**
438    * @since 15.0
439    */
440   @Override
441   public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
442     delegate.awaitTerminated(timeout, unit);
443   }
444 
445   /**
446    * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to
447    * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been
448    * cancelled, the {@link #getNextSchedule} method will be called.
449    *
450    * @author Luke Sandberg
451    * @since 11.0
452    */
453   @Beta
454   public abstract static class CustomScheduler extends Scheduler {
455 
456     /**
457      * A callable class that can reschedule itself using a {@link CustomScheduler}.
458      */
459     private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> {
460 
461       /** The underlying task. */
462       private final Runnable wrappedRunnable;
463 
464       /** The executor on which this Callable will be scheduled. */
465       private final ScheduledExecutorService executor;
466 
467       /**
468        * The service that is managing this callable. This is used so that failure can be reported
469        * properly.
470        */
471       private final AbstractService service;
472 
473       /**
474        * This lock is used to ensure safe and correct cancellation, it ensures that a new task is
475        * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to
476        * ensure that it is assigned atomically with being scheduled.
477        */
478       private final ReentrantLock lock = new ReentrantLock();
479 
480       /** The future that represents the next execution of this task. */
481       @GuardedBy("lock")
482       private Future<Void> currentFuture;
483 
484       ReschedulableCallable(
485           AbstractService service, ScheduledExecutorService executor, Runnable runnable) {
486         this.wrappedRunnable = runnable;
487         this.executor = executor;
488         this.service = service;
489       }
490 
491       @Override
492       public Void call() throws Exception {
493         wrappedRunnable.run();
494         reschedule();
495         return null;
496       }
497 
498       /**
499        * Atomically reschedules this task and assigns the new future to {@link #currentFuture}.
500        */
501       public void reschedule() {
502         // invoke the callback outside the lock, prevents some shenanigans.
503         Schedule schedule;
504         try {
505           schedule = CustomScheduler.this.getNextSchedule();
506         } catch (Throwable t) {
507           service.notifyFailed(t);
508           return;
509         }
510         // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
511         // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
512         // to currentFuture doesn't race with itself so that currentFuture is assigned in the
513         // correct order.
514         Throwable scheduleFailure = null;
515         lock.lock();
516         try {
517           if (currentFuture == null || !currentFuture.isCancelled()) {
518             currentFuture = executor.schedule(this, schedule.delay, schedule.unit);
519           }
520         } catch (Throwable e) {
521           // If an exception is thrown by the subclass then we need to make sure that the service
522           // notices and transitions to the FAILED state. We do it by calling notifyFailed directly
523           // because the service does not monitor the state of the future so if the exception is not
524           // caught and forwarded to the service the task would stop executing but the service would
525           // have no idea.
526           // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then
527           // the AbstractService could monitor the future directly. Rescheduling is still hard...
528           // but it would help with some of these lock ordering issues.
529           scheduleFailure = e;
530         } finally {
531           lock.unlock();
532         }
533         // Call notifyFailed outside the lock to avoid lock ordering issues.
534         if (scheduleFailure != null) {
535           service.notifyFailed(scheduleFailure);
536         }
537       }
538 
539       // N.B. Only protect cancel and isCancelled because those are the only methods that are
540       // invoked by the AbstractScheduledService.
541       @Override
542       public boolean cancel(boolean mayInterruptIfRunning) {
543         // Ensure that a task cannot be rescheduled while a cancel is ongoing.
544         lock.lock();
545         try {
546           return currentFuture.cancel(mayInterruptIfRunning);
547         } finally {
548           lock.unlock();
549         }
550       }
551 
552       @Override
553       public boolean isCancelled() {
554         lock.lock();
555         try {
556           return currentFuture.isCancelled();
557         } finally {
558           lock.unlock();
559         }
560       }
561 
562       @Override
563       protected Future<Void> delegate() {
564         throw new UnsupportedOperationException(
565             "Only cancel and isCancelled is supported by this future");
566       }
567     }
568 
569     @Override
570     final Future<?> schedule(
571         AbstractService service, ScheduledExecutorService executor, Runnable runnable) {
572       ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable);
573       task.reschedule();
574       return task;
575     }
576 
577     /**
578      * A value object that represents an absolute delay until a task should be invoked.
579      *
580      * @author Luke Sandberg
581      * @since 11.0
582      */
583     @Beta
584     protected static final class Schedule {
585 
586       private final long delay;
587       private final TimeUnit unit;
588 
589       /**
590        * @param delay the time from now to delay execution
591        * @param unit the time unit of the delay parameter
592        */
593       public Schedule(long delay, TimeUnit unit) {
594         this.delay = delay;
595         this.unit = checkNotNull(unit);
596       }
597     }
598 
599     /**
600      * Calculates the time at which to next invoke the task.
601      *
602      * <p>This is guaranteed to be called immediately after the task has completed an iteration and
603      * on the same thread as the previous execution of {@link
604      * AbstractScheduledService#runOneIteration}.
605      *
606      * @return a schedule that defines the delay before the next execution.
607      */
608     // TODO(cpovirk): @ForOverride
609     protected abstract Schedule getNextSchedule() throws Exception;
610   }
611 }