Coverage Summary for Class: ListenerCallQueue (com.google.common.util.concurrent)
| Class | Method, % | Line, % |
|---|---|---|
| ListenerCallQueue | 0% (0/8) | 0% (0/17) |
| ListenerCallQueue$PerListenerQueue | 0% (0/4) | 0% (0/44) |
| Total | 0% (0/12) | 0% (0/61) |
1 /* 2 * Copyright (C) 2014 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.base.Preconditions.checkNotNull; 18 19 import com.google.common.annotations.GwtIncompatible; 20 import com.google.common.base.Preconditions; 21 import com.google.common.collect.Queues; 22 import com.google.errorprone.annotations.concurrent.GuardedBy; 23 import java.util.ArrayList; 24 import java.util.Collections; 25 import java.util.List; 26 import java.util.Queue; 27 import java.util.concurrent.Executor; 28 import java.util.logging.Level; 29 import java.util.logging.Logger; 30 31 /** 32 * A list of listeners for implementing a concurrency friendly observable object. 33 * 34 * <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain 35 * #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events. 36 * 37 * <p>The API of this class is designed to make it easy to achieve the following properties 38 * 39 * <ul> 40 * <li>Multiple events for the same listener are never dispatched concurrently. 41 * <li>Events for the different listeners are dispatched concurrently. 42 * <li>All events for a given listener dispatch on the provided {@link #executor}. 43 * <li>It is easy for the user to ensure that listeners are never invoked while holding locks. 44 * </ul> 45 * 46 * The last point is subtle. Often the observable object will be managing its own internal state 47 * using a lock, however it is dangerous to dispatch listeners while holding a lock because they 48 * might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your 49 * object). So it is important to not call {@link #dispatch} while holding any locks. This is why 50 * {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision 51 * to run a particular event is made during the state change, but the decision to actually invoke 52 * the listeners can be delayed slightly so that locks can be dropped. Also, because {@link 53 * #dispatch} is expected to be called concurrently, it is idempotent. 54 */ 55 @GwtIncompatible 56 @ElementTypesAreNonnullByDefault 57 final class ListenerCallQueue<L> { 58 // TODO(cpovirk): consider using the logger associated with listener.getClass(). 59 private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName()); 60 61 // TODO(chrisn): promote AppendOnlyCollection for use here. 62 private final List<PerListenerQueue<L>> listeners = 63 Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>()); 64 65 /** Method reference-compatible listener event. */ 66 interface Event<L> { 67 /** Call a method on the listener. */ 68 void call(L listener); 69 } 70 71 /** 72 * Adds a listener that will be called using the given executor when events are later {@link 73 * #enqueue enqueued} and {@link #dispatch dispatched}. 74 */ 75 public void addListener(L listener, Executor executor) { 76 checkNotNull(listener, "listener"); 77 checkNotNull(executor, "executor"); 78 listeners.add(new PerListenerQueue<>(listener, executor)); 79 } 80 81 /** 82 * Enqueues an event to be run on currently known listeners. 83 * 84 * <p>The {@code toString} method of the Event itself will be used to describe the event in the 85 * case of an error. 86 * 87 * @param event the callback to execute on {@link #dispatch} 88 */ 89 public void enqueue(Event<L> event) { 90 enqueueHelper(event, event); 91 } 92 93 /** 94 * Enqueues an event to be run on currently known listeners, with a label. 95 * 96 * @param event the callback to execute on {@link #dispatch} 97 * @param label a description of the event to use in the case of an error 98 */ 99 public void enqueue(Event<L> event, String label) { 100 enqueueHelper(event, label); 101 } 102 103 private void enqueueHelper(Event<L> event, Object label) { 104 checkNotNull(event, "event"); 105 checkNotNull(label, "label"); 106 synchronized (listeners) { 107 for (PerListenerQueue<L> queue : listeners) { 108 queue.add(event, label); 109 } 110 } 111 } 112 113 /** 114 * Dispatches all events enqueued prior to this call, serially and in order, for every listener. 115 * 116 * <p>Note: this method is idempotent and safe to call from any thread 117 */ 118 public void dispatch() { 119 // iterate by index to avoid concurrent modification exceptions 120 for (int i = 0; i < listeners.size(); i++) { 121 listeners.get(i).dispatch(); 122 } 123 } 124 125 /** 126 * A special purpose queue/executor that dispatches listener events serially on a configured 127 * executor. Each event can be added and dispatched as separate phases. 128 * 129 * <p>This class is very similar to {@link SequentialExecutor} with the exception that events can 130 * be added without necessarily executing immediately. 131 */ 132 private static final class PerListenerQueue<L> implements Runnable { 133 final L listener; 134 final Executor executor; 135 136 @GuardedBy("this") 137 final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque(); 138 139 @GuardedBy("this") 140 final Queue<Object> labelQueue = Queues.newArrayDeque(); 141 142 @GuardedBy("this") 143 boolean isThreadScheduled; 144 145 PerListenerQueue(L listener, Executor executor) { 146 this.listener = checkNotNull(listener); 147 this.executor = checkNotNull(executor); 148 } 149 150 /** Enqueues a event to be run. */ 151 synchronized void add(ListenerCallQueue.Event<L> event, Object label) { 152 waitQueue.add(event); 153 labelQueue.add(label); 154 } 155 156 /** 157 * Dispatches all listeners {@linkplain #enqueue enqueued} prior to this call, serially and in 158 * order. 159 */ 160 void dispatch() { 161 boolean scheduleEventRunner = false; 162 synchronized (this) { 163 if (!isThreadScheduled) { 164 isThreadScheduled = true; 165 scheduleEventRunner = true; 166 } 167 } 168 if (scheduleEventRunner) { 169 try { 170 executor.execute(this); 171 } catch (RuntimeException e) { 172 // reset state in case of an error so that later dispatch calls will actually do something 173 synchronized (this) { 174 isThreadScheduled = false; 175 } 176 // Log it and keep going. 177 logger.log( 178 Level.SEVERE, 179 "Exception while running callbacks for " + listener + " on " + executor, 180 e); 181 throw e; 182 } 183 } 184 } 185 186 @Override 187 public void run() { 188 boolean stillRunning = true; 189 try { 190 while (true) { 191 ListenerCallQueue.Event<L> nextToRun; 192 Object nextLabel; 193 synchronized (PerListenerQueue.this) { 194 Preconditions.checkState(isThreadScheduled); 195 nextToRun = waitQueue.poll(); 196 nextLabel = labelQueue.poll(); 197 if (nextToRun == null) { 198 isThreadScheduled = false; 199 stillRunning = false; 200 break; 201 } 202 } 203 204 // Always run while _not_ holding the lock, to avoid deadlocks. 205 try { 206 nextToRun.call(listener); 207 } catch (RuntimeException e) { 208 // Log it and keep going. 209 logger.log( 210 Level.SEVERE, 211 "Exception while executing callback: " + listener + " " + nextLabel, 212 e); 213 } 214 } 215 } finally { 216 if (stillRunning) { 217 // An Error is bubbling up. We should mark ourselves as no longer running. That way, if 218 // anyone tries to keep using us, we won't be corrupted. 219 synchronized (PerListenerQueue.this) { 220 isThreadScheduled = false; 221 } 222 } 223 } 224 } 225 } 226 }