Coverage Summary for Class: ExecutionSequencer (com.google.common.util.concurrent)

Class Method, % Line, %
ExecutionSequencer 0% (0/5) 0% (0/21)
ExecutionSequencer$1 0% (0/3) 0% (0/3)
ExecutionSequencer$2 0% (0/3) 0% (0/5)
ExecutionSequencer$3 0% (0/2) 0% (0/5)
ExecutionSequencer$RunningState 0% (0/1) 0% (0/4)
ExecutionSequencer$TaskNonReentrantExecutor 0% (0/6) 0% (0/41)
ExecutionSequencer$ThreadConfinedTaskQueue 0% (0/1) 0% (0/1)
Total 0% (0/21) 0% (0/80)


1 /* 2  * Copyright (C) 2018 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkState; 19 import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; 20 import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; 21 import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; 22 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 23 import static com.google.common.util.concurrent.Futures.immediateFuture; 24 import static com.google.common.util.concurrent.Futures.immediateVoidFuture; 25 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 26 import static java.util.Objects.requireNonNull; 27  28 import com.google.common.annotations.Beta; 29 import java.util.concurrent.Callable; 30 import java.util.concurrent.Executor; 31 import java.util.concurrent.atomic.AtomicReference; 32 import javax.annotation.CheckForNull; 33 import org.checkerframework.checker.nullness.qual.Nullable; 34  35 /** 36  * Serializes execution of tasks, somewhat like an "asynchronous {@code synchronized} block." Each 37  * {@linkplain #submit enqueued} callable will not be submitted to its associated executor until the 38  * previous callable has returned -- and, if the previous callable was an {@link AsyncCallable}, not 39  * until the {@code Future} it returned is {@linkplain Future#isDone done} (successful, failed, or 40  * cancelled). 41  * 42  * <p>This class has limited support for cancellation and other "early completion": 43  * 44  * <ul> 45  * <li>While calls to {@code submit} and {@code submitAsync} return a {@code Future} that can be 46  * cancelled, cancellation never propagates to a task that has started to run -- neither to 47  * the callable itself nor to any {@code Future} returned by an {@code AsyncCallable}. 48  * (However, cancellation can prevent an <i>unstarted</i> task from running.) Therefore, the 49  * next task will wait for any running callable (or pending {@code Future} returned by an 50  * {@code AsyncCallable}) to complete, without interrupting it (and without calling {@code 51  * cancel} on the {@code Future}). So beware: <i>Even if you cancel every precededing {@code 52  * Future} returned by this class, the next task may still have to wait.</i>. 53  * <li>Once an {@code AsyncCallable} returns a {@code Future}, this class considers that task to 54  * be "done" as soon as <i>that</i> {@code Future} completes in any way. Notably, a {@code 55  * Future} is "completed" even if it is cancelled while its underlying work continues on a 56  * thread, an RPC, etc. The {@code Future} is also "completed" if it fails "early" -- for 57  * example, if the deadline expires on a {@code Future} returned from {@link 58  * Futures#withTimeout} while the {@code Future} it wraps continues its underlying work. So 59  * beware: <i>Your {@code AsyncCallable} should not complete its {@code Future} until it is 60  * safe for the next task to start.</i> 61  * </ul> 62  * 63  * <p>An additional limitation: this class serializes execution of <i>tasks</i> but not any 64  * <i>listeners</i> of those tasks. 65  * 66  * <p>This class is similar to {@link MoreExecutors#newSequentialExecutor}. This class is different 67  * in a few ways: 68  * 69  * <ul> 70  * <li>Each task may be associated with a different executor. 71  * <li>Tasks may be of type {@code AsyncCallable}. 72  * <li>Running tasks <i>cannot</i> be interrupted. (Note that {@code newSequentialExecutor} does 73  * not return {@code Future} objects, so it doesn't support interruption directly, either. 74  * However, utilities that <i>use</i> that executor have the ability to interrupt tasks 75  * running on it. This class, by contrast, does not expose an {@code Executor} API.) 76  * </ul> 77  * 78  * <p>If you don't need the features of this class, you may prefer {@code newSequentialExecutor} for 79  * its simplicity and ability to accommodate interruption. 80  * 81  * @since 26.0 82  */ 83 @Beta 84 @ElementTypesAreNonnullByDefault 85 public final class ExecutionSequencer { 86  87  private ExecutionSequencer() {} 88  89  /** Creates a new instance. */ 90  public static ExecutionSequencer create() { 91  return new ExecutionSequencer(); 92  } 93  94  /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ 95  private final AtomicReference<ListenableFuture<@Nullable Void>> ref = 96  new AtomicReference<>(immediateVoidFuture()); 97  98  private ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue(); 99  100  /** 101  * This object is unsafely published, but avoids problematic races by relying exclusively on the 102  * identity equality of its Thread field so that the task field is only accessed by a single 103  * thread. 104  */ 105  private static final class ThreadConfinedTaskQueue { 106  /** 107  * This field is only used for identity comparisons with the current thread. Field assignments 108  * are atomic, but do not provide happens-before ordering; however: 109  * 110  * <ul> 111  * <li>If this field's value == currentThread, we know that it's up to date, because write 112  * operations in a thread always happen-before subsequent read operations in the same 113  * thread 114  * <li>If this field's value == null because of unsafe publication, we know that it isn't the 115  * object associated with our thread, because if it was the publication wouldn't have been 116  * unsafe and we'd have seen our thread as the value. This state is also why a new 117  * ThreadConfinedTaskQueue object must be created for each inline execution, because 118  * observing a null thread does not mean the object is safe to reuse. 119  * <li>If this field's value is some other thread object, we know that it's not our thread. 120  * <li>If this field's value == null because it originally belonged to another thread and that 121  * thread cleared it, we still know that it's not associated with our thread 122  * <li>If this field's value == null because it was associated with our thread and was 123  * cleared, we know that we're not executing inline any more 124  * </ul> 125  * 126  * All the states where thread != currentThread are identical for our purposes, and so even 127  * though it's racy, we don't care which of those values we get, so no need to synchronize. 128  */ 129  @CheckForNull Thread thread; 130  /** Only used by the thread associated with this object */ 131  @CheckForNull Runnable nextTask; 132  /** Only used by the thread associated with this object */ 133  @CheckForNull Executor nextExecutor; 134  } 135  136  /** 137  * Enqueues a task to run when the previous task (if any) completes. 138  * 139  * <p>Cancellation does not propagate from the output future to a callable that has begun to 140  * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, 141  * {@link Callable#call()} will not be invoked. 142  */ 143  public <T extends @Nullable Object> ListenableFuture<T> submit( 144  final Callable<T> callable, Executor executor) { 145  checkNotNull(callable); 146  checkNotNull(executor); 147  return submitAsync( 148  new AsyncCallable<T>() { 149  @Override 150  public ListenableFuture<T> call() throws Exception { 151  return immediateFuture(callable.call()); 152  } 153  154  @Override 155  public String toString() { 156  return callable.toString(); 157  } 158  }, 159  executor); 160  } 161  162  /** 163  * Enqueues a task to run when the previous task (if any) completes. 164  * 165  * <p>Cancellation does not propagate from the output future to the future returned from {@code 166  * callable} or a callable that has begun to execute, but if the output future is cancelled before 167  * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. 168  */ 169  public <T extends @Nullable Object> ListenableFuture<T> submitAsync( 170  final AsyncCallable<T> callable, final Executor executor) { 171  checkNotNull(callable); 172  checkNotNull(executor); 173  final TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this); 174  final AsyncCallable<T> task = 175  new AsyncCallable<T>() { 176  @Override 177  public ListenableFuture<T> call() throws Exception { 178  if (!taskExecutor.trySetStarted()) { 179  return immediateCancelledFuture(); 180  } 181  return callable.call(); 182  } 183  184  @Override 185  public String toString() { 186  return callable.toString(); 187  } 188  }; 189  /* 190  * Four futures are at play here: 191  * taskFuture is the future tracking the result of the callable. 192  * newFuture is a future that completes after this and all prior tasks are done. 193  * oldFuture is the previous task's newFuture. 194  * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. 195  * 196  * newFuture is guaranteed to only complete once all tasks previously submitted to this instance 197  * have completed - namely after oldFuture is done, and taskFuture has either completed or been 198  * cancelled before the callable started execution. 199  */ 200  final SettableFuture<@Nullable Void> newFuture = SettableFuture.create(); 201  202  final ListenableFuture<@Nullable Void> oldFuture = ref.getAndSet(newFuture); 203  204  // Invoke our task once the previous future completes. 205  final TrustedListenableFutureTask<T> taskFuture = TrustedListenableFutureTask.create(task); 206  oldFuture.addListener(taskFuture, taskExecutor); 207  208  final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture); 209  210  // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture 211  // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that 212  // if the future we return is cancelled, we don't begin execution of the next task until after 213  // oldFuture completes. 214  Runnable listener = 215  new Runnable() { 216  @Override 217  public void run() { 218  if (taskFuture.isDone()) { 219  // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of 220  // a future that eventually came from immediateFuture(null), this doesn't leak 221  // throwables or completion values. 222  newFuture.setFuture(oldFuture); 223  } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) { 224  // If this CAS succeeds, we know that the provided callable will never be invoked, 225  // so when oldFuture completes it is safe to allow the next submitted task to 226  // proceed. Doing this immediately here lets the next task run without waiting for 227  // the cancelled task's executor to run the noop AsyncCallable. 228  // 229  // --- 230  // 231  // If the CAS fails, the provided callable already started running (or it is about 232  // to). Our contract promises: 233  // 234  // 1. not to execute a new callable until the old one has returned 235  // 236  // If we were to cancel taskFuture, that would let the next task start while the old 237  // one is still running. 238  // 239  // Now, maybe we could tweak our implementation to not start the next task until the 240  // callable actually completes. (We could detect completion in our wrapper 241  // `AsyncCallable task`.) However, our contract also promises: 242  // 243  // 2. not to cancel any Future the user returned from an AsyncCallable 244  // 245  // We promise this because, once we cancel that Future, we would no longer be able to 246  // tell when any underlying work it is doing is done. Thus, we might start a new task 247  // while that underlying work is still running. 248  // 249  // So that is why we cancel only in the case of CAS success. 250  taskFuture.cancel(false); 251  } 252  } 253  }; 254  // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to 255  // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture 256  // propagates cancellation if the callable has not yet been invoked. 257  outputFuture.addListener(listener, directExecutor()); 258  taskFuture.addListener(listener, directExecutor()); 259  260  return outputFuture; 261  } 262  263  enum RunningState { 264  NOT_RUN, 265  CANCELLED, 266  STARTED, 267  } 268  269  /** 270  * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with 271  * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other 272  * tasks would be called recursively. Here, we detect that the delegate executor is executing 273  * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class 274  * per call to submit() or submitAsync(), and each instance supports only one call to execute(). 275  * 276  * <p>This class would certainly be simpler and easier to reason about if it were built with 277  * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is 278  * non-static, and is initialized/removed frequently - this causes churn in the Thread specific 279  * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different 280  * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition 281  * to increasing the memory footprint of every thread that interacted with it. In order to release 282  * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced, 283  * ThreadLocal is usually implemented with a WeakReference, which can have negative performance 284  * properties; for example, calling WeakReference.get() on Android will block during an 285  * otherwise-concurrent GC cycle. 286  */ 287  private static final class TaskNonReentrantExecutor extends AtomicReference<RunningState> 288  implements Executor, Runnable { 289  290  /** 291  * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run 292  * or queued. 293  */ 294  @CheckForNull ExecutionSequencer sequencer; 295  296  /** 297  * Executor the task was set to run on. Set to null when the task has been queued, run, or 298  * cancelled. 299  */ 300  @CheckForNull Executor delegate; 301  302  /** 303  * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this 304  * object may live on after, if submitAsync returns an incomplete future. 305  */ 306  @CheckForNull Runnable task; 307  308  /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */ 309  @CheckForNull Thread submitting; 310  311  private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) { 312  super(NOT_RUN); 313  this.delegate = delegate; 314  this.sequencer = sequencer; 315  } 316  317  @Override 318  public void execute(Runnable task) { 319  // If this operation was successfully cancelled already, calling the runnable will be a noop. 320  // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel, 321  // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run 322  // without waiting for the user's executor to run our submitted Runnable. However, this can 323  // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation 324  // before the cancelled future completes, it will synchronously complete both the newFuture 325  // from the cancelled operation and its own. This can cause one runnable to queue two tasks, 326  // breaking the invariant this method relies on to iteratively run the next task after the 327  // previous one completes. 328  if (get() == RunningState.CANCELLED) { 329  delegate = null; 330  sequencer = null; 331  return; 332  } 333  submitting = Thread.currentThread(); 334  335  try { 336  /* 337  * requireNonNull is safe because we don't null out `sequencer` except: 338  * 339  * - above, where we return (in which case we never get here) 340  * 341  * - in `run`, which can't run until this Runnable is submitted to an executor, which 342  * doesn't happen until below. (And this Executor -- yes, the object is both a Runnable 343  * and an Executor -- is used for only a single `execute` call.) 344  */ 345  ThreadConfinedTaskQueue submittingTaskQueue = requireNonNull(sequencer).latestTaskQueue; 346  if (submittingTaskQueue.thread == submitting) { 347  sequencer = null; 348  // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and 349  // can't know without submitting something to the executor) so queue to run iteratively. 350  // Task must be null, since each execution on this executor can only produce one more 351  // execution. 352  checkState(submittingTaskQueue.nextTask == null); 353  submittingTaskQueue.nextTask = task; 354  // requireNonNull(delegate) is safe for reasons similar to requireNonNull(sequencer). 355  submittingTaskQueue.nextExecutor = requireNonNull(delegate); 356  delegate = null; 357  } else { 358  // requireNonNull(delegate) is safe for reasons similar to requireNonNull(sequencer). 359  Executor localDelegate = requireNonNull(delegate); 360  delegate = null; 361  this.task = task; 362  localDelegate.execute(this); 363  } 364  } finally { 365  // Important to null this out here - if we did *not* execute inline, we might still 366  // run() on the same thread that called execute() - such as in a thread pool, and think 367  // that it was happening inline. As a side benefit, avoids holding on to the Thread object 368  // longer than necessary. 369  submitting = null; 370  } 371  } 372  373  @SuppressWarnings("ShortCircuitBoolean") 374  @Override 375  public void run() { 376  Thread currentThread = Thread.currentThread(); 377  if (currentThread != submitting) { 378  /* 379  * requireNonNull is safe because we set `task` before submitting this Runnable to an 380  * Executor, and we don't null it out until here. 381  */ 382  Runnable localTask = requireNonNull(task); 383  task = null; 384  localTask.run(); 385  return; 386  } 387  // Executor called reentrantly! Make sure that further calls don't overflow stack. Further 388  // reentrant calls will see that their current thread is the same as the one set in 389  // latestTaskQueue, and queue rather than calling execute() directly. 390  ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue(); 391  executingTaskQueue.thread = currentThread; 392  /* 393  * requireNonNull is safe because we don't null out `sequencer` except: 394  * 395  * - after the requireNonNull call below. (And this object has its Runnable.run override 396  * called only once, just as it has its Executor.execute override called only once.) 397  * 398  * - if we return immediately from `execute` (in which case we never get here) 399  * 400  * - in the "reentrant submit" case of `execute` (in which case we must have started running a 401  * user task -- which means that we already got past this code (or else we exited early 402  * above)) 403  */ 404  // Unconditionally set; there is no risk of throwing away a queued task from another thread, 405  // because in order for the current task to run on this executor the previous task must have 406  // already started execution. Because each task on a TaskNonReentrantExecutor can only produce 407  // one execute() call to another instance from the same ExecutionSequencer, we know by 408  // induction that the task that launched this one must not have added any other runnables to 409  // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would 410  // otherwise have another task queued on to it. Note the exception to this, cancellation, is 411  // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and 412  // thus don't count. 413  requireNonNull(sequencer).latestTaskQueue = executingTaskQueue; 414  sequencer = null; 415  try { 416  // requireNonNull is safe, as discussed above. 417  Runnable localTask = requireNonNull(task); 418  task = null; 419  localTask.run(); 420  // Now check if our task attempted to reentrantly execute the next task. 421  Runnable queuedTask; 422  Executor queuedExecutor; 423  // Intentionally using non-short-circuit operator 424  while ((queuedTask = executingTaskQueue.nextTask) != null 425  & (queuedExecutor = executingTaskQueue.nextExecutor) != null) { 426  executingTaskQueue.nextTask = null; 427  executingTaskQueue.nextExecutor = null; 428  queuedExecutor.execute(queuedTask); 429  } 430  } finally { 431  // Null out the thread field, so that we don't leak a reference to Thread, and so that 432  // future `thread == currentThread()` calls from this thread don't incorrectly queue instead 433  // of executing. Don't null out the latestTaskQueue field, because the work done here 434  // may have scheduled more operations on another thread, and if those operations then 435  // trigger reentrant calls that thread will have updated the latestTaskQueue field, and 436  // we'd be interfering with their operation. 437  executingTaskQueue.thread = null; 438  } 439  } 440  441  private boolean trySetStarted() { 442  return compareAndSet(NOT_RUN, STARTED); 443  } 444  445  private boolean trySetCancelled() { 446  return compareAndSet(NOT_RUN, CANCELLED); 447  } 448  } 449 }