Coverage Summary for Class: SequentialExecutor (com.google.common.util.concurrent)

Class Method, % Line, %
SequentialExecutor 0% (0/5) 0% (0/34)
SequentialExecutor$1 0% (0/3) 0% (0/3)
SequentialExecutor$QueueWorker 0% (0/4) 0% (0/33)
SequentialExecutor$WorkerRunningState 0% (0/1) 0% (0/5)
Total 0% (0/13) 0% (0/75)


1 /* 2  * Copyright (C) 2008 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.IDLE; 19 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUED; 20 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUING; 21 import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.RUNNING; 22 import static java.lang.System.identityHashCode; 23  24 import com.google.common.annotations.GwtIncompatible; 25 import com.google.common.base.Preconditions; 26 import com.google.errorprone.annotations.concurrent.GuardedBy; 27 import com.google.j2objc.annotations.RetainedWith; 28 import java.util.ArrayDeque; 29 import java.util.Deque; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.RejectedExecutionException; 32 import java.util.logging.Level; 33 import java.util.logging.Logger; 34 import javax.annotation.CheckForNull; 35  36 /** 37  * Executor ensuring that all Runnables submitted are executed in order, using the provided 38  * Executor, and sequentially such that no two will ever be running at the same time. 39  * 40  * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order. 41  * 42  * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue. 43  * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks 44  * continues. See {@link QueueWorker#workOnQueue} for details. 45  * 46  * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. 47  * If an {@code Error} is thrown, the error will propagate and execution will stop until it is 48  * restarted by a call to {@link #execute}. 49  */ 50 @GwtIncompatible 51 @ElementTypesAreNonnullByDefault 52 final class SequentialExecutor implements Executor { 53  private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName()); 54  55  enum WorkerRunningState { 56  /** Runnable is not running and not queued for execution */ 57  IDLE, 58  /** Runnable is not running, but is being queued for execution */ 59  QUEUING, 60  /** runnable has been submitted but has not yet begun execution */ 61  QUEUED, 62  RUNNING, 63  } 64  65  /** Underlying executor that all submitted Runnable objects are run on. */ 66  private final Executor executor; 67  68  @GuardedBy("queue") 69  private final Deque<Runnable> queue = new ArrayDeque<>(); 70  71  /** see {@link WorkerRunningState} */ 72  @GuardedBy("queue") 73  private WorkerRunningState workerRunningState = IDLE; 74  75  /** 76  * This counter prevents an ABA issue where a thread may successfully schedule the worker, the 77  * worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the 78  * worker, and then the first thread's call to delegate.execute() returns. Without this counter, 79  * it would observe the QUEUING state and set it to QUEUED, and the worker would never be 80  * scheduled again for future submissions. 81  */ 82  @GuardedBy("queue") 83  private long workerRunCount = 0; 84  85  @RetainedWith private final QueueWorker worker = new QueueWorker(); 86  87  /** Use {@link MoreExecutors#newSequentialExecutor} */ 88  SequentialExecutor(Executor executor) { 89  this.executor = Preconditions.checkNotNull(executor); 90  } 91  92  /** 93  * Adds a task to the queue and makes sure a worker thread is running. 94  * 95  * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor, 96  * execution of tasks will stop until a call to this method is made. 97  */ 98  @Override 99  public void execute(final Runnable task) { 100  checkNotNull(task); 101  final Runnable submittedTask; 102  final long oldRunCount; 103  synchronized (queue) { 104  // If the worker is already running (or execute() on the delegate returned successfully, and 105  // the worker has yet to start) then we don't need to start the worker. 106  if (workerRunningState == RUNNING || workerRunningState == QUEUED) { 107  queue.add(task); 108  return; 109  } 110  111  oldRunCount = workerRunCount; 112  113  // If the worker is not yet running, the delegate Executor might reject our attempt to start 114  // it. To preserve FIFO order and failure atomicity of rejected execution when the same 115  // Runnable is executed more than once, allocate a wrapper that we know is safe to remove by 116  // object identity. 117  // A data structure that returned a removal handle from add() would allow eliminating this 118  // allocation. 119  submittedTask = 120  new Runnable() { 121  @Override 122  public void run() { 123  task.run(); 124  } 125  126  @Override 127  public String toString() { 128  return task.toString(); 129  } 130  }; 131  queue.add(submittedTask); 132  workerRunningState = QUEUING; 133  } 134  135  try { 136  executor.execute(worker); 137  } catch (RuntimeException | Error t) { 138  synchronized (queue) { 139  boolean removed = 140  (workerRunningState == IDLE || workerRunningState == QUEUING) 141  && queue.removeLastOccurrence(submittedTask); 142  // If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But 143  // that's handled by the log check that catches RuntimeExceptions in the queue worker. 144  if (!(t instanceof RejectedExecutionException) || removed) { 145  throw t; 146  } 147  } 148  return; 149  } 150  151  /* 152  * This is an unsynchronized read! After the read, the function returns immediately or acquires 153  * the lock to check again. Since an IDLE state was observed inside the preceding synchronized 154  * block, and reference field assignment is atomic, this may save reacquiring the lock when 155  * another thread or the worker task has cleared the count and set the state. 156  * 157  * <p>When {@link #executor} is a directExecutor(), the value written to 158  * {@code workerRunningState} will be available synchronously, and behaviour will be 159  * deterministic. 160  */ 161  @SuppressWarnings("GuardedBy") 162  boolean alreadyMarkedQueued = workerRunningState != QUEUING; 163  if (alreadyMarkedQueued) { 164  return; 165  } 166  synchronized (queue) { 167  if (workerRunCount == oldRunCount && workerRunningState == QUEUING) { 168  workerRunningState = QUEUED; 169  } 170  } 171  } 172  173  /** Worker that runs tasks from {@link #queue} until it is empty. */ 174  private final class QueueWorker implements Runnable { 175  @CheckForNull Runnable task; 176  177  @Override 178  public void run() { 179  try { 180  workOnQueue(); 181  } catch (Error e) { 182  synchronized (queue) { 183  workerRunningState = IDLE; 184  } 185  throw e; 186  // The execution of a task has ended abnormally. 187  // We could have tasks left in the queue, so should perhaps try to restart a worker, 188  // but then the Error will get delayed if we are using a direct (same thread) executor. 189  } 190  } 191  192  /** 193  * Continues executing tasks from {@link #queue} until it is empty. 194  * 195  * <p>The thread's interrupt bit is cleared before execution of each task. 196  * 197  * <p>If the Thread in use is interrupted before or during execution of the tasks in {@link 198  * #queue}, the Executor will complete its tasks, and then restore the interruption. This means 199  * that once the Thread returns to the Executor that this Executor composes, the interruption 200  * will still be present. If the composed Executor is an ExecutorService, it can respond to 201  * shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue. 202  */ 203  private void workOnQueue() { 204  boolean interruptedDuringTask = false; 205  boolean hasSetRunning = false; 206  try { 207  while (true) { 208  synchronized (queue) { 209  // Choose whether this thread will run or not after acquiring the lock on the first 210  // iteration 211  if (!hasSetRunning) { 212  if (workerRunningState == RUNNING) { 213  // Don't want to have two workers pulling from the queue. 214  return; 215  } else { 216  // Increment the run counter to avoid the ABA problem of a submitter marking the 217  // thread as QUEUED after it already ran and exhausted the queue before returning 218  // from execute(). 219  workerRunCount++; 220  workerRunningState = RUNNING; 221  hasSetRunning = true; 222  } 223  } 224  task = queue.poll(); 225  if (task == null) { 226  workerRunningState = IDLE; 227  return; 228  } 229  } 230  // Remove the interrupt bit before each task. The interrupt is for the "current task" when 231  // it is sent, so subsequent tasks in the queue should not be caused to be interrupted 232  // by a previous one in the queue being interrupted. 233  interruptedDuringTask |= Thread.interrupted(); 234  try { 235  task.run(); 236  } catch (RuntimeException e) { 237  log.log(Level.SEVERE, "Exception while executing runnable " + task, e); 238  } finally { 239  task = null; 240  } 241  } 242  } finally { 243  // Ensure that if the thread was interrupted at all while processing the task queue, it 244  // is returned to the delegate Executor interrupted so that it may handle the 245  // interruption if it likes. 246  if (interruptedDuringTask) { 247  Thread.currentThread().interrupt(); 248  } 249  } 250  } 251  252  @SuppressWarnings("GuardedBy") 253  @Override 254  public String toString() { 255  Runnable currentlyRunning = task; 256  if (currentlyRunning != null) { 257  return "SequentialExecutorWorker{running=" + currentlyRunning + "}"; 258  } 259  return "SequentialExecutorWorker{state=" + workerRunningState + "}"; 260  } 261  } 262  263  @Override 264  public String toString() { 265  return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}"; 266  } 267 }