View Javadoc
1   /*
2    * Copyright (C) 2014 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.util.concurrent;
16  
17  import static com.google.common.base.Preconditions.checkNotNull;
18  
19  import com.google.common.annotations.GwtIncompatible;
20  import com.google.common.base.Preconditions;
21  import com.google.common.collect.Queues;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.concurrent.Executor;
27  import java.util.logging.Level;
28  import java.util.logging.Logger;
29  import javax.annotation.concurrent.GuardedBy;
30  
31  /**
32   * A list of listeners for implementing a concurrency friendly observable object.
33   *
34   * <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain
35   * #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events.
36   *
37   * <p>The API of this class is designed to make it easy to achieve the following properties
38   *
39   * <ul>
40   *   <li>Multiple events for the same listener are never dispatched concurrently.
41   *   <li>Events for the different listeners are dispatched concurrently.
42   *   <li>All events for a given listener dispatch on the provided {@link #executor}.
43   *   <li>It is easy for the user to ensure that listeners are never invoked while holding locks.
44   * </ul>
45   *
46   * The last point is subtle. Often the observable object will be managing its own internal state
47   * using a lock, however it is dangerous to dispatch listeners while holding a lock because they
48   * might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your
49   * object). So it is important to not call {@link #dispatch} while holding any locks. This is why
50   * {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision
51   * to run a particular event is made during the state change, but the decision to actually invoke
52   * the listeners can be delayed slightly so that locks can be dropped. Also, because {@link
53   * #dispatch} is expected to be called concurrently, it is idempotent.
54   */
55  @GwtIncompatible
56  final class ListenerCallQueue<L> {
57    // TODO(cpovirk): consider using the logger associated with listener.getClass().
58    private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
59  
60    // TODO(chrisn): promote AppendOnlyCollection for use here.
61    private final List<PerListenerQueue<L>> listeners =
62        Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>());
63  
64    /** Method reference-compatible listener event. */
65    public interface Event<L> {
66      /** Call a method on the listener. */
67      void call(L listener);
68    }
69  
70    /**
71     * Adds a listener that will be called using the given executor when events are later {@link
72     * #enqueue enqueued} and {@link #dispatch dispatched}.
73     */
74    public void addListener(L listener, Executor executor) {
75      checkNotNull(listener, "listener");
76      checkNotNull(executor, "executor");
77      listeners.add(new PerListenerQueue<>(listener, executor));
78    }
79  
80    /**
81     * Enqueues an event to be run on currently known listeners.
82     *
83     * <p>The {@code toString} method of the Event itself will be used to describe the event in the
84     * case of an error.
85     *
86     * @param event the callback to execute on {@link #dispatch}
87     */
88    public void enqueue(Event<L> event) {
89      enqueueHelper(event, event);
90    }
91  
92    /**
93     * Enqueues an event to be run on currently known listeners, with a label.
94     *
95     * @param event the callback to execute on {@link #dispatch}
96     * @param label a description of the event to use in the case of an error
97     */
98    public void enqueue(Event<L> event, String label) {
99      enqueueHelper(event, label);
100   }
101 
102   private void enqueueHelper(Event<L> event, Object label) {
103     checkNotNull(event, "event");
104     checkNotNull(label, "label");
105     synchronized (listeners) {
106       for (PerListenerQueue<L> queue : listeners) {
107         queue.add(event, label);
108       }
109     }
110   }
111 
112   /**
113    * Dispatches all events enqueued prior to this call, serially and in order, for every listener.
114    *
115    * <p>Note: this method is idempotent and safe to call from any thread
116    */
117   public void dispatch() {
118     // iterate by index to avoid concurrent modification exceptions
119     for (int i = 0; i < listeners.size(); i++) {
120       listeners.get(i).dispatch();
121     }
122   }
123 
124   /**
125    * A special purpose queue/executor that dispatches listener events serially on a configured
126    * executor. Each event event can be added and dispatched as separate phases.
127    *
128    * <p>This class is very similar to {@link SequentialExecutor} with the exception that events can
129    * be added without necessarily executing immediately.
130    */
131   private static final class PerListenerQueue<L> implements Runnable {
132     final L listener;
133     final Executor executor;
134 
135     @GuardedBy("this")
136     final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque();
137 
138     @GuardedBy("this")
139     final Queue<Object> labelQueue = Queues.newArrayDeque();
140 
141     @GuardedBy("this")
142     boolean isThreadScheduled;
143 
144     PerListenerQueue(L listener, Executor executor) {
145       this.listener = checkNotNull(listener);
146       this.executor = checkNotNull(executor);
147     }
148 
149     /** Enqueues a event to be run. */
150     synchronized void add(ListenerCallQueue.Event<L> event, Object label) {
151       waitQueue.add(event);
152       labelQueue.add(label);
153     }
154 
155     /**
156      * Dispatches all listeners {@linkplain #enqueue enqueued} prior to this call, serially and in
157      * order.
158      */
159     void dispatch() {
160       boolean scheduleEventRunner = false;
161       synchronized (this) {
162         if (!isThreadScheduled) {
163           isThreadScheduled = true;
164           scheduleEventRunner = true;
165         }
166       }
167       if (scheduleEventRunner) {
168         try {
169           executor.execute(this);
170         } catch (RuntimeException e) {
171           // reset state in case of an error so that later dispatch calls will actually do something
172           synchronized (this) {
173             isThreadScheduled = false;
174           }
175           // Log it and keep going.
176           logger.log(
177               Level.SEVERE,
178               "Exception while running callbacks for " + listener + " on " + executor,
179               e);
180           throw e;
181         }
182       }
183     }
184 
185     @Override
186     public void run() {
187       boolean stillRunning = true;
188       try {
189         while (true) {
190           ListenerCallQueue.Event<L> nextToRun;
191           Object nextLabel;
192           synchronized (PerListenerQueue.this) {
193             Preconditions.checkState(isThreadScheduled);
194             nextToRun = waitQueue.poll();
195             nextLabel = labelQueue.poll();
196             if (nextToRun == null) {
197               isThreadScheduled = false;
198               stillRunning = false;
199               break;
200             }
201           }
202 
203           // Always run while _not_ holding the lock, to avoid deadlocks.
204           try {
205             nextToRun.call(listener);
206           } catch (RuntimeException e) {
207             // Log it and keep going.
208             logger.log(
209                 Level.SEVERE,
210                 "Exception while executing callback: " + listener + " " + nextLabel,
211                 e);
212           }
213         }
214       } finally {
215         if (stillRunning) {
216           // An Error is bubbling up. We should mark ourselves as no longer running. That way, if
217           // anyone tries to keep using us, we won't be corrupted.
218           synchronized (PerListenerQueue.this) {
219             isThreadScheduled = false;
220           }
221         }
222       }
223     }
224   }
225 }