Coverage Summary for Class: InterruptibleTask (com.google.common.util.concurrent)

Class Method, % Line, %
InterruptibleTask 83.3% (5/6) 47.5% (28/59)
InterruptibleTask$Blocker 0% (0/4) 0% (0/5)
InterruptibleTask$DoNothingRunnable 50% (1/2) 50% (1/2)
Total 50% (6/12) 43.9% (29/66)


1 /* 2  * Copyright (C) 2015 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.util.concurrent.NullnessCasts.uncheckedCastNullableTToT; 18  19 import com.google.common.annotations.GwtCompatible; 20 import com.google.common.annotations.VisibleForTesting; 21 import com.google.j2objc.annotations.ReflectionSupport; 22 import java.util.concurrent.atomic.AtomicReference; 23 import java.util.concurrent.locks.AbstractOwnableSynchronizer; 24 import java.util.concurrent.locks.LockSupport; 25 import org.checkerframework.checker.nullness.qual.Nullable; 26  27 @GwtCompatible(emulated = true) 28 @ReflectionSupport(value = ReflectionSupport.Level.FULL) 29 @ElementTypesAreNonnullByDefault 30 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 31 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 32 // Since this class only needs CAS on one field, we can avoid this bug by extending AtomicReference 33 // instead of using an AtomicReferenceFieldUpdater. This reference stores Thread instances 34 // and DONE/INTERRUPTED - they have a common ancestor of Runnable. 35 abstract class InterruptibleTask<T extends @Nullable Object> 36  extends AtomicReference<@Nullable Runnable> implements Runnable { 37  static { 38  // Prevent rare disastrous classloading in first call to LockSupport.park. 39  // See: https://bugs.openjdk.java.net/browse/JDK-8074773 40  @SuppressWarnings("unused") 41  Class<?> ensureLoaded = LockSupport.class; 42  } 43  44  private static final class DoNothingRunnable implements Runnable { 45  @Override 46  public void run() {} 47  } 48  // The thread executing the task publishes itself to the superclass' reference and the thread 49  // interrupting sets DONE when it has finished interrupting. 50  private static final Runnable DONE = new DoNothingRunnable(); 51  private static final Runnable PARKED = new DoNothingRunnable(); 52  // Why 1000? WHY NOT! 53  private static final int MAX_BUSY_WAIT_SPINS = 1000; 54  55  @SuppressWarnings("ThreadPriorityCheck") // The cow told me to 56  @Override 57  public final void run() { 58  /* 59  * Set runner thread before checking isDone(). If we were to check isDone() first, the task 60  * might be cancelled before we set the runner thread. That would make it impossible to 61  * interrupt, yet it will still run, since interruptTask will leave the runner value null, 62  * allowing the CAS below to succeed. 63  */ 64  Thread currentThread = Thread.currentThread(); 65  if (!compareAndSet(null, currentThread)) { 66  return; // someone else has run or is running. 67  } 68  69  boolean run = !isDone(); 70  T result = null; 71  Throwable error = null; 72  try { 73  if (run) { 74  result = runInterruptibly(); 75  } 76  } catch (Throwable t) { 77  error = t; 78  } finally { 79  // Attempt to set the task as done so that further attempts to interrupt will fail. 80  if (!compareAndSet(currentThread, DONE)) { 81  waitForInterrupt(currentThread); 82  } 83  if (run) { 84  if (error == null) { 85  // The cast is safe because of the `run` and `error` checks. 86  afterRanInterruptiblySuccess(uncheckedCastNullableTToT(result)); 87  } else { 88  afterRanInterruptiblyFailure(error); 89  } 90  } 91  } 92  } 93  94  private void waitForInterrupt(Thread currentThread) { 95  /* 96  * If someone called cancel(true), it is possible that the interrupted bit hasn't been set yet. 97  * Wait for the interrupting thread to set DONE. (See interruptTask().) We want to wait so that 98  * the interrupting thread doesn't interrupt the _next_ thing to run on this thread. 99  * 100  * Note: We don't reset the interrupted bit, just wait for it to be set. If this is a thread 101  * pool thread, the thread pool will reset it for us. Otherwise, the interrupted bit may have 102  * been intended for something else, so don't clear it. 103  */ 104  boolean restoreInterruptedBit = false; 105  int spinCount = 0; 106  // Interrupting Cow Says: 107  // ______ 108  // < Spin > 109  // ------ 110  // \ ^__^ 111  // \ (oo)\_______ 112  // (__)\ )\/\ 113  // ||----w | 114  // || || 115  Runnable state = get(); 116  Blocker blocker = null; 117  while (state instanceof Blocker || state == PARKED) { 118  if (state instanceof Blocker) { 119  blocker = (Blocker) state; 120  } 121  spinCount++; 122  if (spinCount > MAX_BUSY_WAIT_SPINS) { 123  /* 124  * If we have spun a lot, just park ourselves. This will save CPU while we wait for a slow 125  * interrupting thread. In theory, interruptTask() should be very fast, but due to 126  * InterruptibleChannel and JavaLangAccess.blockedOn(Thread, Interruptible), it isn't 127  * predictable what work might be done. (e.g., close a file and flush buffers to disk). To 128  * protect ourselves from this, we park ourselves and tell our interrupter that we did so. 129  */ 130  if (state == PARKED || compareAndSet(state, PARKED)) { 131  // Interrupting Cow Says: 132  // ______ 133  // < Park > 134  // ------ 135  // \ ^__^ 136  // \ (oo)\_______ 137  // (__)\ )\/\ 138  // ||----w | 139  // || || 140  // We need to clear the interrupted bit prior to calling park and maintain it in case we 141  // wake up spuriously. 142  restoreInterruptedBit = Thread.interrupted() || restoreInterruptedBit; 143  LockSupport.park(blocker); 144  } 145  } else { 146  Thread.yield(); 147  } 148  state = get(); 149  } 150  if (restoreInterruptedBit) { 151  currentThread.interrupt(); 152  } 153  /* 154  * TODO(cpovirk): Clear interrupt status here? We currently don't, which means that an interrupt 155  * before, during, or after runInterruptibly() (unless it produced an InterruptedException 156  * caught above) can linger and affect listeners. 157  */ 158  } 159  160  /** 161  * Called before runInterruptibly - if true, runInterruptibly and afterRanInterruptibly will not 162  * be called. 163  */ 164  abstract boolean isDone(); 165  166  /** 167  * Do interruptible work here - do not complete Futures here, as their listeners could be 168  * interrupted. 169  */ 170  @ParametricNullness 171  abstract T runInterruptibly() throws Exception; 172  173  /** 174  * Any interruption that happens as a result of calling interruptTask will arrive before this 175  * method is called. Complete Futures here. 176  */ 177  abstract void afterRanInterruptiblySuccess(@ParametricNullness T result); 178  179  /** 180  * Any interruption that happens as a result of calling interruptTask will arrive before this 181  * method is called. Complete Futures here. 182  */ 183  abstract void afterRanInterruptiblyFailure(Throwable error); 184  185  /** 186  * Interrupts the running task. Because this internally calls {@link Thread#interrupt()} which can 187  * in turn invoke arbitrary code it is not safe to call while holding a lock. 188  */ 189  final void interruptTask() { 190  // Since the Thread is replaced by DONE before run() invokes listeners or returns, if we succeed 191  // in this CAS, there's no risk of interrupting the wrong thread or interrupting a thread that 192  // isn't currently executing this task. 193  Runnable currentRunner = get(); 194  if (currentRunner instanceof Thread) { 195  Blocker blocker = new Blocker(this); 196  blocker.setOwner(Thread.currentThread()); 197  if (compareAndSet(currentRunner, blocker)) { 198  // Thread.interrupt can throw arbitrary exceptions due to the nio InterruptibleChannel API 199  // This will make sure that tasks don't get stuck busy waiting. 200  // Some of this is fixed in jdk11 (see https://bugs.openjdk.java.net/browse/JDK-8198692) but 201  // not all. See the test cases for examples on how this can happen. 202  try { 203  ((Thread) currentRunner).interrupt(); 204  } finally { 205  Runnable prev = getAndSet(DONE); 206  if (prev == PARKED) { 207  LockSupport.unpark((Thread) currentRunner); 208  } 209  } 210  } 211  } 212  } 213  214  /** 215  * Using this as the blocker object allows introspection and debugging tools to see that the 216  * currentRunner thread is blocked on the progress of the interruptor thread, which can help 217  * identify deadlocks. 218  */ 219  @VisibleForTesting 220  static final class Blocker extends AbstractOwnableSynchronizer implements Runnable { 221  private final InterruptibleTask<?> task; 222  223  private Blocker(InterruptibleTask<?> task) { 224  this.task = task; 225  } 226  227  @Override 228  public void run() {} 229  230  private void setOwner(Thread thread) { 231  super.setExclusiveOwnerThread(thread); 232  } 233  234  @Override 235  public String toString() { 236  return task.toString(); 237  } 238  } 239  240  @Override 241  public final String toString() { 242  Runnable state = get(); 243  final String result; 244  if (state == DONE) { 245  result = "running=[DONE]"; 246  } else if (state instanceof Blocker) { 247  result = "running=[INTERRUPTED]"; 248  } else if (state instanceof Thread) { 249  // getName is final on Thread, no need to worry about exceptions 250  result = "running=[RUNNING ON " + ((Thread) state).getName() + "]"; 251  } else { 252  result = "running=[NOT STARTED YET]"; 253  } 254  return result + ", " + toPendingString(); 255  } 256  257  abstract String toPendingString(); 258 }