View Javadoc
/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.base.Preconditions.checkArgument;
18  import static com.google.common.base.Preconditions.checkNotNull;
19  import static com.google.common.base.Preconditions.checkState;
20  import static com.google.common.util.concurrent.Service.State.FAILED;
21  import static com.google.common.util.concurrent.Service.State.NEW;
22  import static com.google.common.util.concurrent.Service.State.RUNNING;
23  import static com.google.common.util.concurrent.Service.State.STARTING;
24  import static com.google.common.util.concurrent.Service.State.STOPPING;
25  import static com.google.common.util.concurrent.Service.State.TERMINATED;
26  
27  import com.google.common.annotations.Beta;
28  import com.google.common.annotations.GwtIncompatible;
29  import com.google.common.util.concurrent.Monitor.Guard;
30  import com.google.common.util.concurrent.Service.State; // javadoc needs this
31  import com.google.errorprone.annotations.CanIgnoreReturnValue;
32  import com.google.errorprone.annotations.ForOverride;
33  import com.google.j2objc.annotations.WeakOuter;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  import javax.annotation.Nullable;
38  import javax.annotation.concurrent.GuardedBy;
39  import javax.annotation.concurrent.Immutable;
40  
41  /**
42   * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
43   * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
44   * callbacks. Its subclasses must manage threads manually; consider {@link
45   * AbstractExecutionThreadService} if you need only a single execution thread.
46   *
47   * @author Jesse Wilson
48   * @author Luke Sandberg
49   * @since 1.0
50   */
51  @Beta
52  @GwtIncompatible
53  public abstract class AbstractService implements Service {
54    private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
55        new ListenerCallQueue.Event<Listener>() {
56          @Override
57          public void call(Listener listener) {
58            listener.starting();
59          }
60  
61          @Override
62          public String toString() {
63            return "starting()";
64          }
65        };
66    private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
67        new ListenerCallQueue.Event<Listener>() {
68          @Override
69          public void call(Listener listener) {
70            listener.running();
71          }
72  
73          @Override
74          public String toString() {
75            return "running()";
76          }
77        };
78    private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
79        stoppingEvent(STARTING);
80    private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
81        stoppingEvent(RUNNING);
82  
83    private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
84        terminatedEvent(NEW);
85    private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
86        terminatedEvent(RUNNING);
87    private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
88        terminatedEvent(STOPPING);
89  
90    private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
91      return new ListenerCallQueue.Event<Listener>() {
92        @Override
93        public void call(Listener listener) {
94          listener.terminated(from);
95        }
96  
97        @Override
98        public String toString() {
99          return "terminated({from = " + from + "})";
100       }
101     };
102   }
103 
104   private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
105     return new ListenerCallQueue.Event<Listener>() {
106       @Override
107       public void call(Listener listener) {
108         listener.stopping(from);
109       }
110 
111       @Override
112       public String toString() {
113         return "stopping({from = " + from + "})";
114       }
115     };
116   }
117 
118   private final Monitor monitor = new Monitor();
119 
120   private final Guard isStartable = new IsStartableGuard();
121 
122   @WeakOuter
123   private final class IsStartableGuard extends Guard {
124     IsStartableGuard() {
125       super(AbstractService.this.monitor);
126     }
127 
128     @Override
129     public boolean isSatisfied() {
130       return state() == NEW;
131     }
132   }
133 
134   private final Guard isStoppable = new IsStoppableGuard();
135 
136   @WeakOuter
137   private final class IsStoppableGuard extends Guard {
138     IsStoppableGuard() {
139       super(AbstractService.this.monitor);
140     }
141 
142     @Override
143     public boolean isSatisfied() {
144       return state().compareTo(RUNNING) <= 0;
145     }
146   }
147 
148   private final Guard hasReachedRunning = new HasReachedRunningGuard();
149 
150   @WeakOuter
151   private final class HasReachedRunningGuard extends Guard {
152     HasReachedRunningGuard() {
153       super(AbstractService.this.monitor);
154     }
155 
156     @Override
157     public boolean isSatisfied() {
158       return state().compareTo(RUNNING) >= 0;
159     }
160   }
161 
162   private final Guard isStopped = new IsStoppedGuard();
163 
164   @WeakOuter
165   private final class IsStoppedGuard extends Guard {
166     IsStoppedGuard() {
167       super(AbstractService.this.monitor);
168     }
169 
170     @Override
171     public boolean isSatisfied() {
172       return state().isTerminal();
173     }
174   }
175 
176   /** The listeners to notify during a state transition. */
177   private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
178 
179   /**
180    * The current state of the service. This should be written with the lock held but can be read
181    * without it because it is an immutable object in a volatile field. This is desirable so that
182    * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
183    * without grabbing the lock.
184    *
185    * <p>To update this field correctly the lock must be held to guarantee that the state is
186    * consistent.
187    */
188   private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
189 
190   /** Constructor for use by subclasses. */
191   protected AbstractService() {}
192 
193   /**
194    * This method is called by {@link #startAsync} to initiate service startup. The invocation of
195    * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
196    * or after it has returned. If startup fails, the invocation should cause a call to {@link
197    * #notifyFailed(Throwable)} instead.
198    *
199    * <p>This method should return promptly; prefer to do work on a different thread where it is
200    * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
201    * called multiple times.
202    */
203   @ForOverride
204   protected abstract void doStart();
205 
206   /**
207    * This method should be used to initiate service shutdown. The invocation of this method should
208    * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
209    * returned. If shutdown fails, the invocation should cause a call to {@link
210    * #notifyFailed(Throwable)} instead.
211    *
212    * <p>This method should return promptly; prefer to do work on a different thread where it is
213    * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
214    * called multiple times.
215    */
216   @ForOverride
217   protected abstract void doStop();
218 
219   @CanIgnoreReturnValue
220   @Override
221   public final Service startAsync() {
222     if (monitor.enterIf(isStartable)) {
223       try {
224         snapshot = new StateSnapshot(STARTING);
225         enqueueStartingEvent();
226         doStart();
227       } catch (Throwable startupFailure) {
228         notifyFailed(startupFailure);
229       } finally {
230         monitor.leave();
231         dispatchListenerEvents();
232       }
233     } else {
234       throw new IllegalStateException("Service " + this + " has already been started");
235     }
236     return this;
237   }
238 
239   @CanIgnoreReturnValue
240   @Override
241   public final Service stopAsync() {
242     if (monitor.enterIf(isStoppable)) {
243       try {
244         State previous = state();
245         switch (previous) {
246           case NEW:
247             snapshot = new StateSnapshot(TERMINATED);
248             enqueueTerminatedEvent(NEW);
249             break;
250           case STARTING:
251             snapshot = new StateSnapshot(STARTING, true, null);
252             enqueueStoppingEvent(STARTING);
253             break;
254           case RUNNING:
255             snapshot = new StateSnapshot(STOPPING);
256             enqueueStoppingEvent(RUNNING);
257             doStop();
258             break;
259           case STOPPING:
260           case TERMINATED:
261           case FAILED:
262             // These cases are impossible due to the if statement above.
263             throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
264           default:
265             throw new AssertionError("Unexpected state: " + previous);
266         }
267       } catch (Throwable shutdownFailure) {
268         notifyFailed(shutdownFailure);
269       } finally {
270         monitor.leave();
271         dispatchListenerEvents();
272       }
273     }
274     return this;
275   }
276 
277   @Override
278   public final void awaitRunning() {
279     monitor.enterWhenUninterruptibly(hasReachedRunning);
280     try {
281       checkCurrentState(RUNNING);
282     } finally {
283       monitor.leave();
284     }
285   }
286 
287   @Override
288   public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
289     if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
290       try {
291         checkCurrentState(RUNNING);
292       } finally {
293         monitor.leave();
294       }
295     } else {
296       // It is possible due to races the we are currently in the expected state even though we
297       // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
298       // even check the guard. I don't think we care too much about this use case but it could lead
299       // to a confusing error message.
300       throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
301     }
302   }
303 
304   @Override
305   public final void awaitTerminated() {
306     monitor.enterWhenUninterruptibly(isStopped);
307     try {
308       checkCurrentState(TERMINATED);
309     } finally {
310       monitor.leave();
311     }
312   }
313 
314   @Override
315   public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
316     if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
317       try {
318         checkCurrentState(TERMINATED);
319       } finally {
320         monitor.leave();
321       }
322     } else {
323       // It is possible due to races the we are currently in the expected state even though we
324       // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
325       // even check the guard. I don't think we care too much about this use case but it could lead
326       // to a confusing error message.
327       throw new TimeoutException(
328           "Timed out waiting for "
329               + this
330               + " to reach a terminal state. "
331               + "Current state: "
332               + state());
333     }
334   }
335 
336   /** Checks that the current state is equal to the expected state. */
337   @GuardedBy("monitor")
338   private void checkCurrentState(State expected) {
339     State actual = state();
340     if (actual != expected) {
341       if (actual == FAILED) {
342         // Handle this specially so that we can include the failureCause, if there is one.
343         throw new IllegalStateException(
344             "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
345             failureCause());
346       }
347       throw new IllegalStateException(
348           "Expected the service " + this + " to be " + expected + ", but was " + actual);
349     }
350   }
351 
352   /**
353    * Implementing classes should invoke this method once their service has started. It will cause
354    * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
355    *
356    * @throws IllegalStateException if the service is not {@link State#STARTING}.
357    */
358   protected final void notifyStarted() {
359     monitor.enter();
360     try {
361       // We have to examine the internal state of the snapshot here to properly handle the stop
362       // while starting case.
363       if (snapshot.state != STARTING) {
364         IllegalStateException failure =
365             new IllegalStateException(
366                 "Cannot notifyStarted() when the service is " + snapshot.state);
367         notifyFailed(failure);
368         throw failure;
369       }
370 
371       if (snapshot.shutdownWhenStartupFinishes) {
372         snapshot = new StateSnapshot(STOPPING);
373         // We don't call listeners here because we already did that when we set the
374         // shutdownWhenStartupFinishes flag.
375         doStop();
376       } else {
377         snapshot = new StateSnapshot(RUNNING);
378         enqueueRunningEvent();
379       }
380     } finally {
381       monitor.leave();
382       dispatchListenerEvents();
383     }
384   }
385 
386   /**
387    * Implementing classes should invoke this method once their service has stopped. It will cause
388    * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
389    *
390    * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link
391    *     State#RUNNING}.
392    */
393   protected final void notifyStopped() {
394     monitor.enter();
395     try {
396       // We check the internal state of the snapshot instead of state() directly so we don't allow
397       // notifyStopped() to be called while STARTING, even if stop() has already been called.
398       State previous = snapshot.state;
399       if (previous != STOPPING && previous != RUNNING) {
400         IllegalStateException failure =
401             new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
402         notifyFailed(failure);
403         throw failure;
404       }
405       snapshot = new StateSnapshot(TERMINATED);
406       enqueueTerminatedEvent(previous);
407     } finally {
408       monitor.leave();
409       dispatchListenerEvents();
410     }
411   }
412 
413   /**
414    * Invoke this method to transition the service to the {@link State#FAILED}. The service will
415    * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
416    * or otherwise cannot be started nor stopped.
417    */
418   protected final void notifyFailed(Throwable cause) {
419     checkNotNull(cause);
420 
421     monitor.enter();
422     try {
423       State previous = state();
424       switch (previous) {
425         case NEW:
426         case TERMINATED:
427           throw new IllegalStateException("Failed while in state:" + previous, cause);
428         case RUNNING:
429         case STARTING:
430         case STOPPING:
431           snapshot = new StateSnapshot(FAILED, false, cause);
432           enqueueFailedEvent(previous, cause);
433           break;
434         case FAILED:
435           // Do nothing
436           break;
437         default:
438           throw new AssertionError("Unexpected state: " + previous);
439       }
440     } finally {
441       monitor.leave();
442       dispatchListenerEvents();
443     }
444   }
445 
446   @Override
447   public final boolean isRunning() {
448     return state() == RUNNING;
449   }
450 
451   @Override
452   public final State state() {
453     return snapshot.externalState();
454   }
455 
456   /**
457    * @since 14.0
458    */
459   @Override
460   public final Throwable failureCause() {
461     return snapshot.failureCause();
462   }
463 
464   /**
465    * @since 13.0
466    */
467   @Override
468   public final void addListener(Listener listener, Executor executor) {
469     listeners.addListener(listener, executor);
470   }
471 
472   @Override
473   public String toString() {
474     return getClass().getSimpleName() + " [" + state() + "]";
475   }
476 
477   /**
478    * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
479    * #monitor}.
480    */
481   private void dispatchListenerEvents() {
482     if (!monitor.isOccupiedByCurrentThread()) {
483       listeners.dispatch();
484     }
485   }
486 
487   private void enqueueStartingEvent() {
488     listeners.enqueue(STARTING_EVENT);
489   }
490 
491   private void enqueueRunningEvent() {
492     listeners.enqueue(RUNNING_EVENT);
493   }
494 
495   private void enqueueStoppingEvent(final State from) {
496     if (from == State.STARTING) {
497       listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
498     } else if (from == State.RUNNING) {
499       listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
500     } else {
501       throw new AssertionError();
502     }
503   }
504 
505   private void enqueueTerminatedEvent(final State from) {
506     switch (from) {
507       case NEW:
508         listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
509         break;
510       case RUNNING:
511         listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
512         break;
513       case STOPPING:
514         listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
515         break;
516       case STARTING:
517       case TERMINATED:
518       case FAILED:
519       default:
520         throw new AssertionError();
521     }
522   }
523 
524   private void enqueueFailedEvent(final State from, final Throwable cause) {
525     // can't memoize this one due to the exception
526     listeners.enqueue(
527         new ListenerCallQueue.Event<Listener>() {
528           @Override
529           public void call(Listener listener) {
530             listener.failed(from, cause);
531           }
532 
533           @Override
534           public String toString() {
535             return "failed({from = " + from + ", cause = " + cause + "})";
536           }
537         });
538   }
539 
540   /**
541    * An immutable snapshot of the current state of the service. This class represents a consistent
542    * snapshot of the state and therefore it can be used to answer simple queries without needing to
543    * grab a lock.
544    */
545   @Immutable
546   private static final class StateSnapshot {
547     /**
548      * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
549      */
550     final State state;
551 
552     /**
553      * If true, the user requested a shutdown while the service was still starting up.
554      */
555     final boolean shutdownWhenStartupFinishes;
556 
557     /**
558      * The exception that caused this service to fail. This will be {@code null} unless the service
559      * has failed.
560      */
561     @Nullable final Throwable failure;
562 
563     StateSnapshot(State internalState) {
564       this(internalState, false, null);
565     }
566 
567     StateSnapshot(
568         State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
569       checkArgument(
570           !shutdownWhenStartupFinishes || internalState == STARTING,
571           "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
572           internalState);
573       checkArgument(
574           !(failure != null ^ internalState == FAILED),
575           "A failure cause should be set if and only if the state is failed.  Got %s and %s "
576               + "instead.",
577           internalState,
578           failure);
579       this.state = internalState;
580       this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
581       this.failure = failure;
582     }
583 
584     /** @see Service#state() */
585     State externalState() {
586       if (shutdownWhenStartupFinishes && state == STARTING) {
587         return STOPPING;
588       } else {
589         return state;
590       }
591     }
592 
593     /** @see Service#failureCause() */
594     Throwable failureCause() {
595       checkState(
596           state == FAILED,
597           "failureCause() is only valid if the service has failed, service is %s",
598           state);
599       return failure;
600     }
601   }
602 }