View Javadoc
1   /*
2    * Copyright (C) 2014 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.eventbus;
16  
17  import static com.google.common.base.Preconditions.checkNotNull;
18  
19  import com.google.common.collect.Queues;
20  import java.util.Iterator;
21  import java.util.Queue;
22  import java.util.concurrent.ConcurrentLinkedQueue;
23  
24  /**
25   * Handler for dispatching events to subscribers, providing different event ordering guarantees that
26   * make sense for different situations.
27   *
28   * <p><b>Note:</b> The dispatcher is orthogonal to the subscriber's {@code Executor}. The dispatcher
29   * controls the order in which events are dispatched, while the executor controls how (i.e. on which
30   * thread) the subscriber is actually called when an event is dispatched to it.
31   *
32   * @author Colin Decker
33   */
34  abstract class Dispatcher {
35  
36    /**
37     * Returns a dispatcher that queues events that are posted reentrantly on a thread that is already
38     * dispatching an event, guaranteeing that all events posted on a single thread are dispatched to
39     * all subscribers in the order they are posted.
40     *
41     * <p>When all subscribers are dispatched to using a <i>direct</i> executor (which dispatches on
42     * the same thread that posts the event), this yields a breadth-first dispatch order on each
43     * thread. That is, all subscribers to a single event A will be called before any subscribers to
44     * any events B and C that are posted to the event bus by the subscribers to A.
45     */
46    static Dispatcher perThreadDispatchQueue() {
47      return new PerThreadQueuedDispatcher();
48    }
49  
50    /**
51     * Returns a dispatcher that queues events that are posted in a single global queue. This behavior
52     * matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful.
53     * For async dispatch, an {@linkplain #immediate() immediate} dispatcher should generally be
54     * preferable.
55     */
56    static Dispatcher legacyAsync() {
57      return new LegacyAsyncDispatcher();
58    }
59  
60    /**
61     * Returns a dispatcher that dispatches events to subscribers immediately as they're posted
62     * without using an intermediate queue to change the dispatch order. This is effectively a
63     * depth-first dispatch order, vs. breadth-first when using a queue.
64     */
65    static Dispatcher immediate() {
66      return ImmediateDispatcher.INSTANCE;
67    }
68  
69    /**
70     * Dispatches the given {@code event} to the given {@code subscribers}.
71     */
72    abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
73  
74    /**
75     * Implementation of a {@link #perThreadDispatchQueue()} dispatcher.
76     */
77    private static final class PerThreadQueuedDispatcher extends Dispatcher {
78  
79      // This dispatcher matches the original dispatch behavior of EventBus.
80  
81      /**
82       * Per-thread queue of events to dispatch.
83       */
84      private final ThreadLocal<Queue<Event>> queue =
85          new ThreadLocal<Queue<Event>>() {
86            @Override
87            protected Queue<Event> initialValue() {
88              return Queues.newArrayDeque();
89            }
90          };
91  
92      /**
93       * Per-thread dispatch state, used to avoid reentrant event dispatching.
94       */
95      private final ThreadLocal<Boolean> dispatching =
96          new ThreadLocal<Boolean>() {
97            @Override
98            protected Boolean initialValue() {
99              return false;
100           }
101         };
102 
103     @Override
104     void dispatch(Object event, Iterator<Subscriber> subscribers) {
105       checkNotNull(event);
106       checkNotNull(subscribers);
107       Queue<Event> queueForThread = queue.get();
108       queueForThread.offer(new Event(event, subscribers));
109 
110       if (!dispatching.get()) {
111         dispatching.set(true);
112         try {
113           Event nextEvent;
114           while ((nextEvent = queueForThread.poll()) != null) {
115             while (nextEvent.subscribers.hasNext()) {
116               nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
117             }
118           }
119         } finally {
120           dispatching.remove();
121           queue.remove();
122         }
123       }
124     }
125 
126     private static final class Event {
127       private final Object event;
128       private final Iterator<Subscriber> subscribers;
129 
130       private Event(Object event, Iterator<Subscriber> subscribers) {
131         this.event = event;
132         this.subscribers = subscribers;
133       }
134     }
135   }
136 
137   /**
138    * Implementation of a {@link #legacyAsync()} dispatcher.
139    */
140   private static final class LegacyAsyncDispatcher extends Dispatcher {
141 
142     // This dispatcher matches the original dispatch behavior of AsyncEventBus.
143     //
144     // We can't really make any guarantees about the overall dispatch order for this dispatcher in
145     // a multithreaded environment for a couple reasons:
146     //
147     // 1. Subscribers to events posted on different threads can be interleaved with each other
148     //    freely. (A event on one thread, B event on another could yield any of
149     //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
150     // 2. It's possible for subscribers to actually be dispatched to in a different order than they
151     //    were added to the queue. It's easily possible for one thread to take the head of the
152     //    queue, immediately followed by another thread taking the next element in the queue. That
153     //    second thread can then dispatch to the subscriber it took before the first thread does.
154     //
155     // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
156     // that simply loops through the subscribers and dispatches the event to each would actually
157     // probably provide a stronger order guarantee, though that order would obviously be different
158     // in some cases.
159 
160     /**
161      * Global event queue.
162      */
163     private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
164         Queues.newConcurrentLinkedQueue();
165 
166     @Override
167     void dispatch(Object event, Iterator<Subscriber> subscribers) {
168       checkNotNull(event);
169       while (subscribers.hasNext()) {
170         queue.add(new EventWithSubscriber(event, subscribers.next()));
171       }
172 
173       EventWithSubscriber e;
174       while ((e = queue.poll()) != null) {
175         e.subscriber.dispatchEvent(e.event);
176       }
177     }
178 
179     private static final class EventWithSubscriber {
180       private final Object event;
181       private final Subscriber subscriber;
182 
183       private EventWithSubscriber(Object event, Subscriber subscriber) {
184         this.event = event;
185         this.subscriber = subscriber;
186       }
187     }
188   }
189 
190   /**
191    * Implementation of {@link #immediate()}.
192    */
193   private static final class ImmediateDispatcher extends Dispatcher {
194     private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
195 
196     @Override
197     void dispatch(Object event, Iterator<Subscriber> subscribers) {
198       checkNotNull(event);
199       while (subscribers.hasNext()) {
200         subscribers.next().dispatchEvent(event);
201       }
202     }
203   }
204 }