View Javadoc
1   /*
2    * Copyright (C) 2008 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import com.google.common.annotations.GwtIncompatible;
18  import com.google.common.base.Preconditions;
19  import com.google.j2objc.annotations.WeakOuter;
20  import java.util.ArrayDeque;
21  import java.util.Queue;
22  import java.util.concurrent.Executor;
23  import java.util.logging.Level;
24  import java.util.logging.Logger;
25  import javax.annotation.concurrent.GuardedBy;
26  
27  /**
28   * Executor ensuring that all Runnables submitted are executed in order, using the provided
29   * Executor, and sequentially such that no two will ever be running at the same time.
30   *
31   * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
32   *
33   * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
34   * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
35   * continues. {@code RuntimeException}s thrown by tasks are simply logged and the executor keeps
36   * trucking. If an {@code Error} is thrown, the error will propagate and execution will stop until
37   * it is restarted by a call to {@link #execute}.
38   */
39  @GwtIncompatible
40  final class SequentialExecutor implements Executor {
41    private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName());
42  
43    /** Underlying executor that all submitted Runnable objects are run on. */
44    private final Executor executor;
45  
46    @GuardedBy("queue")
47    private final Queue<Runnable> queue = new ArrayDeque<>();
48  
49    @GuardedBy("queue")
50    private boolean isWorkerRunning = false;
51  
52    private final QueueWorker worker = new QueueWorker();
53  
54    /** Use {@link MoreExecutors#sequentialExecutor} */
55    SequentialExecutor(Executor executor) {
56      this.executor = Preconditions.checkNotNull(executor);
57    }
58  
59    /**
60     * Adds a task to the queue and makes sure a worker thread is running.
61     *
62     * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
63     * execution of tasks will stop until a call to this method or to {@link #resume()} is made.
64     */
65    @Override
66    public void execute(Runnable task) {
67      synchronized (queue) {
68        queue.add(task);
69        if (isWorkerRunning) {
70          return;
71        }
72        isWorkerRunning = true;
73      }
74      startQueueWorker();
75    }
76  
77    /**
78     * Starts a worker.  This should only be called if:
79     *
80     * <ul>
81     *   <li>{@code suspensions == 0}
82     *   <li>{@code isWorkerRunning == true}
83     *   <li>{@code !queue.isEmpty()}
84     *   <li>the {@link #worker} lock is not held
85     * </ul>
86     */
87    private void startQueueWorker() {
88      boolean executionRejected = true;
89      try {
90        executor.execute(worker);
91        executionRejected = false;
92      } finally {
93        if (executionRejected) {
94          // The best we can do is to stop executing the queue, but reset the state so that
95          // execution can be resumed later if the caller so wishes.
96          synchronized (queue) {
97            isWorkerRunning = false;
98          }
99        }
100     }
101   }
102 
103   /**
104    * Worker that runs tasks from {@link #queue} until it is empty.
105    */
106   @WeakOuter
107   private final class QueueWorker implements Runnable {
108     @Override
109     public void run() {
110       try {
111         workOnQueue();
112       } catch (Error e) {
113         synchronized (queue) {
114           isWorkerRunning = false;
115         }
116         throw e;
117         // The execution of a task has ended abnormally.
118         // We could have tasks left in the queue, so should perhaps try to restart a worker,
119         // but then the Error will get delayed if we are using a direct (same thread) executor.
120       }
121     }
122 
123     private void workOnQueue() {
124       while (true) {
125         Runnable task = null;
126         synchronized (queue) {
127           // TODO(user): How should we handle interrupts and shutdowns?
128           task = queue.poll();
129           if (task == null) {
130             isWorkerRunning = false;
131             return;
132           }
133         }
134         try {
135           task.run();
136         } catch (RuntimeException e) {
137           log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
138         }
139       }
140     }
141   }
142 }