Coverage Summary for Class: AbstractFuture (com.google.common.util.concurrent)
| Class | Method, % | Line, % |
|---|---|---|
| AbstractFuture | 62.5% (20/32) | 36.3% (133/366) |
| AbstractFuture$AtomicHelper | 100% (1/1) | 100% (1/1) |
| AbstractFuture$Cancellation | 100% (2/2) | 80% (8/10) |
| AbstractFuture$Failure | 100% (2/2) | 100% (4/4) |
| AbstractFuture$Failure$1 | 100% (2/2) | 100% (2/2) |
| AbstractFuture$Listener | 100% (2/2) | 100% (5/5) |
| AbstractFuture$SafeAtomicHelper | 0% (0/6) | 0% (0/14) |
| AbstractFuture$SetFuture | 0% (0/2) | 0% (0/10) |
| AbstractFuture$SynchronizedHelper | 0% (0/6) | 0% (0/23) |
| AbstractFuture$Trusted | ||
| AbstractFuture$TrustedFuture | 71.4% (5/7) | 62.5% (5/8) |
| AbstractFuture$UnsafeAtomicHelper | 100% (7/7) | 79.3% (23/29) |
| AbstractFuture$UnsafeAtomicHelper$1 | 100% (2/2) | 87.5% (7/8) |
| AbstractFuture$Waiter | 100% (5/5) | 100% (12/12) |
| Total | 63.2% (48/76) | 40.7% (200/492) |
1 /* 2 * Copyright (C) 2007 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Throwables.throwIfUnchecked; 19 import static java.lang.Integer.toHexString; 20 import static java.lang.System.identityHashCode; 21 import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 22 23 import com.google.common.annotations.Beta; 24 import com.google.common.annotations.GwtCompatible; 25 import com.google.common.base.Strings; 26 import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 27 import com.google.common.util.concurrent.internal.InternalFutures; 28 import com.google.errorprone.annotations.CanIgnoreReturnValue; 29 import com.google.errorprone.annotations.ForOverride; 30 import com.google.j2objc.annotations.ReflectionSupport; 31 import java.security.AccessController; 32 import java.security.PrivilegedActionException; 33 import java.security.PrivilegedExceptionAction; 34 import java.util.Locale; 35 import java.util.concurrent.CancellationException; 36 import java.util.concurrent.ExecutionException; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.Future; 39 import java.util.concurrent.ScheduledFuture; 40 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.TimeoutException; 42 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 43 import java.util.concurrent.locks.LockSupport; 44 import java.util.logging.Level; 45 import java.util.logging.Logger; 46 import org.checkerframework.checker.nullness.qual.Nullable; 47 48 /** 49 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 50 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 51 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 52 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 53 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link 54 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 55 * java.util.concurrent.Executor) Futures.catching}. 56 * 57 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 58 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 59 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 60 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 61 * should rarely override other methods. 62 * 63 * @author Sven Mawson 64 * @author Luke Sandberg 65 * @since 1.0 66 */ 67 // we use non-short circuiting comparisons intentionally 68 @SuppressWarnings("ShortCircuitBoolean") 69 @GwtCompatible(emulated = true) 70 @ReflectionSupport(value = ReflectionSupport.Level.FULL) 71 public abstract class AbstractFuture<V> extends InternalFutureFailureAccess 72 implements ListenableFuture<V> { 73 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 74 75 private static final boolean GENERATE_CANCELLATION_CAUSES; 76 77 static { 78 // System.getProperty may throw if the security policy does not permit access. 79 boolean generateCancellationCauses; 80 try { 81 generateCancellationCauses = 82 Boolean.parseBoolean( 83 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 84 } catch (SecurityException e) { 85 generateCancellationCauses = false; 86 } 87 GENERATE_CANCELLATION_CAUSES = generateCancellationCauses; 88 } 89 90 /** 91 * Tag interface marking trusted subclasses. This enables some optimizations. The implementation 92 * of this interface must also be an AbstractFuture and must not override or expose for overriding 93 * any of the public methods of ListenableFuture. 94 */ 95 interface Trusted<V> extends ListenableFuture<V> {} 96 97 /** 98 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 99 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 100 */ 101 abstract static class TrustedFuture<V> extends AbstractFuture<V> implements Trusted<V> { 102 @CanIgnoreReturnValue 103 @Override 104 public final V get() throws InterruptedException, ExecutionException { 105 return super.get(); 106 } 107 108 @CanIgnoreReturnValue 109 @Override 110 public final V get(long timeout, TimeUnit unit) 111 throws InterruptedException, ExecutionException, TimeoutException { 112 return super.get(timeout, unit); 113 } 114 115 @Override 116 public final boolean isDone() { 117 return super.isDone(); 118 } 119 120 @Override 121 public final boolean isCancelled() { 122 return super.isCancelled(); 123 } 124 125 @Override 126 public final void addListener(Runnable listener, Executor executor) { 127 super.addListener(listener, executor); 128 } 129 130 @CanIgnoreReturnValue 131 @Override 132 public final boolean cancel(boolean mayInterruptIfRunning) { 133 return super.cancel(mayInterruptIfRunning); 134 } 135 } 136 137 // Logger to log exceptions caught when running listeners. 138 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 139 140 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 141 // blocking. This value is what AbstractQueuedSynchronizer uses. 142 private static final long SPIN_THRESHOLD_NANOS = 1000L; 143 144 private static final AtomicHelper ATOMIC_HELPER; 145 146 static { 147 AtomicHelper helper; 148 Throwable thrownUnsafeFailure = null; 149 Throwable thrownAtomicReferenceFieldUpdaterFailure = null; 150 151 try { 152 helper = new UnsafeAtomicHelper(); 153 } catch (Throwable unsafeFailure) { 154 thrownUnsafeFailure = unsafeFailure; 155 // catch absolutely everything and fall through to our 'SafeAtomicHelper' 156 // The access control checks that ARFU does means the caller class has to be AbstractFuture 157 // instead of SafeAtomicHelper, so we annoyingly define these here 158 try { 159 helper = 160 new SafeAtomicHelper( 161 newUpdater(Waiter.class, Thread.class, "thread"), 162 newUpdater(Waiter.class, Waiter.class, "next"), 163 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 164 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 165 newUpdater(AbstractFuture.class, Object.class, "value")); 166 } catch (Throwable atomicReferenceFieldUpdaterFailure) { 167 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 168 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 169 // For these users fallback to a suboptimal implementation, based on synchronized. This will 170 // be a definite performance hit to those users. 171 thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; 172 helper = new SynchronizedHelper(); 173 } 174 } 175 ATOMIC_HELPER = helper; 176 177 // Prevent rare disastrous classloading in first call to LockSupport.park. 178 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 179 @SuppressWarnings("unused") 180 Class<?> ensureLoaded = LockSupport.class; 181 182 // Log after all static init is finished; if an installed logger uses any Futures methods, it 183 // shouldn't break in cases where reflection is missing/broken. 184 if (thrownAtomicReferenceFieldUpdaterFailure != null) { 185 log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure); 186 log.log( 187 Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure); 188 } 189 } 190 191 /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ 192 private static final class Waiter { 193 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 194 195 volatile @Nullable Thread thread; 196 volatile @Nullable Waiter next; 197 198 /** 199 * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 200 * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 201 */ 202 Waiter(boolean unused) {} 203 204 Waiter() { 205 // avoid volatile write, write is made visible by subsequent CAS on waiters field 206 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 207 } 208 209 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 210 // field. 211 void setNext(Waiter next) { 212 ATOMIC_HELPER.putNext(this, next); 213 } 214 215 void unpark() { 216 // This is racy with removeWaiter. The consequence of the race is that we may spuriously call 217 // unpark even though the thread has already removed itself from the list. But even if we did 218 // use a CAS, that race would still exist (it would just be ever so slightly smaller). 219 Thread w = thread; 220 if (w != null) { 221 thread = null; 222 LockSupport.unpark(w); 223 } 224 } 225 } 226 227 /** 228 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 229 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved 230 * by two things. 231 * 232 * <ul> 233 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 234 * should be rare. 235 * <li>The waiters list should be very short. 236 * </ul> 237 */ 238 private void removeWaiter(Waiter node) { 239 node.thread = null; // mark as 'deleted' 240 restart: 241 while (true) { 242 Waiter pred = null; 243 Waiter curr = waiters; 244 if (curr == Waiter.TOMBSTONE) { 245 return; // give up if someone is calling complete 246 } 247 Waiter succ; 248 while (curr != null) { 249 succ = curr.next; 250 if (curr.thread != null) { // we aren't unlinking this node, update pred. 251 pred = curr; 252 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 253 pred.next = succ; 254 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 255 continue restart; 256 } 257 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 258 continue restart; // We raced with an add or complete 259 } 260 curr = succ; 261 } 262 break; 263 } 264 } 265 266 /** Listeners also form a stack through the {@link #listeners} field. */ 267 private static final class Listener { 268 static final Listener TOMBSTONE = new Listener(null, null); 269 final Runnable task; 270 final Executor executor; 271 272 // writes to next are made visible by subsequent CAS's on the listeners field 273 @Nullable Listener next; 274 275 Listener(Runnable task, Executor executor) { 276 this.task = task; 277 this.executor = executor; 278 } 279 } 280 281 /** A special value to represent {@code null}. */ 282 private static final Object NULL = new Object(); 283 284 /** A special value to represent failure, when {@link #setException} is called successfully. */ 285 private static final class Failure { 286 static final Failure FALLBACK_INSTANCE = 287 new Failure( 288 new Throwable("Failure occurred while trying to finish a future.") { 289 @Override 290 public synchronized Throwable fillInStackTrace() { 291 return this; // no stack trace 292 } 293 }); 294 final Throwable exception; 295 296 Failure(Throwable exception) { 297 this.exception = checkNotNull(exception); 298 } 299 } 300 301 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 302 private static final class Cancellation { 303 // constants to use when GENERATE_CANCELLATION_CAUSES = false 304 static final Cancellation CAUSELESS_INTERRUPTED; 305 static final Cancellation CAUSELESS_CANCELLED; 306 307 static { 308 if (GENERATE_CANCELLATION_CAUSES) { 309 CAUSELESS_CANCELLED = null; 310 CAUSELESS_INTERRUPTED = null; 311 } else { 312 CAUSELESS_CANCELLED = new Cancellation(false, null); 313 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 314 } 315 } 316 317 final boolean wasInterrupted; 318 final @Nullable Throwable cause; 319 320 Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { 321 this.wasInterrupted = wasInterrupted; 322 this.cause = cause; 323 } 324 } 325 326 /** A special value that encodes the 'setFuture' state. */ 327 private static final class SetFuture<V> implements Runnable { 328 final AbstractFuture<V> owner; 329 final ListenableFuture<? extends V> future; 330 331 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 332 this.owner = owner; 333 this.future = future; 334 } 335 336 @Override 337 public void run() { 338 if (owner.value != this) { 339 // nothing to do, we must have been cancelled, don't bother inspecting the future. 340 return; 341 } 342 Object valueToSet = getFutureValue(future); 343 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 344 complete(owner); 345 } 346 } 347 } 348 349 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 350 // available. 351 /** 352 * This field encodes the current state of the future. 353 * 354 * <p>The valid values are: 355 * 356 * <ul> 357 * <li>{@code null} initial state, nothing has happened. 358 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 359 * <li>{@link Failure} terminal state, {@code setException} was called. 360 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 361 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 362 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 363 * argument. 364 * </ul> 365 */ 366 private volatile @Nullable Object value; 367 368 /** All listeners. */ 369 private volatile @Nullable Listener listeners; 370 371 /** All waiting threads. */ 372 private volatile @Nullable Waiter waiters; 373 374 /** Constructor for use by subclasses. */ 375 protected AbstractFuture() {} 376 377 // Gets and Timed Gets 378 // 379 // * Be responsive to interruption 380 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 381 // waiters field. 382 // * Future completion is defined by when #value becomes non-null/non SetFuture 383 // * Future completion can be observed if the waiters field contains a TOMBSTONE 384 385 // Timed Get 386 // There are a few design constraints to consider 387 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 388 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 389 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 390 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 391 // similar purposes. 392 // * We want to behave reasonably for timeouts of 0 393 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 394 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 395 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 396 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 397 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node 398 // and we could use that to make a decision about whether or not we timed out prior to being 399 // unparked. 400 401 /** 402 * {@inheritDoc} 403 * 404 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 405 * current thread is interrupted during the call, even if the value is already available. 406 * 407 * @throws CancellationException {@inheritDoc} 408 */ 409 @CanIgnoreReturnValue 410 @Override 411 public V get(long timeout, TimeUnit unit) 412 throws InterruptedException, TimeoutException, ExecutionException { 413 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 414 // at the bottom and throw a timeoutexception. 415 final long timeoutNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 416 long remainingNanos = timeoutNanos; 417 if (Thread.interrupted()) { 418 throw new InterruptedException(); 419 } 420 Object localValue = value; 421 if (localValue != null & !(localValue instanceof SetFuture)) { 422 return getDoneValue(localValue); 423 } 424 // we delay calling nanoTime until we know we will need to either park or spin 425 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 426 long_wait_loop: 427 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 428 Waiter oldHead = waiters; 429 if (oldHead != Waiter.TOMBSTONE) { 430 Waiter node = new Waiter(); 431 do { 432 node.setNext(oldHead); 433 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 434 while (true) { 435 OverflowAvoidingLockSupport.parkNanos(this, remainingNanos); 436 // Check interruption first, if we woke up due to interruption we need to honor that. 437 if (Thread.interrupted()) { 438 removeWaiter(node); 439 throw new InterruptedException(); 440 } 441 442 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 443 // wakeup 444 localValue = value; 445 if (localValue != null & !(localValue instanceof SetFuture)) { 446 return getDoneValue(localValue); 447 } 448 449 // timed out? 450 remainingNanos = endNanos - System.nanoTime(); 451 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 452 // Remove the waiter, one way or another we are done parking this thread. 453 removeWaiter(node); 454 break long_wait_loop; // jump down to the busy wait loop 455 } 456 } 457 } 458 oldHead = waiters; // re-read and loop. 459 } while (oldHead != Waiter.TOMBSTONE); 460 } 461 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 462 // waiter. 463 return getDoneValue(value); 464 } 465 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 466 // waiters list 467 while (remainingNanos > 0) { 468 localValue = value; 469 if (localValue != null & !(localValue instanceof SetFuture)) { 470 return getDoneValue(localValue); 471 } 472 if (Thread.interrupted()) { 473 throw new InterruptedException(); 474 } 475 remainingNanos = endNanos - System.nanoTime(); 476 } 477 478 String futureToString = toString(); 479 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 480 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 481 // Only report scheduling delay if larger than our spin threshold - otherwise it's just noise 482 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 483 // We over-waited for our timeout. 484 message += " (plus "; 485 long overWaitNanos = -remainingNanos; 486 long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); 487 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 488 boolean shouldShowExtraNanos = 489 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 490 if (overWaitUnits > 0) { 491 message += overWaitUnits + " " + unitString; 492 if (shouldShowExtraNanos) { 493 message += ","; 494 } 495 message += " "; 496 } 497 if (shouldShowExtraNanos) { 498 message += overWaitLeftoverNanos + " nanoseconds "; 499 } 500 501 message += "delay)"; 502 } 503 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 504 // then we know it must have given a pending toString value earlier. If not, then the future 505 // completed after the timeout expired, and the message might be success. 506 if (isDone()) { 507 throw new TimeoutException(message + " but future completed as timeout expired"); 508 } 509 throw new TimeoutException(message + " for " + futureToString); 510 } 511 512 /** 513 * {@inheritDoc} 514 * 515 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 516 * current thread is interrupted during the call, even if the value is already available. 517 * 518 * @throws CancellationException {@inheritDoc} 519 */ 520 @CanIgnoreReturnValue 521 @Override 522 public V get() throws InterruptedException, ExecutionException { 523 if (Thread.interrupted()) { 524 throw new InterruptedException(); 525 } 526 Object localValue = value; 527 if (localValue != null & !(localValue instanceof SetFuture)) { 528 return getDoneValue(localValue); 529 } 530 Waiter oldHead = waiters; 531 if (oldHead != Waiter.TOMBSTONE) { 532 Waiter node = new Waiter(); 533 do { 534 node.setNext(oldHead); 535 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 536 // we are on the stack, now wait for completion. 537 while (true) { 538 LockSupport.park(this); 539 // Check interruption first, if we woke up due to interruption we need to honor that. 540 if (Thread.interrupted()) { 541 removeWaiter(node); 542 throw new InterruptedException(); 543 } 544 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 545 // wakeup 546 localValue = value; 547 if (localValue != null & !(localValue instanceof SetFuture)) { 548 return getDoneValue(localValue); 549 } 550 } 551 } 552 oldHead = waiters; // re-read and loop. 553 } while (oldHead != Waiter.TOMBSTONE); 554 } 555 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 556 // waiter. 557 return getDoneValue(value); 558 } 559 560 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ 561 private V getDoneValue(Object obj) throws ExecutionException { 562 // While this seems like it might be too branch-y, simple benchmarking proves it to be 563 // unmeasurable (comparing done AbstractFutures with immediateFuture) 564 if (obj instanceof Cancellation) { 565 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 566 } else if (obj instanceof Failure) { 567 throw new ExecutionException(((Failure) obj).exception); 568 } else if (obj == NULL) { 569 return null; 570 } else { 571 @SuppressWarnings("unchecked") // this is the only other option 572 V asV = (V) obj; 573 return asV; 574 } 575 } 576 577 @Override 578 public boolean isDone() { 579 final Object localValue = value; 580 return localValue != null & !(localValue instanceof SetFuture); 581 } 582 583 @Override 584 public boolean isCancelled() { 585 final Object localValue = value; 586 return localValue instanceof Cancellation; 587 } 588 589 /** 590 * {@inheritDoc} 591 * 592 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 593 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 594 * {@code Future} that was supplied in the {@code setFuture} call. 595 * 596 * <p>Rather than override this method to perform additional cancellation work or cleanup, 597 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 598 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 599 * cancelled without a call to {@code cancel}, such as by calling {@code 600 * setFuture(cancelledFuture)}. 601 * 602 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 603 * acquire other locks, risking deadlocks. 604 */ 605 @CanIgnoreReturnValue 606 @Override 607 public boolean cancel(boolean mayInterruptIfRunning) { 608 Object localValue = value; 609 boolean rValue = false; 610 if (localValue == null | localValue instanceof SetFuture) { 611 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 612 // certainly less likely. 613 Object valueToSet = 614 GENERATE_CANCELLATION_CAUSES 615 ? new Cancellation( 616 mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) 617 : (mayInterruptIfRunning 618 ? Cancellation.CAUSELESS_INTERRUPTED 619 : Cancellation.CAUSELESS_CANCELLED); 620 AbstractFuture<?> abstractFuture = this; 621 while (true) { 622 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 623 rValue = true; 624 // We call interruptTask before calling complete(), which is consistent with 625 // FutureTask 626 if (mayInterruptIfRunning) { 627 abstractFuture.interruptTask(); 628 } 629 complete(abstractFuture); 630 if (localValue instanceof SetFuture) { 631 // propagate cancellation to the future set in setfuture, this is racy, and we don't 632 // care if we are successful or not. 633 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 634 if (futureToPropagateTo instanceof Trusted) { 635 // If the future is a TrustedFuture then we specifically avoid calling cancel() 636 // this has 2 benefits 637 // 1. for long chains of futures strung together with setFuture we consume less stack 638 // 2. we avoid allocating Cancellation objects at every level of the cancellation 639 // chain 640 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 641 // does nothing but delegate to this method. 642 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 643 localValue = trusted.value; 644 if (localValue == null | localValue instanceof SetFuture) { 645 abstractFuture = trusted; 646 continue; // loop back up and try to complete the new future 647 } 648 } else { 649 // not a TrustedFuture, call cancel directly. 650 futureToPropagateTo.cancel(mayInterruptIfRunning); 651 } 652 } 653 break; 654 } 655 // obj changed, reread 656 localValue = abstractFuture.value; 657 if (!(localValue instanceof SetFuture)) { 658 // obj cannot be null at this point, because value can only change from null to non-null. 659 // So if value changed (and it did since we lost the CAS), then it cannot be null and 660 // since it isn't a SetFuture, then the future must be done and we should exit the loop 661 break; 662 } 663 } 664 } 665 return rValue; 666 } 667 668 /** 669 * Subclasses can override this method to implement interruption of the future's computation. The 670 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 671 * 672 * <p>The default implementation does nothing. 673 * 674 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking 675 * {@link #wasInterrupted} to decide whether to interrupt your task. 676 * 677 * @since 10.0 678 */ 679 protected void interruptTask() {} 680 681 /** 682 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 683 * true}. 684 * 685 * @since 14.0 686 */ 687 protected final boolean wasInterrupted() { 688 final Object localValue = value; 689 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 690 } 691 692 /** 693 * {@inheritDoc} 694 * 695 * @since 10.0 696 */ 697 @Override 698 public void addListener(Runnable listener, Executor executor) { 699 checkNotNull(listener, "Runnable was null."); 700 checkNotNull(executor, "Executor was null."); 701 // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for 702 // addListener says that listeners execute 'immediate' if the future isDone(). However, our 703 // protocol for completing a future is to assign the value field (which sets isDone to true) and 704 // then to release waiters, followed by executing afterDone(), followed by releasing listeners. 705 // That means that it is possible to observe that the future isDone and that your listeners 706 // don't execute 'immediately'. By checking isDone here we avoid that. 707 // A corollary to all that is that we don't need to check isDone inside the loop because if we 708 // get into the loop we know that we weren't done when we entered and therefore we aren't under 709 // an obligation to execute 'immediately'. 710 if (!isDone()) { 711 Listener oldHead = listeners; 712 if (oldHead != Listener.TOMBSTONE) { 713 Listener newNode = new Listener(listener, executor); 714 do { 715 newNode.next = oldHead; 716 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 717 return; 718 } 719 oldHead = listeners; // re-read 720 } while (oldHead != Listener.TOMBSTONE); 721 } 722 } 723 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 724 // the listener. 725 executeListener(listener, executor); 726 } 727 728 /** 729 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 730 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 731 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 732 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 733 * Future} may have previously been set asynchronously, in which case its result may not be known 734 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 735 * method, only by a call to {@link #cancel}. 736 * 737 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 738 * acquire other locks, risking deadlocks. 739 * 740 * @param value the value to be used as the result 741 * @return true if the attempt was accepted, completing the {@code Future} 742 */ 743 @CanIgnoreReturnValue 744 protected boolean set(@Nullable V value) { 745 Object valueToSet = value == null ? NULL : value; 746 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 747 complete(this); 748 return true; 749 } 750 return false; 751 } 752 753 /** 754 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 755 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 756 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 757 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 758 * {@code Future} may have previously been set asynchronously, in which case its result may not be 759 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 760 * method, only by a call to {@link #cancel}. 761 * 762 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 763 * acquire other locks, risking deadlocks. 764 * 765 * @param throwable the exception to be used as the failed result 766 * @return true if the attempt was accepted, completing the {@code Future} 767 */ 768 @CanIgnoreReturnValue 769 protected boolean setException(Throwable throwable) { 770 Object valueToSet = new Failure(checkNotNull(throwable)); 771 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 772 complete(this); 773 return true; 774 } 775 return false; 776 } 777 778 /** 779 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 780 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 781 * (including "set asynchronously," defined below). 782 * 783 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 784 * is accepted, then this future is guaranteed to have been completed with the supplied future by 785 * the time this method returns. If the supplied future is not done and the call is accepted, then 786 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 787 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 788 * 789 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 790 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 791 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 792 * Future}. 793 * 794 * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, 795 * it will never trigger interruption behavior. In particular, it will not cause this future to 796 * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not 797 * return {@code true}. 798 * 799 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 800 * acquire other locks, risking deadlocks. 801 * 802 * @param future the future to delegate to 803 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 804 * cancelled or set. 805 * @since 19.0 806 */ 807 @CanIgnoreReturnValue 808 protected boolean setFuture(ListenableFuture<? extends V> future) { 809 checkNotNull(future); 810 Object localValue = value; 811 if (localValue == null) { 812 if (future.isDone()) { 813 Object value = getFutureValue(future); 814 if (ATOMIC_HELPER.casValue(this, null, value)) { 815 complete(this); 816 return true; 817 } 818 return false; 819 } 820 SetFuture<V> valueToSet = new SetFuture<V>(this, future); 821 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 822 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 823 // since all we are doing is unpacking a completed future which should be fast. 824 try { 825 future.addListener(valueToSet, DirectExecutor.INSTANCE); 826 } catch (Throwable t) { 827 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 828 // must have been caused by addListener itself. The most likely explanation is a 829 // misconfigured mock. Try to switch to Failure. 830 Failure failure; 831 try { 832 failure = new Failure(t); 833 } catch (Throwable oomMostLikely) { 834 failure = Failure.FALLBACK_INSTANCE; 835 } 836 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 837 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 838 } 839 return true; 840 } 841 localValue = value; // we lost the cas, fall through and maybe cancel 842 } 843 // The future has already been set to something. If it is cancellation we should cancel the 844 // incoming future. 845 if (localValue instanceof Cancellation) { 846 // we don't care if it fails, this is best-effort. 847 future.cancel(((Cancellation) localValue).wasInterrupted); 848 } 849 return false; 850 } 851 852 /** 853 * Returns a value that satisfies the contract of the {@link #value} field based on the state of 854 * given future. 855 * 856 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 857 */ 858 private static Object getFutureValue(ListenableFuture<?> future) { 859 if (future instanceof Trusted) { 860 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 861 // override .get() (since it is final) and therefore this is equivalent to calling .get() 862 // and unpacking the exceptions like we do below (just much faster because it is a single 863 // field read instead of a read, several branches and possibly creating exceptions). 864 Object v = ((AbstractFuture<?>) future).value; 865 if (v instanceof Cancellation) { 866 // If the other future was interrupted, clear the interrupted bit while preserving the cause 867 // this will make it consistent with how non-trustedfutures work which cannot propagate the 868 // wasInterrupted bit 869 Cancellation c = (Cancellation) v; 870 if (c.wasInterrupted) { 871 v = 872 c.cause != null 873 ? new Cancellation(/* wasInterrupted= */ false, c.cause) 874 : Cancellation.CAUSELESS_CANCELLED; 875 } 876 } 877 return v; 878 } 879 if (future instanceof InternalFutureFailureAccess) { 880 Throwable throwable = 881 InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future); 882 if (throwable != null) { 883 return new Failure(throwable); 884 } 885 } 886 boolean wasCancelled = future.isCancelled(); 887 // Don't allocate a CancellationException if it's not necessary 888 if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { 889 return Cancellation.CAUSELESS_CANCELLED; 890 } 891 // Otherwise calculate the value by calling .get() 892 try { 893 Object v = getUninterruptibly(future); 894 if (wasCancelled) { 895 return new Cancellation( 896 false, 897 new IllegalArgumentException( 898 "get() did not throw CancellationException, despite reporting " 899 + "isCancelled() == true: " 900 + future)); 901 } 902 return v == null ? NULL : v; 903 } catch (ExecutionException exception) { 904 if (wasCancelled) { 905 return new Cancellation( 906 false, 907 new IllegalArgumentException( 908 "get() did not throw CancellationException, despite reporting " 909 + "isCancelled() == true: " 910 + future, 911 exception)); 912 } 913 return new Failure(exception.getCause()); 914 } catch (CancellationException cancellation) { 915 if (!wasCancelled) { 916 return new Failure( 917 new IllegalArgumentException( 918 "get() threw CancellationException, despite reporting isCancelled() == false: " 919 + future, 920 cancellation)); 921 } 922 return new Cancellation(false, cancellation); 923 } catch (Throwable t) { 924 return new Failure(t); 925 } 926 } 927 928 /** 929 * An inlined private copy of {@link Uninterruptibles#getUninterruptibly} used to break an 930 * internal dependency on other /util/concurrent classes. 931 */ 932 private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { 933 boolean interrupted = false; 934 try { 935 while (true) { 936 try { 937 return future.get(); 938 } catch (InterruptedException e) { 939 interrupted = true; 940 } 941 } 942 } finally { 943 if (interrupted) { 944 Thread.currentThread().interrupt(); 945 } 946 } 947 } 948 949 /** Unblocks all threads and runs all listeners. */ 950 private static void complete(AbstractFuture<?> future) { 951 Listener next = null; 952 outer: 953 while (true) { 954 future.releaseWaiters(); 955 // We call this before the listeners in order to avoid needing to manage a separate stack data 956 // structure for them. Also, some implementations rely on this running prior to listeners 957 // so that the cleanup work is visible to listeners. 958 // afterDone() should be generally fast and only used for cleanup work... but in theory can 959 // also be recursive and create StackOverflowErrors 960 future.afterDone(); 961 // push the current set of listeners onto next 962 next = future.clearListeners(next); 963 future = null; 964 while (next != null) { 965 Listener curr = next; 966 next = next.next; 967 Runnable task = curr.task; 968 if (task instanceof SetFuture) { 969 SetFuture<?> setFuture = (SetFuture<?>) task; 970 // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long 971 // chains of SetFutures 972 // Handling this special case is important because there is no way to pass an executor to 973 // setFuture, so a user couldn't break the chain by doing this themselves. It is also 974 // potentially common if someone writes a recursive Futures.transformAsync transformer. 975 future = setFuture.owner; 976 if (future.value == setFuture) { 977 Object valueToSet = getFutureValue(setFuture.future); 978 if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { 979 continue outer; 980 } 981 } 982 // other wise the future we were trying to set is already done. 983 } else { 984 executeListener(task, curr.executor); 985 } 986 } 987 break; 988 } 989 } 990 991 /** 992 * Callback method that is called exactly once after the future is completed. 993 * 994 * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. 995 * 996 * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is 997 * intended for very lightweight cleanup work, for example, timing statistics or clearing fields. 998 * If your task does anything heavier consider, just using a listener with an executor. 999 * 1000 * @since 20.0 1001 */ 1002 @Beta 1003 @ForOverride 1004 protected void afterDone() {} 1005 1006 // TODO(b/114236866): Inherit doc from InternalFutureFailureAccess. Also, -link to its URL. 1007 /** 1008 * Usually returns {@code null} but, if this {@code Future} has failed, may <i>optionally</i> 1009 * return the cause of the failure. "Failure" means specifically "completed with an exception"; it 1010 * does not include "was cancelled." To be explicit: If this method returns a non-null value, 1011 * then: 1012 * 1013 * <ul> 1014 * <li>{@code isDone()} must return {@code true} 1015 * <li>{@code isCancelled()} must return {@code false} 1016 * <li>{@code get()} must not block, and it must throw an {@code ExecutionException} with the 1017 * return value of this method as its cause 1018 * </ul> 1019 * 1020 * <p>This method is {@code protected} so that classes like {@code 1021 * com.google.common.util.concurrent.SettableFuture} do not expose it to their users as an 1022 * instance method. In the unlikely event that you need to call this method, call {@link 1023 * InternalFutures#tryInternalFastPathGetFailure(InternalFutureFailureAccess)}. 1024 * 1025 * @since 27.0 1026 */ 1027 @Override 1028 protected final @Nullable Throwable tryInternalFastPathGetFailure() { 1029 if (this instanceof Trusted) { 1030 Object obj = value; 1031 if (obj instanceof Failure) { 1032 return ((Failure) obj).exception; 1033 } 1034 } 1035 return null; 1036 } 1037 1038 /** 1039 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 1040 * the given future (if available). 1041 */ 1042 final void maybePropagateCancellationTo(@Nullable Future<?> related) { 1043 if (related != null & isCancelled()) { 1044 related.cancel(wasInterrupted()); 1045 } 1046 } 1047 1048 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 1049 private void releaseWaiters() { 1050 Waiter head; 1051 do { 1052 head = waiters; 1053 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 1054 for (Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) { 1055 currentWaiter.unpark(); 1056 } 1057 } 1058 1059 /** 1060 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 1061 * added first. 1062 */ 1063 private Listener clearListeners(Listener onto) { 1064 // We need to 1065 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to 1066 // to synchronize with us 1067 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 1068 // executing listeners in the order they were added 1069 // 3. push all the items onto 'onto' and return the new head of the stack 1070 Listener head; 1071 do { 1072 head = listeners; 1073 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 1074 Listener reversedList = onto; 1075 while (head != null) { 1076 Listener tmp = head; 1077 head = head.next; 1078 tmp.next = reversedList; 1079 reversedList = tmp; 1080 } 1081 return reversedList; 1082 } 1083 1084 // TODO(user): move parts into a default method on ListenableFuture? 1085 @Override 1086 public String toString() { 1087 // TODO(cpovirk): Presize to something plausible? 1088 StringBuilder builder = new StringBuilder(); 1089 if (getClass().getName().startsWith("com.google.common.util.concurrent.")) { 1090 builder.append(getClass().getSimpleName()); 1091 } else { 1092 builder.append(getClass().getName()); 1093 } 1094 builder.append('@').append(toHexString(identityHashCode(this))).append("[status="); 1095 if (isCancelled()) { 1096 builder.append("CANCELLED"); 1097 } else if (isDone()) { 1098 addDoneString(builder); 1099 } else { 1100 addPendingString(builder); // delegates to addDoneString if future completes mid-way 1101 } 1102 return builder.append("]").toString(); 1103 } 1104 1105 /** 1106 * Provide a human-readable explanation of why this future has not yet completed. 1107 * 1108 * @return null if an explanation cannot be provided (e.g. because the future is done). 1109 * @since 23.0 1110 */ 1111 protected @Nullable String pendingToString() { 1112 // TODO(diamondm) consider moving this into addPendingString so it's always in the output 1113 if (this instanceof ScheduledFuture) { 1114 return "remaining delay=[" 1115 + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) 1116 + " ms]"; 1117 } 1118 return null; 1119 } 1120 1121 private void addPendingString(StringBuilder builder) { 1122 // Capture current builder length so it can be truncated if this future ends up completing while 1123 // the toString is being calculated 1124 int truncateLength = builder.length(); 1125 1126 builder.append("PENDING"); 1127 1128 Object localValue = value; 1129 if (localValue instanceof SetFuture) { 1130 builder.append(", setFuture=["); 1131 appendUserObject(builder, ((SetFuture) localValue).future); 1132 builder.append("]"); 1133 } else { 1134 String pendingDescription; 1135 try { 1136 pendingDescription = Strings.emptyToNull(pendingToString()); 1137 } catch (RuntimeException | StackOverflowError e) { 1138 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1139 // subclass is implemented with bugs similar to the subclass. 1140 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 1141 } 1142 if (pendingDescription != null) { 1143 builder.append(", info=[").append(pendingDescription).append("]"); 1144 } 1145 } 1146 1147 // The future may complete while calculating the toString, so we check once more to see if the 1148 // future is done 1149 if (isDone()) { 1150 // Truncate anything that was appended before realizing this future is done 1151 builder.delete(truncateLength, builder.length()); 1152 addDoneString(builder); 1153 } 1154 } 1155 1156 private void addDoneString(StringBuilder builder) { 1157 try { 1158 V value = getUninterruptibly(this); 1159 builder.append("SUCCESS, result=["); 1160 appendResultObject(builder, value); 1161 builder.append("]"); 1162 } catch (ExecutionException e) { 1163 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1164 } catch (CancellationException e) { 1165 builder.append("CANCELLED"); // shouldn't be reachable 1166 } catch (RuntimeException e) { 1167 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1168 } 1169 } 1170 1171 /** 1172 * Any object can be the result of a Future, and not every object has a reasonable toString() 1173 * implementation. Using a reconstruction of the default Object.toString() prevents OOMs and stack 1174 * overflows, and helps avoid sensitive data inadvertently ending up in exception messages. 1175 */ 1176 private void appendResultObject(StringBuilder builder, Object o) { 1177 if (o == null) { 1178 builder.append("null"); 1179 } else if (o == this) { 1180 builder.append("this future"); 1181 } else { 1182 builder 1183 .append(o.getClass().getName()) 1184 .append("@") 1185 .append(Integer.toHexString(System.identityHashCode(o))); 1186 } 1187 } 1188 1189 /** Helper for printing user supplied objects into our toString method. */ 1190 private void appendUserObject(StringBuilder builder, Object o) { 1191 // This is some basic recursion detection for when people create cycles via set/setFuture or 1192 // when deep chains of futures exist resulting in a StackOverflowException. We could detect 1193 // arbitrary cycles using a thread local but this should be a good enough solution (it is also 1194 // what jdk collections do in these cases) 1195 try { 1196 if (o == this) { 1197 builder.append("this future"); 1198 } else { 1199 builder.append(o); 1200 } 1201 } catch (RuntimeException | StackOverflowError e) { 1202 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1203 // user object is implemented with bugs similar to the user object. 1204 builder.append("Exception thrown from implementation: ").append(e.getClass()); 1205 } 1206 } 1207 1208 /** 1209 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1210 * RuntimeException runtime exceptions} thrown by the executor. 1211 */ 1212 private static void executeListener(Runnable runnable, Executor executor) { 1213 try { 1214 executor.execute(runnable); 1215 } catch (RuntimeException e) { 1216 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 1217 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 1218 // up. 1219 log.log( 1220 Level.SEVERE, 1221 "RuntimeException while executing runnable " + runnable + " with executor " + executor, 1222 e); 1223 } 1224 } 1225 1226 private abstract static class AtomicHelper { 1227 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 1228 abstract void putThread(Waiter waiter, Thread newValue); 1229 1230 /** Non volatile write of the waiter to the {@link Waiter#next} field. */ 1231 abstract void putNext(Waiter waiter, Waiter newValue); 1232 1233 /** Performs a CAS operation on the {@link #waiters} field. */ 1234 abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update); 1235 1236 /** Performs a CAS operation on the {@link #listeners} field. */ 1237 abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update); 1238 1239 /** Performs a CAS operation on the {@link #value} field. */ 1240 abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update); 1241 } 1242 1243 /** 1244 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 1245 * 1246 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 1247 * be accessed. 1248 */ 1249 @SuppressWarnings("sunapi") 1250 private static final class UnsafeAtomicHelper extends AtomicHelper { 1251 static final sun.misc.Unsafe UNSAFE; 1252 static final long LISTENERS_OFFSET; 1253 static final long WAITERS_OFFSET; 1254 static final long VALUE_OFFSET; 1255 static final long WAITER_THREAD_OFFSET; 1256 static final long WAITER_NEXT_OFFSET; 1257 1258 static { 1259 sun.misc.Unsafe unsafe = null; 1260 try { 1261 unsafe = sun.misc.Unsafe.getUnsafe(); 1262 } catch (SecurityException tryReflectionInstead) { 1263 try { 1264 unsafe = 1265 AccessController.doPrivileged( 1266 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 1267 @Override 1268 public sun.misc.Unsafe run() throws Exception { 1269 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 1270 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 1271 f.setAccessible(true); 1272 Object x = f.get(null); 1273 if (k.isInstance(x)) { 1274 return k.cast(x); 1275 } 1276 } 1277 throw new NoSuchFieldError("the Unsafe"); 1278 } 1279 }); 1280 } catch (PrivilegedActionException e) { 1281 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 1282 } 1283 } 1284 try { 1285 Class<?> abstractFuture = AbstractFuture.class; 1286 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 1287 LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 1288 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 1289 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 1290 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 1291 UNSAFE = unsafe; 1292 } catch (Exception e) { 1293 throwIfUnchecked(e); 1294 throw new RuntimeException(e); 1295 } 1296 } 1297 1298 @Override 1299 void putThread(Waiter waiter, Thread newValue) { 1300 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 1301 } 1302 1303 @Override 1304 void putNext(Waiter waiter, Waiter newValue) { 1305 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 1306 } 1307 1308 /** Performs a CAS operation on the {@link #waiters} field. */ 1309 @Override 1310 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1311 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 1312 } 1313 1314 /** Performs a CAS operation on the {@link #listeners} field. */ 1315 @Override 1316 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1317 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 1318 } 1319 1320 /** Performs a CAS operation on the {@link #value} field. */ 1321 @Override 1322 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1323 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 1324 } 1325 } 1326 1327 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 1328 @SuppressWarnings("rawtypes") 1329 private static final class SafeAtomicHelper extends AtomicHelper { 1330 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1331 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1332 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1333 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1334 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1335 1336 SafeAtomicHelper( 1337 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1338 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1339 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1340 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1341 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1342 this.waiterThreadUpdater = waiterThreadUpdater; 1343 this.waiterNextUpdater = waiterNextUpdater; 1344 this.waitersUpdater = waitersUpdater; 1345 this.listenersUpdater = listenersUpdater; 1346 this.valueUpdater = valueUpdater; 1347 } 1348 1349 @Override 1350 void putThread(Waiter waiter, Thread newValue) { 1351 waiterThreadUpdater.lazySet(waiter, newValue); 1352 } 1353 1354 @Override 1355 void putNext(Waiter waiter, Waiter newValue) { 1356 waiterNextUpdater.lazySet(waiter, newValue); 1357 } 1358 1359 @Override 1360 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1361 return waitersUpdater.compareAndSet(future, expect, update); 1362 } 1363 1364 @Override 1365 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1366 return listenersUpdater.compareAndSet(future, expect, update); 1367 } 1368 1369 @Override 1370 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1371 return valueUpdater.compareAndSet(future, expect, update); 1372 } 1373 } 1374 1375 /** 1376 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1377 * 1378 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1379 * AtomicReferenceFieldUpdater). 1380 */ 1381 private static final class SynchronizedHelper extends AtomicHelper { 1382 @Override 1383 void putThread(Waiter waiter, Thread newValue) { 1384 waiter.thread = newValue; 1385 } 1386 1387 @Override 1388 void putNext(Waiter waiter, Waiter newValue) { 1389 waiter.next = newValue; 1390 } 1391 1392 @Override 1393 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1394 synchronized (future) { 1395 if (future.waiters == expect) { 1396 future.waiters = update; 1397 return true; 1398 } 1399 return false; 1400 } 1401 } 1402 1403 @Override 1404 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1405 synchronized (future) { 1406 if (future.listeners == expect) { 1407 future.listeners = update; 1408 return true; 1409 } 1410 return false; 1411 } 1412 } 1413 1414 @Override 1415 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1416 synchronized (future) { 1417 if (future.value == expect) { 1418 future.value = update; 1419 return true; 1420 } 1421 return false; 1422 } 1423 } 1424 } 1425 1426 private static CancellationException cancellationExceptionWithCause( 1427 @Nullable String message, @Nullable Throwable cause) { 1428 CancellationException exception = new CancellationException(message); 1429 exception.initCause(cause); 1430 return exception; 1431 } 1432 }