Coverage Summary for Class: AggregateFuture (com.google.common.util.concurrent)

Class Method, % Line, %
AggregateFuture 0% (0/14) 0% (0/78)
AggregateFuture$1 0% (0/2) 0% (0/7)
AggregateFuture$2 0% (0/2) 0% (0/2)
AggregateFuture$ReleaseResourcesReason 0% (0/1) 0% (0/3)
Total 0% (0/19) 0% (0/90)


1 /* 2  * Copyright (C) 2006 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkState; 19 import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED; 20 import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; 21 import static com.google.common.util.concurrent.Futures.getDone; 22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 23 import static java.util.Objects.requireNonNull; 24 import static java.util.logging.Level.SEVERE; 25  26 import com.google.common.annotations.GwtCompatible; 27 import com.google.common.collect.ImmutableCollection; 28 import com.google.errorprone.annotations.ForOverride; 29 import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper; 30 import java.util.Set; 31 import java.util.concurrent.ExecutionException; 32 import java.util.concurrent.Future; 33 import java.util.logging.Logger; 34 import javax.annotation.CheckForNull; 35 import org.checkerframework.checker.nullness.qual.Nullable; 36  37 /** 38  * A future whose value is derived from a collection of input futures. 39  * 40  * @param <InputT> the type of the individual inputs 41  * @param <OutputT> the type of the output (i.e. this) future 42  */ 43 @GwtCompatible 44 @ElementTypesAreNonnullByDefault 45 abstract class AggregateFuture<InputT extends @Nullable Object, OutputT extends @Nullable Object> 46  extends AggregateFutureState<OutputT> { 47  private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); 48  49  /** 50  * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to 51  * propagate cancellation) and {@link #toString()}. To access the futures' <i>values</i>, {@code 52  * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case 53  * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs. 54  */ 55  /* 56  * In certain circumstances, this field might theoretically not be visible to an afterDone() call 57  * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. 58  */ 59  @CheckForNull private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures; 60  61  private final boolean allMustSucceed; 62  private final boolean collectsValues; 63  64  AggregateFuture( 65  ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures, 66  boolean allMustSucceed, 67  boolean collectsValues) { 68  super(futures.size()); 69  this.futures = checkNotNull(futures); 70  this.allMustSucceed = allMustSucceed; 71  this.collectsValues = collectsValues; 72  } 73  74  @Override 75  protected final void afterDone() { 76  super.afterDone(); 77  78  ImmutableCollection<? extends Future<?>> localFutures = futures; 79  releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures` 80  81  if (isCancelled() & localFutures != null) { 82  boolean wasInterrupted = wasInterrupted(); 83  for (Future<?> future : localFutures) { 84  future.cancel(wasInterrupted); 85  } 86  } 87  /* 88  * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed 89  * again if some outstanding input fails. 90  */ 91  } 92  93  @Override 94  @CheckForNull 95  protected final String pendingToString() { 96  ImmutableCollection<? extends Future<?>> localFutures = futures; 97  if (localFutures != null) { 98  return "futures=" + localFutures; 99  } 100  return super.pendingToString(); 101  } 102  103  /** 104  * Must be called at the end of each subclass's constructor. This method performs the "real" 105  * initialization; we can't put this in the constructor because, in the case where futures are 106  * already complete, we would not initialize the subclass before calling {@link 107  * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed, 108  * we're guaranteed to have properly initialized the subclass. 109  */ 110  final void init() { 111  /* 112  * requireNonNull is safe because this is called from the constructor after `futures` is set but 113  * before releaseResources could be called (because we have not yet set up any of the listeners 114  * that could call it, nor exposed this Future for users to call cancel() on). 115  */ 116  requireNonNull(futures); 117  118  // Corner case: List is empty. 119  if (futures.isEmpty()) { 120  handleAllCompleted(); 121  return; 122  } 123  124  // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll 125  // need to handle RejectedExecutionException 126  127  if (allMustSucceed) { 128  // We need fail fast, so we have to keep track of which future failed so we can propagate 129  // the exception immediately 130  131  // Register a listener on each Future in the list to update the state of this future. 132  // Note that if all the futures on the list are done prior to completing this loop, the last 133  // call to addListener() will callback to setOneValue(), transitively call our cleanup 134  // listener, and set this.futures to null. 135  // This is not actually a problem, since the foreach only needs this.futures to be non-null 136  // at the beginning of the loop. 137  int i = 0; 138  for (final ListenableFuture<? extends InputT> future : futures) { 139  final int index = i++; 140  future.addListener( 141  new Runnable() { 142  @Override 143  public void run() { 144  try { 145  if (future.isCancelled()) { 146  // Clear futures prior to cancelling children. This sets our own state but lets 147  // the input futures keep running, as some of them may be used elsewhere. 148  futures = null; 149  cancel(false); 150  } else { 151  collectValueFromNonCancelledFuture(index, future); 152  } 153  } finally { 154  /* 155  * "null" means: There is no need to access `futures` again during 156  * `processCompleted` because we're reading each value during a call to 157  * handleOneInputDone. 158  */ 159  decrementCountAndMaybeComplete(null); 160  } 161  } 162  }, 163  directExecutor()); 164  } 165  } else { 166  /* 167  * We'll call the user callback or collect the values only when all inputs complete, 168  * regardless of whether some failed. This lets us avoid calling expensive methods like 169  * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it 170  * lets all futures share the same listener. 171  * 172  * We store `localFutures` inside the listener because `this.futures` might be nulled out by 173  * the time the listener runs for the final future -- at which point we need to check all 174  * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't 175  * need access to the futures again, so we can just pass `null`. 176  * 177  * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization. 178  * If we make some other optimizations, this one will no longer be necessary. The optimization 179  * could actually hurt in some cases, as it forces us to keep all inputs in memory until the 180  * final input completes. 181  */ 182  final ImmutableCollection<? extends Future<? extends InputT>> localFutures = 183  collectsValues ? futures : null; 184  Runnable listener = 185  new Runnable() { 186  @Override 187  public void run() { 188  decrementCountAndMaybeComplete(localFutures); 189  } 190  }; 191  for (ListenableFuture<? extends InputT> future : futures) { 192  future.addListener(listener, directExecutor()); 193  } 194  } 195  } 196  197  /** 198  * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the 199  * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the 200  * throwable did not cause this future to fail, and it is the first time we've seen that 201  * particular Throwable. 202  */ 203  private void handleException(Throwable throwable) { 204  checkNotNull(throwable); 205  206  if (allMustSucceed) { 207  // As soon as the first one fails, make that failure the result of the output future. 208  // The results of all other inputs are then ignored (except for logging any failures). 209  boolean completedWithFailure = setException(throwable); 210  if (!completedWithFailure) { 211  // Go up the causal chain to see if we've already seen this cause; if we have, even if 212  // it's wrapped by a different exception, don't log it. 213  boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); 214  if (firstTimeSeeingThisException) { 215  log(throwable); 216  return; 217  } 218  } 219  } 220  221  /* 222  * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call 223  * handleException() at all. 224  */ 225  if (throwable instanceof Error) { 226  /* 227  * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it 228  * returned true? This was intentional (CL 46470009), but it seems odd compared to how we 229  * normally handle Error. 230  * 231  * Similarly, do we really want to log the same Error more than once? 232  */ 233  log(throwable); 234  } 235  } 236  237  private static void log(Throwable throwable) { 238  String message = 239  (throwable instanceof Error) 240  ? "Input Future failed with Error" 241  : "Got more than one input Future failure. Logging failures after the first"; 242  logger.log(SEVERE, message, throwable); 243  } 244  245  @Override 246  final void addInitialException(Set<Throwable> seen) { 247  checkNotNull(seen); 248  if (!isCancelled()) { 249  /* 250  * requireNonNull is safe because this is a TrustedFuture, and we're calling this method only 251  * if it has failed. 252  * 253  * TODO(cpovirk): Think about whether we could/should use Verify to check the return value of 254  * addCausalChain. 255  */ 256  boolean unused = addCausalChain(seen, requireNonNull(tryInternalFastPathGetFailure())); 257  } 258  } 259  260  /** 261  * Collects the result (success or failure) of one input future. The input must not have been 262  * cancelled. For details on when this is called, see {@link #collectOneValue}. 263  */ 264  private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) { 265  try { 266  // We get the result, even if collectOneValue is a no-op, so that we can fail fast. 267  collectOneValue(index, getDone(future)); 268  } catch (ExecutionException e) { 269  handleException(e.getCause()); 270  } catch (Throwable t) { 271  handleException(t); 272  } 273  } 274  275  private void decrementCountAndMaybeComplete( 276  @CheckForNull 277  ImmutableCollection<? extends Future<? extends InputT>> 278  futuresIfNeedToCollectAtCompletion) { 279  int newRemaining = decrementRemainingAndGet(); 280  checkState(newRemaining >= 0, "Less than 0 remaining futures"); 281  if (newRemaining == 0) { 282  processCompleted(futuresIfNeedToCollectAtCompletion); 283  } 284  } 285  286  private void processCompleted( 287  @CheckForNull 288  ImmutableCollection<? extends Future<? extends InputT>> 289  futuresIfNeedToCollectAtCompletion) { 290  if (futuresIfNeedToCollectAtCompletion != null) { 291  int i = 0; 292  for (Future<? extends InputT> future : futuresIfNeedToCollectAtCompletion) { 293  if (!future.isCancelled()) { 294  collectValueFromNonCancelledFuture(i, future); 295  } 296  i++; 297  } 298  } 299  clearSeenExceptions(); 300  handleAllCompleted(); 301  /* 302  * Null out fields, including some used in handleAllCompleted() above (like 303  * `CollectionFuture.values`). This might be a no-op: If this future completed during 304  * handleAllCompleted(), they will already have been nulled out. But in the case of 305  * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in 306  * the case of callAsync(), which waits for the callback's returned future to complete. 307  */ 308  releaseResources(ALL_INPUT_FUTURES_PROCESSED); 309  } 310  311  /** 312  * Clears fields that are no longer needed after this future has completed -- or at least all its 313  * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called). 314  * Often called multiple times (that is, both when the inputs complete and when the output 315  * completes). 316  * 317  * <p>This is similar to our proposed {@code afterCommit} method but not quite the same. See the 318  * description of CL 265462958. 319  */ 320  // TODO(user): Write more tests for memory retention. 321  @ForOverride 322  @OverridingMethodsMustInvokeSuper 323  void releaseResources(ReleaseResourcesReason reason) { 324  checkNotNull(reason); 325  /* 326  * All elements of `futures` are completed, or this future has already completed and read 327  * `futures` into a local variable (in preparation for propagating cancellation to them). In 328  * either case, no one needs to read `futures` for cancellation purposes later. (And 329  * cancellation purposes are the main reason to access `futures`, as discussed in its docs.) 330  */ 331  this.futures = null; 332  } 333  334  enum ReleaseResourcesReason { 335  OUTPUT_FUTURE_DONE, 336  ALL_INPUT_FUTURES_PROCESSED, 337  } 338  339  /** 340  * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code 341  * collectsValues} is true, called for each future when all futures complete. 342  */ 343  abstract void collectOneValue(int index, @ParametricNullness InputT returnValue); 344  345  abstract void handleAllCompleted(); 346  347  /** Adds the chain to the seen set, and returns whether all the chain was new to us. */ 348  private static boolean addCausalChain(Set<Throwable> seen, Throwable param) { 349  // Declare a "true" local variable so that the Checker Framework will infer nullness. 350  Throwable t = param; 351  352  for (; t != null; t = t.getCause()) { 353  boolean firstTimeSeen = seen.add(t); 354  if (!firstTimeSeen) { 355  /* 356  * We've seen this, so we've seen its causes, too. No need to re-add them. (There's one case 357  * where this isn't true, but we ignore it: If we record an exception, then someone calls 358  * initCause() on it, and then we examine it again, we'll conclude that we've seen the whole 359  * chain before when it fact we haven't. But this should be rare.) 360  */ 361  return false; 362  } 363  } 364  return true; 365  } 366 }