Coverage Summary for Class: Dispatcher (com.google.common.eventbus)
| Class | Method, % | Line, % |
|---|---|---|
| Dispatcher | 0% (0/4) | 0% (0/4) |
| Dispatcher$ImmediateDispatcher | 0% (0/3) | 0% (0/5) |
| Dispatcher$LegacyAsyncDispatcher | 0% (0/3) | 0% (0/7) |
| Dispatcher$LegacyAsyncDispatcher$EventWithSubscriber | 0% (0/2) | 0% (0/4) |
| Dispatcher$PerThreadQueuedDispatcher | 0% (0/3) | 0% (0/15) |
| Dispatcher$PerThreadQueuedDispatcher$1 | 0% (0/2) | 0% (0/2) |
| Dispatcher$PerThreadQueuedDispatcher$2 | 0% (0/2) | 0% (0/2) |
| Dispatcher$PerThreadQueuedDispatcher$Event | 0% (0/2) | 0% (0/4) |
| Total | 0% (0/21) | 0% (0/43) |
1 /* 2 * Copyright (C) 2014 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.eventbus; 16 17 import static com.google.common.base.Preconditions.checkNotNull; 18 19 import com.google.common.collect.Queues; 20 import java.util.Iterator; 21 import java.util.Queue; 22 import java.util.concurrent.ConcurrentLinkedQueue; 23 24 /** 25 * Handler for dispatching events to subscribers, providing different event ordering guarantees that 26 * make sense for different situations. 27 * 28 * <p><b>Note:</b> The dispatcher is orthogonal to the subscriber's {@code Executor}. The dispatcher 29 * controls the order in which events are dispatched, while the executor controls how (i.e. on which 30 * thread) the subscriber is actually called when an event is dispatched to it. 31 * 32 * @author Colin Decker 33 */ 34 @ElementTypesAreNonnullByDefault 35 abstract class Dispatcher { 36 37 /** 38 * Returns a dispatcher that queues events that are posted reentrantly on a thread that is already 39 * dispatching an event, guaranteeing that all events posted on a single thread are dispatched to 40 * all subscribers in the order they are posted. 41 * 42 * <p>When all subscribers are dispatched to using a <i>direct</i> executor (which dispatches on 43 * the same thread that posts the event), this yields a breadth-first dispatch order on each 44 * thread. That is, all subscribers to a single event A will be called before any subscribers to 45 * any events B and C that are posted to the event bus by the subscribers to A. 46 */ 47 static Dispatcher perThreadDispatchQueue() { 48 return new PerThreadQueuedDispatcher(); 49 } 50 51 /** 52 * Returns a dispatcher that queues events that are posted in a single global queue. This behavior 53 * matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful. 54 * For async dispatch, an {@linkplain #immediate() immediate} dispatcher should generally be 55 * preferable. 56 */ 57 static Dispatcher legacyAsync() { 58 return new LegacyAsyncDispatcher(); 59 } 60 61 /** 62 * Returns a dispatcher that dispatches events to subscribers immediately as they're posted 63 * without using an intermediate queue to change the dispatch order. This is effectively a 64 * depth-first dispatch order, vs. breadth-first when using a queue. 65 */ 66 static Dispatcher immediate() { 67 return ImmediateDispatcher.INSTANCE; 68 } 69 70 /** Dispatches the given {@code event} to the given {@code subscribers}. */ 71 abstract void dispatch(Object event, Iterator<Subscriber> subscribers); 72 73 /** Implementation of a {@link #perThreadDispatchQueue()} dispatcher. */ 74 private static final class PerThreadQueuedDispatcher extends Dispatcher { 75 76 // This dispatcher matches the original dispatch behavior of EventBus. 77 78 /** Per-thread queue of events to dispatch. */ 79 private final ThreadLocal<Queue<Event>> queue = 80 new ThreadLocal<Queue<Event>>() { 81 @Override 82 protected Queue<Event> initialValue() { 83 return Queues.newArrayDeque(); 84 } 85 }; 86 87 /** Per-thread dispatch state, used to avoid reentrant event dispatching. */ 88 private final ThreadLocal<Boolean> dispatching = 89 new ThreadLocal<Boolean>() { 90 @Override 91 protected Boolean initialValue() { 92 return false; 93 } 94 }; 95 96 @Override 97 void dispatch(Object event, Iterator<Subscriber> subscribers) { 98 checkNotNull(event); 99 checkNotNull(subscribers); 100 Queue<Event> queueForThread = queue.get(); 101 queueForThread.offer(new Event(event, subscribers)); 102 103 if (!dispatching.get()) { 104 dispatching.set(true); 105 try { 106 Event nextEvent; 107 while ((nextEvent = queueForThread.poll()) != null) { 108 while (nextEvent.subscribers.hasNext()) { 109 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); 110 } 111 } 112 } finally { 113 dispatching.remove(); 114 queue.remove(); 115 } 116 } 117 } 118 119 private static final class Event { 120 private final Object event; 121 private final Iterator<Subscriber> subscribers; 122 123 private Event(Object event, Iterator<Subscriber> subscribers) { 124 this.event = event; 125 this.subscribers = subscribers; 126 } 127 } 128 } 129 130 /** Implementation of a {@link #legacyAsync()} dispatcher. */ 131 private static final class LegacyAsyncDispatcher extends Dispatcher { 132 133 // This dispatcher matches the original dispatch behavior of AsyncEventBus. 134 // 135 // We can't really make any guarantees about the overall dispatch order for this dispatcher in 136 // a multithreaded environment for a couple reasons: 137 // 138 // 1. Subscribers to events posted on different threads can be interleaved with each other 139 // freely. (A event on one thread, B event on another could yield any of 140 // [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.) 141 // 2. It's possible for subscribers to actually be dispatched to in a different order than they 142 // were added to the queue. It's easily possible for one thread to take the head of the 143 // queue, immediately followed by another thread taking the next element in the queue. That 144 // second thread can then dispatch to the subscriber it took before the first thread does. 145 // 146 // All this makes me really wonder if there's any value in queueing here at all. A dispatcher 147 // that simply loops through the subscribers and dispatches the event to each would actually 148 // probably provide a stronger order guarantee, though that order would obviously be different 149 // in some cases. 150 151 /** Global event queue. */ 152 private final ConcurrentLinkedQueue<EventWithSubscriber> queue = 153 Queues.newConcurrentLinkedQueue(); 154 155 @Override 156 void dispatch(Object event, Iterator<Subscriber> subscribers) { 157 checkNotNull(event); 158 while (subscribers.hasNext()) { 159 queue.add(new EventWithSubscriber(event, subscribers.next())); 160 } 161 162 EventWithSubscriber e; 163 while ((e = queue.poll()) != null) { 164 e.subscriber.dispatchEvent(e.event); 165 } 166 } 167 168 private static final class EventWithSubscriber { 169 private final Object event; 170 private final Subscriber subscriber; 171 172 private EventWithSubscriber(Object event, Subscriber subscriber) { 173 this.event = event; 174 this.subscriber = subscriber; 175 } 176 } 177 } 178 179 /** Implementation of {@link #immediate()}. */ 180 private static final class ImmediateDispatcher extends Dispatcher { 181 private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher(); 182 183 @Override 184 void dispatch(Object event, Iterator<Subscriber> subscribers) { 185 checkNotNull(event); 186 while (subscribers.hasNext()) { 187 subscribers.next().dispatchEvent(event); 188 } 189 } 190 } 191 }