Coverage Summary for Class: SequentialExecutor (com.google.common.util.concurrent)
| Class | Method, % | Line, % |
|---|---|---|
| SequentialExecutor | 0% (0/5) | 0% (0/34) |
| SequentialExecutor$1 | 0% (0/3) | 0% (0/3) |
| SequentialExecutor$QueueWorker | 0% (0/4) | 0% (0/33) |
| SequentialExecutor$WorkerRunningState | 0% (0/1) | 0% (0/5) |
| Total | 0% (0/13) | 0% (0/75) |
1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.IDLE; 19 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUED; 20 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUING; 21 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.RUNNING; 22 import static java.lang.System.identityHashCode; 23 24 import com.google.common.annotations.GwtIncompatible; 25 import com.google.common.base.Preconditions; 26 import com.google.errorprone.annotations.concurrent.GuardedBy; 27 import com.google.j2objc.annotations.RetainedWith; 28 import java.util.ArrayDeque; 29 import java.util.Deque; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.RejectedExecutionException; 32 import java.util.logging.Level; 33 import java.util.logging.Logger; 34 import javax.annotation.CheckForNull; 35 36 /** 37 * Executor ensuring that all Runnables submitted are executed in order, using the provided 38 * Executor, and sequentially such that no two will ever be running at the same time. 39 * 40 * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order. 41 * 42 * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue. 43 * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks 44 * continues. See {@link QueueWorker#workOnQueue} for details. 45 * 46 * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. 47 * If an {@code Error} is thrown, the error will propagate and execution will stop until it is 48 * restarted by a call to {@link #execute}. 49 */ 50 @GwtIncompatible 51 @ElementTypesAreNonnullByDefault 52 final class SequentialExecutor implements Executor { 53 private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName()); 54 55 enum WorkerRunningState { 56 /** Runnable is not running and not queued for execution */ 57 IDLE, 58 /** Runnable is not running, but is being queued for execution */ 59 QUEUING, 60 /** runnable has been submitted but has not yet begun execution */ 61 QUEUED, 62 RUNNING, 63 } 64 65 /** Underlying executor that all submitted Runnable objects are run on. */ 66 private final Executor executor; 67 68 @GuardedBy("queue") 69 private final Deque<Runnable> queue = new ArrayDeque<>(); 70 71 /** see {@link WorkerRunningState} */ 72 @GuardedBy("queue") 73 private WorkerRunningState workerRunningState = IDLE; 74 75 /** 76 * This counter prevents an ABA issue where a thread may successfully schedule the worker, the 77 * worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the 78 * worker, and then the first thread's call to delegate.execute() returns. Without this counter, 79 * it would observe the QUEUING state and set it to QUEUED, and the worker would never be 80 * scheduled again for future submissions. 81 */ 82 @GuardedBy("queue") 83 private long workerRunCount = 0; 84 85 @RetainedWith private final QueueWorker worker = new QueueWorker(); 86 87 /** Use {@link MoreExecutors#newSequentialExecutor} */ 88 SequentialExecutor(Executor executor) { 89 this.executor = Preconditions.checkNotNull(executor); 90 } 91 92 /** 93 * Adds a task to the queue and makes sure a worker thread is running. 94 * 95 * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor, 96 * execution of tasks will stop until a call to this method is made. 97 */ 98 @Override 99 public void execute(final Runnable task) { 100 checkNotNull(task); 101 final Runnable submittedTask; 102 final long oldRunCount; 103 synchronized (queue) { 104 // If the worker is already running (or execute() on the delegate returned successfully, and 105 // the worker has yet to start) then we don't need to start the worker. 106 if (workerRunningState == RUNNING || workerRunningState == QUEUED) { 107 queue.add(task); 108 return; 109 } 110 111 oldRunCount = workerRunCount; 112 113 // If the worker is not yet running, the delegate Executor might reject our attempt to start 114 // it. To preserve FIFO order and failure atomicity of rejected execution when the same 115 // Runnable is executed more than once, allocate a wrapper that we know is safe to remove by 116 // object identity. 117 // A data structure that returned a removal handle from add() would allow eliminating this 118 // allocation. 119 submittedTask = 120 new Runnable() { 121 @Override 122 public void run() { 123 task.run(); 124 } 125 126 @Override 127 public String toString() { 128 return task.toString(); 129 } 130 }; 131 queue.add(submittedTask); 132 workerRunningState = QUEUING; 133 } 134 135 try { 136 executor.execute(worker); 137 } catch (RuntimeException | Error t) { 138 synchronized (queue) { 139 boolean removed = 140 (workerRunningState == IDLE || workerRunningState == QUEUING) 141 && queue.removeLastOccurrence(submittedTask); 142 // If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But 143 // that's handled by the log check that catches RuntimeExceptions in the queue worker. 144 if (!(t instanceof RejectedExecutionException) || removed) { 145 throw t; 146 } 147 } 148 return; 149 } 150 151 /* 152 * This is an unsynchronized read! After the read, the function returns immediately or acquires 153 * the lock to check again. Since an IDLE state was observed inside the preceding synchronized 154 * block, and reference field assignment is atomic, this may save reacquiring the lock when 155 * another thread or the worker task has cleared the count and set the state. 156 * 157 * <p>When {@link #executor} is a directExecutor(), the value written to 158 * {@code workerRunningState} will be available synchronously, and behaviour will be 159 * deterministic. 160 */ 161 @SuppressWarnings("GuardedBy") 162 boolean alreadyMarkedQueued = workerRunningState != QUEUING; 163 if (alreadyMarkedQueued) { 164 return; 165 } 166 synchronized (queue) { 167 if (workerRunCount == oldRunCount && workerRunningState == QUEUING) { 168 workerRunningState = QUEUED; 169 } 170 } 171 } 172 173 /** Worker that runs tasks from {@link #queue} until it is empty. */ 174 private final class QueueWorker implements Runnable { 175 @CheckForNull Runnable task; 176 177 @Override 178 public void run() { 179 try { 180 workOnQueue(); 181 } catch (Error e) { 182 synchronized (queue) { 183 workerRunningState = IDLE; 184 } 185 throw e; 186 // The execution of a task has ended abnormally. 187 // We could have tasks left in the queue, so should perhaps try to restart a worker, 188 // but then the Error will get delayed if we are using a direct (same thread) executor. 189 } 190 } 191 192 /** 193 * Continues executing tasks from {@link #queue} until it is empty. 194 * 195 * <p>The thread's interrupt bit is cleared before execution of each task. 196 * 197 * <p>If the Thread in use is interrupted before or during execution of the tasks in {@link 198 * #queue}, the Executor will complete its tasks, and then restore the interruption. This means 199 * that once the Thread returns to the Executor that this Executor composes, the interruption 200 * will still be present. If the composed Executor is an ExecutorService, it can respond to 201 * shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue. 202 */ 203 private void workOnQueue() { 204 boolean interruptedDuringTask = false; 205 boolean hasSetRunning = false; 206 try { 207 while (true) { 208 synchronized (queue) { 209 // Choose whether this thread will run or not after acquiring the lock on the first 210 // iteration 211 if (!hasSetRunning) { 212 if (workerRunningState == RUNNING) { 213 // Don't want to have two workers pulling from the queue. 214 return; 215 } else { 216 // Increment the run counter to avoid the ABA problem of a submitter marking the 217 // thread as QUEUED after it already ran and exhausted the queue before returning 218 // from execute(). 219 workerRunCount++; 220 workerRunningState = RUNNING; 221 hasSetRunning = true; 222 } 223 } 224 task = queue.poll(); 225 if (task == null) { 226 workerRunningState = IDLE; 227 return; 228 } 229 } 230 // Remove the interrupt bit before each task. The interrupt is for the "current task" when 231 // it is sent, so subsequent tasks in the queue should not be caused to be interrupted 232 // by a previous one in the queue being interrupted. 233 interruptedDuringTask |= Thread.interrupted(); 234 try { 235 task.run(); 236 } catch (RuntimeException e) { 237 log.log(Level.SEVERE, "Exception while executing runnable " + task, e); 238 } finally { 239 task = null; 240 } 241 } 242 } finally { 243 // Ensure that if the thread was interrupted at all while processing the task queue, it 244 // is returned to the delegate Executor interrupted so that it may handle the 245 // interruption if it likes. 246 if (interruptedDuringTask) { 247 Thread.currentThread().interrupt(); 248 } 249 } 250 } 251 252 @SuppressWarnings("GuardedBy") 253 @Override 254 public String toString() { 255 Runnable currentlyRunning = task; 256 if (currentlyRunning != null) { 257 return "SequentialExecutorWorker{running=" + currentlyRunning + "}"; 258 } 259 return "SequentialExecutorWorker{state=" + workerRunningState + "}"; 260 } 261 } 262 263 @Override 264 public String toString() { 265 return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}"; 266 } 267 }