Coverage Summary for Class: AggregateFuture (com.google.common.util.concurrent)
| Class | Method, % | Line, % |
|---|---|---|
| AggregateFuture | 0% (0/14) | 0% (0/78) |
| AggregateFuture$1 | 0% (0/2) | 0% (0/7) |
| AggregateFuture$2 | 0% (0/2) | 0% (0/2) |
| AggregateFuture$ReleaseResourcesReason | 0% (0/1) | 0% (0/3) |
| Total | 0% (0/19) | 0% (0/90) |
/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.SEVERE;

import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A future that computes its own result from a collection of input futures.
 *
 * @param <InputT> the value type produced by each input future
 * @param <OutputT> the value type produced by this (the output) future
 */
@GwtCompatible
@ElementTypesAreNonnullByDefault
abstract class AggregateFuture<InputT extends @Nullable Object, OutputT extends @Nullable Object>
    extends AggregateFutureState<OutputT> {
  private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName());

  /**
   * The inputs. Once {@link #init} has run, the only readers of this field are {@link
   * #afterDone()} (so that cancellation can be propagated) and {@link #toString()}. The inputs'
   * <i>values</i> are reached differently: {@code AggregateFuture} registers listeners that keep
   * their own references to one or more inputs, and for {@link CombinedFuture} the user-supplied
   * callback usually holds references to the inputs as well.
   */
  /*
   * Under some circumstances, an afterDone() call triggered by cancel() might theoretically not see
   * the value written to this field. The comments on the fields of TimeoutFuture have the details.
   */
  @CheckForNull private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;

  private final boolean allMustSucceed;
  private final boolean collectsValues;

  AggregateFuture(
      ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
      boolean allMustSucceed,
      boolean collectsValues) {
    super(futures.size());
    this.futures = checkNotNull(futures);
    this.allMustSucceed = allMustSucceed;
    this.collectsValues = collectsValues;
  }

  @Override
  protected final void afterDone() {
    super.afterDone();

    // Take the snapshot first: releaseResources() sets the `futures` field to null.
    ImmutableCollection<? extends Future<?>> inputsSnapshot = futures;
    releaseResources(OUTPUT_FUTURE_DONE);

    boolean cancelled = isCancelled();
    if (cancelled && inputsSnapshot != null) {
      boolean mayInterrupt = wasInterrupted();
      for (Future<?> input : inputsSnapshot) {
        input.cancel(mayInterrupt);
      }
    }
    /*
     * clearSeenExceptions() is deliberately left to processCompleted(): until then, the set of
     * seen exceptions may be needed again if an input that is still outstanding fails.
     */
  }

  @Override
  @CheckForNull
  protected final String pendingToString() {
    ImmutableCollection<? extends Future<?>> pendingInputs = futures;
    return (pendingInputs == null) ? super.pendingToString() : "futures=" + pendingInputs;
  }

  /**
   * Performs the "real" initialization; every subclass must call this as the last step of its
   * constructor. The work cannot live in this class's constructor: if the inputs were already
   * complete, {@link #collectValueFromNonCancelledFuture} would run before the subclass had been
   * initialized. Calling this after the subclass is constructed guarantees that the subclass is
   * properly initialized.
   */
  final void init() {
    /*
     * requireNonNull is safe: we are called from the constructor, after `futures` was assigned and
     * before anything could call releaseResources (no listener that could call it has been set up
     * yet, and this Future has not been exposed for users to call cancel() on).
     */
    ImmutableCollection<? extends ListenableFuture<? extends InputT>> inputs =
        requireNonNull(futures);

    // Corner case: there are no inputs at all.
    if (inputs.isEmpty()) {
      handleAllCompleted();
      return;
    }

    // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll
    // need to handle RejectedExecutionException

    if (allMustSucceed) {
      // Failing fast requires a dedicated listener per input, so that a failure is propagated the
      // moment it happens and each value is stored under the index of the input it came from.
      //
      // If every input is already done before this loop finishes, the final addListener() call
      // runs its listener inline, which transitively runs our cleanup and nulls out this.futures.
      // That is harmless: the collection being iterated was obtained before the loop started.
      int nextIndex = 0;
      for (final ListenableFuture<? extends InputT> input : inputs) {
        final int position = nextIndex++;
        input.addListener(
            new Runnable() {
              @Override
              public void run() {
                try {
                  if (!input.isCancelled()) {
                    collectValueFromNonCancelledFuture(position, input);
                  } else {
                    // Drop our reference to the inputs *before* cancelling ourselves. That way
                    // our own state becomes cancelled, but the inputs keep running -- some of
                    // them may be in use elsewhere.
                    futures = null;
                    cancel(false);
                  }
                } finally {
                  /*
                   * Passing null says that processCompleted has no need to visit the inputs
                   * again: each input's value is read right here, as that input completes.
                   */
                  decrementCountAndMaybeComplete(null);
                }
              }
            },
            directExecutor());
      }
    } else {
      /*
       * Here the user callback runs, or the values are collected, only once all inputs have
       * completed, whether or not some of them failed. This avoids expensive calls like
       * Future.get() when they are not needed (specifically, for whenAllComplete().call*()), and
       * it allows a single listener instance to be shared by every input.
       *
       * The listener captures its own reference to the inputs because `this.futures` might already
       * be null when the listener runs for the final input -- which is exactly when all inputs
       * must be checked for exceptions *if* we are collecting values. If we are not collecting
       * values, the listener never needs the inputs again, so it captures `null` instead.
       *
       * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization.
       * If we make some other optimizations, this one will no longer be necessary. The optimization
       * could actually hurt in some cases, as it forces us to keep all inputs in memory until the
       * final input completes.
       */
      final ImmutableCollection<? extends Future<? extends InputT>> inputsForCompletion =
          collectsValues ? inputs : null;
      Runnable sharedListener =
          new Runnable() {
            @Override
            public void run() {
              decrementCountAndMaybeComplete(inputsForCompletion);
            }
          };
      for (ListenableFuture<? extends InputT> input : inputs) {
        input.addListener(sharedListener, directExecutor());
      }
    }
  }

  /**
   * When {@link #allMustSucceed} is true, attempts to fail this future with the given Throwable.
   * The throwable is additionally logged in either of two situations: it is an {@link Error}; or
   * {@link #allMustSucceed} is {@code true}, this throwable was not the one that failed this
   * future, and this particular Throwable has not been seen before.
   */
  private void handleException(Throwable throwable) {
    checkNotNull(throwable);

    if (allMustSucceed) {
      // The first failure becomes the result of the output future. Results of all other inputs
      // are ignored from then on, apart from logging their failures.
      boolean becameResult = setException(throwable);
      // Walk the causal chain only when this throwable did not become the result. If any part of
      // the chain was seen earlier -- even wrapped in a different exception -- it is not logged.
      boolean newlySeen =
          !becameResult && addCausalChain(getOrInitSeenExceptions(), throwable);
      if (newlySeen) {
        log(throwable);
        return;
      }
    }

    /*
     * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call
     * handleException() at all.
     */
    if (throwable instanceof Error) {
      /*
       * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it
       * returned true? This was intentional (CL 46470009), but it seems odd compared to how we
       * normally handle Error.
       *
       * Similarly, do we really want to log the same Error more than once?
       */
      log(throwable);
    }
  }

  /** Logs {@code throwable} at SEVERE, with a message that depends on whether it is an Error. */
  private static void log(Throwable throwable) {
    String message;
    if (throwable instanceof Error) {
      message = "Input Future failed with Error";
    } else {
      message = "Got more than one input Future failure. Logging failures after the first";
    }
    logger.log(SEVERE, message, throwable);
  }

  @Override
  final void addInitialException(Set<Throwable> seen) {
    checkNotNull(seen);
    if (isCancelled()) {
      return;
    }
    /*
     * requireNonNull is safe because this is a TrustedFuture, and we're calling this method only
     * if it has failed.
     *
     * TODO(cpovirk): Think about whether we could/should use Verify to check the return value of
     * addCausalChain.
     */
    boolean unused = addCausalChain(seen, requireNonNull(tryInternalFastPathGetFailure()));
  }

  /**
   * Reads the outcome (value or failure) of a single input future, which must not have been
   * cancelled. See {@link #collectOneValue} for when this is invoked.
   */
  private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) {
    try {
      // The result is fetched even when collectOneValue does nothing with it: fetching is what
      // surfaces a failure, which is what allows failing fast.
      collectOneValue(index, getDone(future));
    } catch (ExecutionException wrapped) {
      handleException(wrapped.getCause());
    } catch (Throwable other) {
      handleException(other);
    }
  }

  /**
   * Records that one more input has been processed and, if that was the last one, runs {@link
   * #processCompleted} with the given (possibly null) collection.
   */
  private void decrementCountAndMaybeComplete(
      @CheckForNull
          ImmutableCollection<? extends Future<? extends InputT>>
              futuresIfNeedToCollectAtCompletion) {
    int stillPending = decrementRemainingAndGet();
    checkState(stillPending >= 0, "Less than 0 remaining futures");
    if (stillPending != 0) {
      return;
    }
    processCompleted(futuresIfNeedToCollectAtCompletion);
  }

  /**
   * Runs once all inputs have been processed: collects values from the given inputs (when the
   * collection is non-null), clears the seen exceptions, calls {@link #handleAllCompleted()} and
   * then releases resources.
   */
  private void processCompleted(
      @CheckForNull
          ImmutableCollection<? extends Future<? extends InputT>>
              futuresIfNeedToCollectAtCompletion) {
    if (futuresIfNeedToCollectAtCompletion != null) {
      int nextIndex = 0;
      for (Future<? extends InputT> input : futuresIfNeedToCollectAtCompletion) {
        // The index advances for every input, including the cancelled ones that are skipped.
        int position = nextIndex++;
        if (!input.isCancelled()) {
          collectValueFromNonCancelledFuture(position, input);
        }
      }
    }
    clearSeenExceptions();
    handleAllCompleted();
    /*
     * Null out fields, among them some that handleAllCompleted() above just used (such as
     * `CollectionFuture.values`). This may do nothing: if this future completed during
     * handleAllCompleted(), the fields have been nulled out already. For whenAll*().call*(),
     * though, this future may still be pending until the callback runs -- or even longer with
     * callAsync(), which waits for the future returned by the callback to complete.
     */
    releaseResources(ALL_INPUT_FUTURES_PROCESSED);
  }

  /**
   * Clears the fields that are not needed any more once this future has completed -- or at least
   * once all of its inputs have completed (more precisely, once {@link #handleAllCompleted()} has
   * been called). This is often called more than once (that is, both when the inputs complete and
   * when the output completes).
   *
   * <p>This is similar to our proposed {@code afterCommit} method but not quite the same. See the
   * description of CL 265462958.
   */
  // TODO(user): Write more tests for memory retention.
  @ForOverride
  @OverridingMethodsMustInvokeSuper
  void releaseResources(ReleaseResourcesReason reason) {
    checkNotNull(reason);
    /*
     * Either every element of `futures` has completed, or this future has already completed and
     * copied `futures` into a local variable (in preparation for propagating cancellation to
     * them). Both ways, nobody needs to read `futures` for cancellation purposes afterwards. (And
     * cancellation is the main reason to access `futures`, as discussed in the field's docs.)
     */
    this.futures = null;
  }

  /** The reason for which {@link #releaseResources} is being called. */
  enum ReleaseResourcesReason {
    OUTPUT_FUTURE_DONE,
    ALL_INPUT_FUTURES_PROCESSED,
  }

  /**
   * Called as each future completes if {@code allMustSucceed} is true; otherwise, called for each
   * future once all futures have completed, provided that {@code collectsValues} is true.
   */
  abstract void collectOneValue(int index, @ParametricNullness InputT returnValue);

  abstract void handleAllCompleted();

  /**
   * Adds the throwable and its chain of causes to the seen set, and returns whether the entire
   * chain was new to us.
   */
  private static boolean addCausalChain(Set<Throwable> seen, Throwable param) {
    // Declare a "true" local variable so that the Checker Framework will infer nullness.
    Throwable current = param;

    while (current != null) {
      if (!seen.add(current)) {
        /*
         * This one was seen before, so its causes were, too, and there is no need to add them
         * again. (There is one case where that is not true, which we ignore: if we record an
         * exception, then someone calls initCause() on it, and then we examine it again, we will
         * conclude that the whole chain was seen before when in fact it was not. But this should
         * be rare.)
         */
        return false;
      }
      current = current.getCause();
    }
    return true;
  }
}