Coverage Summary for Class: ListenerCallQueue (com.google.common.util.concurrent)

Class Method, % Line, %
ListenerCallQueue 0% (0/8) 0% (0/17)
ListenerCallQueue$PerListenerQueue 0% (0/4) 0% (0/44)
Total 0% (0/12) 0% (0/61)


1 /* 2  * Copyright (C) 2014 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.base.Preconditions.checkNotNull; 18  19 import com.google.common.annotations.GwtIncompatible; 20 import com.google.common.base.Preconditions; 21 import com.google.common.collect.Queues; 22 import com.google.errorprone.annotations.concurrent.GuardedBy; 23 import java.util.ArrayList; 24 import java.util.Collections; 25 import java.util.List; 26 import java.util.Queue; 27 import java.util.concurrent.Executor; 28 import java.util.logging.Level; 29 import java.util.logging.Logger; 30  31 /** 32  * A list of listeners for implementing a concurrency friendly observable object. 33  * 34  * <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain 35  * #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events. 36  * 37  * <p>The API of this class is designed to make it easy to achieve the following properties 38  * 39  * <ul> 40  * <li>Multiple events for the same listener are never dispatched concurrently. 41  * <li>Events for the different listeners are dispatched concurrently. 42  * <li>All events for a given listener dispatch on the provided {@link #executor}. 43  * <li>It is easy for the user to ensure that listeners are never invoked while holding locks. 44  * </ul> 45  * 46  * The last point is subtle. Often the observable object will be managing its own internal state 47  * using a lock, however it is dangerous to dispatch listeners while holding a lock because they 48  * might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your 49  * object). So it is important to not call {@link #dispatch} while holding any locks. This is why 50  * {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision 51  * to run a particular event is made during the state change, but the decision to actually invoke 52  * the listeners can be delayed slightly so that locks can be dropped. Also, because {@link 53  * #dispatch} is expected to be called concurrently, it is idempotent. 54  */ 55 @GwtIncompatible 56 @ElementTypesAreNonnullByDefault 57 final class ListenerCallQueue<L> { 58  // TODO(cpovirk): consider using the logger associated with listener.getClass(). 59  private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName()); 60  61  // TODO(chrisn): promote AppendOnlyCollection for use here. 62  private final List<PerListenerQueue<L>> listeners = 63  Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>()); 64  65  /** Method reference-compatible listener event. */ 66  interface Event<L> { 67  /** Call a method on the listener. */ 68  void call(L listener); 69  } 70  71  /** 72  * Adds a listener that will be called using the given executor when events are later {@link 73  * #enqueue enqueued} and {@link #dispatch dispatched}. 74  */ 75  public void addListener(L listener, Executor executor) { 76  checkNotNull(listener, "listener"); 77  checkNotNull(executor, "executor"); 78  listeners.add(new PerListenerQueue<>(listener, executor)); 79  } 80  81  /** 82  * Enqueues an event to be run on currently known listeners. 83  * 84  * <p>The {@code toString} method of the Event itself will be used to describe the event in the 85  * case of an error. 86  * 87  * @param event the callback to execute on {@link #dispatch} 88  */ 89  public void enqueue(Event<L> event) { 90  enqueueHelper(event, event); 91  } 92  93  /** 94  * Enqueues an event to be run on currently known listeners, with a label. 95  * 96  * @param event the callback to execute on {@link #dispatch} 97  * @param label a description of the event to use in the case of an error 98  */ 99  public void enqueue(Event<L> event, String label) { 100  enqueueHelper(event, label); 101  } 102  103  private void enqueueHelper(Event<L> event, Object label) { 104  checkNotNull(event, "event"); 105  checkNotNull(label, "label"); 106  synchronized (listeners) { 107  for (PerListenerQueue<L> queue : listeners) { 108  queue.add(event, label); 109  } 110  } 111  } 112  113  /** 114  * Dispatches all events enqueued prior to this call, serially and in order, for every listener. 115  * 116  * <p>Note: this method is idempotent and safe to call from any thread 117  */ 118  public void dispatch() { 119  // iterate by index to avoid concurrent modification exceptions 120  for (int i = 0; i < listeners.size(); i++) { 121  listeners.get(i).dispatch(); 122  } 123  } 124  125  /** 126  * A special purpose queue/executor that dispatches listener events serially on a configured 127  * executor. Each event can be added and dispatched as separate phases. 128  * 129  * <p>This class is very similar to {@link SequentialExecutor} with the exception that events can 130  * be added without necessarily executing immediately. 131  */ 132  private static final class PerListenerQueue<L> implements Runnable { 133  final L listener; 134  final Executor executor; 135  136  @GuardedBy("this") 137  final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque(); 138  139  @GuardedBy("this") 140  final Queue<Object> labelQueue = Queues.newArrayDeque(); 141  142  @GuardedBy("this") 143  boolean isThreadScheduled; 144  145  PerListenerQueue(L listener, Executor executor) { 146  this.listener = checkNotNull(listener); 147  this.executor = checkNotNull(executor); 148  } 149  150  /** Enqueues a event to be run. */ 151  synchronized void add(ListenerCallQueue.Event<L> event, Object label) { 152  waitQueue.add(event); 153  labelQueue.add(label); 154  } 155  156  /** 157  * Dispatches all listeners {@linkplain #enqueue enqueued} prior to this call, serially and in 158  * order. 159  */ 160  void dispatch() { 161  boolean scheduleEventRunner = false; 162  synchronized (this) { 163  if (!isThreadScheduled) { 164  isThreadScheduled = true; 165  scheduleEventRunner = true; 166  } 167  } 168  if (scheduleEventRunner) { 169  try { 170  executor.execute(this); 171  } catch (RuntimeException e) { 172  // reset state in case of an error so that later dispatch calls will actually do something 173  synchronized (this) { 174  isThreadScheduled = false; 175  } 176  // Log it and keep going. 177  logger.log( 178  Level.SEVERE, 179  "Exception while running callbacks for " + listener + " on " + executor, 180  e); 181  throw e; 182  } 183  } 184  } 185  186  @Override 187  public void run() { 188  boolean stillRunning = true; 189  try { 190  while (true) { 191  ListenerCallQueue.Event<L> nextToRun; 192  Object nextLabel; 193  synchronized (PerListenerQueue.this) { 194  Preconditions.checkState(isThreadScheduled); 195  nextToRun = waitQueue.poll(); 196  nextLabel = labelQueue.poll(); 197  if (nextToRun == null) { 198  isThreadScheduled = false; 199  stillRunning = false; 200  break; 201  } 202  } 203  204  // Always run while _not_ holding the lock, to avoid deadlocks. 205  try { 206  nextToRun.call(listener); 207  } catch (RuntimeException e) { 208  // Log it and keep going. 209  logger.log( 210  Level.SEVERE, 211  "Exception while executing callback: " + listener + " " + nextLabel, 212  e); 213  } 214  } 215  } finally { 216  if (stillRunning) { 217  // An Error is bubbling up. We should mark ourselves as no longer running. That way, if 218  // anyone tries to keep using us, we won't be corrupted. 219  synchronized (PerListenerQueue.this) { 220  isThreadScheduled = false; 221  } 222  } 223  } 224  } 225  } 226 }