Coverage Summary for Class: Dispatcher (com.google.common.eventbus)

Class Method, % Line, %
Dispatcher 0% (0/4) 0% (0/4)
Dispatcher$ImmediateDispatcher 0% (0/3) 0% (0/5)
Dispatcher$LegacyAsyncDispatcher 0% (0/3) 0% (0/7)
Dispatcher$LegacyAsyncDispatcher$EventWithSubscriber 0% (0/2) 0% (0/4)
Dispatcher$PerThreadQueuedDispatcher 0% (0/3) 0% (0/15)
Dispatcher$PerThreadQueuedDispatcher$1 0% (0/2) 0% (0/2)
Dispatcher$PerThreadQueuedDispatcher$2 0% (0/2) 0% (0/2)
Dispatcher$PerThreadQueuedDispatcher$Event 0% (0/2) 0% (0/4)
Total 0% (0/21) 0% (0/43)


1 /* 2  * Copyright (C) 2014 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.eventbus; 16  17 import static com.google.common.base.Preconditions.checkNotNull; 18  19 import com.google.common.collect.Queues; 20 import java.util.Iterator; 21 import java.util.Queue; 22 import java.util.concurrent.ConcurrentLinkedQueue; 23  24 /** 25  * Handler for dispatching events to subscribers, providing different event ordering guarantees that 26  * make sense for different situations. 27  * 28  * <p><b>Note:</b> The dispatcher is orthogonal to the subscriber's {@code Executor}. The dispatcher 29  * controls the order in which events are dispatched, while the executor controls how (i.e. on which 30  * thread) the subscriber is actually called when an event is dispatched to it. 31  * 32  * @author Colin Decker 33  */ 34 @ElementTypesAreNonnullByDefault 35 abstract class Dispatcher { 36  37  /** 38  * Returns a dispatcher that queues events that are posted reentrantly on a thread that is already 39  * dispatching an event, guaranteeing that all events posted on a single thread are dispatched to 40  * all subscribers in the order they are posted. 41  * 42  * <p>When all subscribers are dispatched to using a <i>direct</i> executor (which dispatches on 43  * the same thread that posts the event), this yields a breadth-first dispatch order on each 44  * thread. That is, all subscribers to a single event A will be called before any subscribers to 45  * any events B and C that are posted to the event bus by the subscribers to A. 46  */ 47  static Dispatcher perThreadDispatchQueue() { 48  return new PerThreadQueuedDispatcher(); 49  } 50  51  /** 52  * Returns a dispatcher that queues events that are posted in a single global queue. This behavior 53  * matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful. 54  * For async dispatch, an {@linkplain #immediate() immediate} dispatcher should generally be 55  * preferable. 56  */ 57  static Dispatcher legacyAsync() { 58  return new LegacyAsyncDispatcher(); 59  } 60  61  /** 62  * Returns a dispatcher that dispatches events to subscribers immediately as they're posted 63  * without using an intermediate queue to change the dispatch order. This is effectively a 64  * depth-first dispatch order, vs. breadth-first when using a queue. 65  */ 66  static Dispatcher immediate() { 67  return ImmediateDispatcher.INSTANCE; 68  } 69  70  /** Dispatches the given {@code event} to the given {@code subscribers}. */ 71  abstract void dispatch(Object event, Iterator<Subscriber> subscribers); 72  73  /** Implementation of a {@link #perThreadDispatchQueue()} dispatcher. */ 74  private static final class PerThreadQueuedDispatcher extends Dispatcher { 75  76  // This dispatcher matches the original dispatch behavior of EventBus. 77  78  /** Per-thread queue of events to dispatch. */ 79  private final ThreadLocal<Queue<Event>> queue = 80  new ThreadLocal<Queue<Event>>() { 81  @Override 82  protected Queue<Event> initialValue() { 83  return Queues.newArrayDeque(); 84  } 85  }; 86  87  /** Per-thread dispatch state, used to avoid reentrant event dispatching. */ 88  private final ThreadLocal<Boolean> dispatching = 89  new ThreadLocal<Boolean>() { 90  @Override 91  protected Boolean initialValue() { 92  return false; 93  } 94  }; 95  96  @Override 97  void dispatch(Object event, Iterator<Subscriber> subscribers) { 98  checkNotNull(event); 99  checkNotNull(subscribers); 100  Queue<Event> queueForThread = queue.get(); 101  queueForThread.offer(new Event(event, subscribers)); 102  103  if (!dispatching.get()) { 104  dispatching.set(true); 105  try { 106  Event nextEvent; 107  while ((nextEvent = queueForThread.poll()) != null) { 108  while (nextEvent.subscribers.hasNext()) { 109  nextEvent.subscribers.next().dispatchEvent(nextEvent.event); 110  } 111  } 112  } finally { 113  dispatching.remove(); 114  queue.remove(); 115  } 116  } 117  } 118  119  private static final class Event { 120  private final Object event; 121  private final Iterator<Subscriber> subscribers; 122  123  private Event(Object event, Iterator<Subscriber> subscribers) { 124  this.event = event; 125  this.subscribers = subscribers; 126  } 127  } 128  } 129  130  /** Implementation of a {@link #legacyAsync()} dispatcher. */ 131  private static final class LegacyAsyncDispatcher extends Dispatcher { 132  133  // This dispatcher matches the original dispatch behavior of AsyncEventBus. 134  // 135  // We can't really make any guarantees about the overall dispatch order for this dispatcher in 136  // a multithreaded environment for a couple reasons: 137  // 138  // 1. Subscribers to events posted on different threads can be interleaved with each other 139  // freely. (A event on one thread, B event on another could yield any of 140  // [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.) 141  // 2. It's possible for subscribers to actually be dispatched to in a different order than they 142  // were added to the queue. It's easily possible for one thread to take the head of the 143  // queue, immediately followed by another thread taking the next element in the queue. That 144  // second thread can then dispatch to the subscriber it took before the first thread does. 145  // 146  // All this makes me really wonder if there's any value in queueing here at all. A dispatcher 147  // that simply loops through the subscribers and dispatches the event to each would actually 148  // probably provide a stronger order guarantee, though that order would obviously be different 149  // in some cases. 150  151  /** Global event queue. */ 152  private final ConcurrentLinkedQueue<EventWithSubscriber> queue = 153  Queues.newConcurrentLinkedQueue(); 154  155  @Override 156  void dispatch(Object event, Iterator<Subscriber> subscribers) { 157  checkNotNull(event); 158  while (subscribers.hasNext()) { 159  queue.add(new EventWithSubscriber(event, subscribers.next())); 160  } 161  162  EventWithSubscriber e; 163  while ((e = queue.poll()) != null) { 164  e.subscriber.dispatchEvent(e.event); 165  } 166  } 167  168  private static final class EventWithSubscriber { 169  private final Object event; 170  private final Subscriber subscriber; 171  172  private EventWithSubscriber(Object event, Subscriber subscriber) { 173  this.event = event; 174  this.subscriber = subscriber; 175  } 176  } 177  } 178  179  /** Implementation of {@link #immediate()}. */ 180  private static final class ImmediateDispatcher extends Dispatcher { 181  private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher(); 182  183  @Override 184  void dispatch(Object event, Iterator<Subscriber> subscribers) { 185  checkNotNull(event); 186  while (subscribers.hasNext()) { 187  subscribers.next().dispatchEvent(event); 188  } 189  } 190  } 191 }