Coverage Summary for Class: InterruptibleTask (com.google.common.util.concurrent)
| Class | Method, % | Line, % |
|---|---|---|
| InterruptibleTask | 83.3% (5/6) | 47.5% (28/59) |
| InterruptibleTask$Blocker | 0% (0/4) | 0% (0/5) |
| InterruptibleTask$DoNothingRunnable | 50% (1/2) | 50% (1/2) |
| Total | 50% (6/12) | 43.9% (29/66) |
/*
 * Copyright (C) 2015 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.NullnessCasts.uncheckedCastNullableTToT;

import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.j2objc.annotations.ReflectionSupport;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link Runnable} that runs at most once and can be interrupted while it is running.
 *
 * <p>The inherited {@link AtomicReference} is the task's state machine. The values it holds are:
 *
 * <ul>
 *   <li>{@code null}: no thread has claimed the task yet.
 *   <li>a {@link Thread}: that thread is executing {@link #run()}.
 *   <li>a {@link Blocker}: {@link #interruptTask()} is in the middle of interrupting the runner.
 *   <li>{@code PARKED}: the runner gave up spinning and parked itself until the interrupter
 *       finishes.
 *   <li>{@code DONE}: either the runner finished or the interrupter finished interrupting.
 * </ul>
 */
@GwtCompatible(emulated = true)
@ReflectionSupport(value = ReflectionSupport.Level.FULL)
@ElementTypesAreNonnullByDefault
// Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
// getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
// Since this class only needs CAS on one field, we can avoid this bug by extending AtomicReference
// instead of using an AtomicReferenceFieldUpdater. This reference stores Thread instances
// and DONE/INTERRUPTED - they have a common ancestor of Runnable.
abstract class InterruptibleTask<T extends @Nullable Object>
    extends AtomicReference<@Nullable Runnable> implements Runnable {
  static {
    // Prevent rare disastrous classloading in first call to LockSupport.park.
    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
    @SuppressWarnings("unused")
    Class<?> ensureLoaded = LockSupport.class;
  }

  /** A no-op {@link Runnable} whose instances serve only as identity-compared state markers. */
  private static final class DoNothingRunnable implements Runnable {
    @Override
    public void run() {}
  }

  // The thread executing the task publishes itself to the superclass' reference and the thread
  // interrupting sets DONE when it has finished interrupting.
  private static final Runnable DONE = new DoNothingRunnable();
  // Set by the runner (in waitForInterrupt) to tell the interrupter that it must unpark the runner.
  private static final Runnable PARKED = new DoNothingRunnable();
  // Why 1000? WHY NOT!
  private static final int MAX_BUSY_WAIT_SPINS = 1000;

  /**
   * Claims the task for the current thread and, unless {@link #isDone()} reports that there is
   * nothing to do, calls {@link #runInterruptibly()} followed by exactly one of {@link
   * #afterRanInterruptiblySuccess} or {@link #afterRanInterruptiblyFailure}. If another thread has
   * already claimed the task, this method returns immediately.
   */
  @SuppressWarnings("ThreadPriorityCheck") // The cow told me to
  @Override
  public final void run() {
    /*
     * Set runner thread before checking isDone(). If we were to check isDone() first, the task
     * might be cancelled before we set the runner thread. That would make it impossible to
     * interrupt, yet it will still run, since interruptTask will leave the runner value null,
     * allowing the CAS below to succeed.
     */
    Thread currentThread = Thread.currentThread();
    if (!compareAndSet(null, currentThread)) {
      return; // someone else has run or is running.
    }

    boolean run = !isDone();
    T result = null;
    Throwable error = null;
    try {
      if (run) {
        result = runInterruptibly();
      }
    } catch (Throwable t) {
      /*
       * By convention the interrupt status is cleared when InterruptedException is thrown. We are
       * swallowing the exception here (it is only forwarded to afterRanInterruptiblyFailure), so
       * put the status back; otherwise the code that owns this thread never learns about the
       * interrupt.
       */
      if (t instanceof InterruptedException) {
        currentThread.interrupt();
      }
      error = t;
    } finally {
      // Attempt to set the task as done so that further attempts to interrupt will fail.
      if (!compareAndSet(currentThread, DONE)) {
        // The CAS only fails if interruptTask() has replaced our Thread with a Blocker.
        waitForInterrupt(currentThread);
      }
      if (run) {
        if (error == null) {
          // The cast is safe because of the `run` and `error` checks.
          afterRanInterruptiblySuccess(uncheckedCastNullableTToT(result));
        } else {
          afterRanInterruptiblyFailure(error);
        }
      }
    }
  }

  /**
   * Blocks the runner until a concurrent {@link #interruptTask()} call has moved the state to
   * {@code DONE}. Spins with {@link Thread#yield()} for up to {@code MAX_BUSY_WAIT_SPINS}
   * iterations and then parks.
   *
   * @param currentThread the thread executing {@link #run()}, i.e. the calling thread
   */
  private void waitForInterrupt(Thread currentThread) {
    /*
     * If someone called cancel(true), it is possible that the interrupted bit hasn't been set yet.
     * Wait for the interrupting thread to set DONE. (See interruptTask().) We want to wait so that
     * the interrupting thread doesn't interrupt the _next_ thing to run on this thread.
     *
     * Note: We don't reset the interrupted bit, just wait for it to be set. If this is a thread
     * pool thread, the thread pool will reset it for us. Otherwise, the interrupted bit may have
     * been intended for something else, so don't clear it.
     */
    boolean restoreInterruptedBit = false;
    int spinCount = 0;
    // Interrupting Cow Says:
    //  ______
    // < Spin >
    //  ------
    //        \   ^__^
    //         \  (oo)\_______
    //            (__)\       )\/\
    //                ||----w |
    //                ||     ||
    Runnable state = get();
    Blocker blocker = null;
    while (state instanceof Blocker || state == PARKED) {
      if (state instanceof Blocker) {
        // Remember the blocker so that it can still be passed to park() once the state is PARKED.
        blocker = (Blocker) state;
      }
      spinCount++;
      if (spinCount > MAX_BUSY_WAIT_SPINS) {
        /*
         * If we have spun a lot, just park ourselves. This will save CPU while we wait for a slow
         * interrupting thread. In theory, interruptTask() should be very fast, but due to
         * InterruptibleChannel and JavaLangAccess.blockedOn(Thread, Interruptible), it isn't
         * predictable what work might be done. (e.g., close a file and flush buffers to disk). To
         * protect ourselves from this, we park ourselves and tell our interrupter that we did so.
         */
        if (state == PARKED || compareAndSet(state, PARKED)) {
          // Interrupting Cow Says:
          //  ______
          // < Park >
          //  ------
          //        \   ^__^
          //         \  (oo)\_______
          //            (__)\       )\/\
          //                ||----w |
          //                ||     ||
          // We need to clear the interrupted bit prior to calling park and maintain it in case we
          // wake up spuriously.
          restoreInterruptedBit = Thread.interrupted() || restoreInterruptedBit;
          LockSupport.park(blocker);
        }
      } else {
        Thread.yield();
      }
      state = get();
    }
    if (restoreInterruptedBit) {
      currentThread.interrupt();
    }
    /*
     * TODO(cpovirk): Clear interrupt status here? We currently don't, which means that an interrupt
     * before, during, or after runInterruptibly() (including one that surfaced as an
     * InterruptedException, whose status run() restores) can linger and affect listeners.
     */
  }

  /**
   * Called before runInterruptibly - if true, runInterruptibly and afterRanInterruptibly will not
   * be called.
   */
  abstract boolean isDone();

  /**
   * Do interruptible work here - do not complete Futures here, as their listeners could be
   * interrupted.
   */
  @ParametricNullness
  abstract T runInterruptibly() throws Exception;

  /**
   * Any interruption that happens as a result of calling interruptTask will arrive before this
   * method is called. Complete Futures here.
   */
  abstract void afterRanInterruptiblySuccess(@ParametricNullness T result);

  /**
   * Any interruption that happens as a result of calling interruptTask will arrive before this
   * method is called. Complete Futures here.
   */
  abstract void afterRanInterruptiblyFailure(Throwable error);

  /**
   * Interrupts the running task. Because this internally calls {@link Thread#interrupt()} which can
   * in turn invoke arbitrary code it is not safe to call while holding a lock.
   */
  final void interruptTask() {
    // Since the Thread is replaced by DONE before run() invokes listeners or returns, if we succeed
    // in this CAS, there's no risk of interrupting the wrong thread or interrupting a thread that
    // isn't currently executing this task.
    Runnable currentRunner = get();
    if (currentRunner instanceof Thread) {
      Blocker blocker = new Blocker(this);
      blocker.setOwner(Thread.currentThread());
      if (compareAndSet(currentRunner, blocker)) {
        // Thread.interrupt can throw arbitrary exceptions due to the nio InterruptibleChannel API
        // This will make sure that tasks don't get stuck busy waiting.
        // Some of this is fixed in jdk11 (see https://bugs.openjdk.java.net/browse/JDK-8198692) but
        // not all. See the test cases for examples on how this can happen.
        try {
          ((Thread) currentRunner).interrupt();
        } finally {
          Runnable prev = getAndSet(DONE);
          if (prev == PARKED) {
            // The runner parked itself in waitForInterrupt(); wake it now that DONE is published.
            LockSupport.unpark((Thread) currentRunner);
          }
        }
      }
    }
  }

  /**
   * Using this as the blocker object allows introspection and debugging tools to see that the
   * currentRunner thread is blocked on the progress of the interruptor thread, which can help
   * identify deadlocks.
   */
  @VisibleForTesting
  static final class Blocker extends AbstractOwnableSynchronizer implements Runnable {
    private final InterruptibleTask<?> task;

    private Blocker(InterruptibleTask<?> task) {
      this.task = task;
    }

    @Override
    public void run() {}

    /** Records {@code thread} (the interrupting thread) as this synchronizer's exclusive owner. */
    private void setOwner(Thread thread) {
      super.setExclusiveOwnerThread(thread);
    }

    @Override
    public String toString() {
      return task.toString();
    }
  }

  /**
   * Describes the current state followed by {@link #toPendingString()}. Both the {@link Blocker}
   * state and the {@code PARKED} state are reported as {@code INTERRUPTED}: {@code PARKED} is only
   * ever entered from a {@link Blocker} state, so it also means an interrupt is in progress.
   */
  @Override
  public final String toString() {
    Runnable state = get();
    final String result;
    if (state == DONE) {
      result = "running=[DONE]";
    } else if (state instanceof Blocker || state == PARKED) {
      result = "running=[INTERRUPTED]";
    } else if (state instanceof Thread) {
      // getName is final on Thread, no need to worry about exceptions
      result = "running=[RUNNING ON " + ((Thread) state).getName() + "]";
    } else {
      result = "running=[NOT STARTED YET]";
    }
    return result + ", " + toPendingString();
  }

  /** Returns the subclass-supplied text that {@link #toString()} appends after the run state. */
  abstract String toPendingString();
}