Coverage Summary for Class: ClosingFuture (com.google.common.util.concurrent)

Class Method, % Line, %
ClosingFuture 0% (0/38) 0% (0/100)
ClosingFuture$1 0% (0/2) 0% (0/2)
ClosingFuture$10 0% (0/2) 0% (0/2)
ClosingFuture$11 0% (0/2) 0% (0/5)
ClosingFuture$12 0% (0/1) 0% (0/1)
ClosingFuture$2 0% (0/3) 0% (0/3)
ClosingFuture$3 0% (0/3) 0% (0/8)
ClosingFuture$4 0% (0/3) 0% (0/3)
ClosingFuture$5 0% (0/3) 0% (0/3)
ClosingFuture$6 0% (0/2) 0% (0/2)
ClosingFuture$7 0% (0/3) 0% (0/3)
ClosingFuture$8 0% (0/3) 0% (0/3)
ClosingFuture$9 0% (0/2) 0% (0/4)
ClosingFuture$CloseableList 0% (0/7) 0% (0/40)
ClosingFuture$Combiner 0% (0/7) 0% (0/22)
ClosingFuture$Combiner$1 0% (0/3) 0% (0/3)
ClosingFuture$Combiner$2 0% (0/3) 0% (0/3)
ClosingFuture$Combiner$3 0% (0/2) 0% (0/2)
ClosingFuture$Combiner2 0% (0/4) 0% (0/6)
ClosingFuture$Combiner2$1 0% (0/3) 0% (0/3)
ClosingFuture$Combiner2$2 0% (0/3) 0% (0/3)
ClosingFuture$Combiner3 0% (0/4) 0% (0/7)
ClosingFuture$Combiner3$1 0% (0/3) 0% (0/6)
ClosingFuture$Combiner3$2 0% (0/3) 0% (0/6)
ClosingFuture$Combiner4 0% (0/4) 0% (0/8)
ClosingFuture$Combiner4$1 0% (0/3) 0% (0/7)
ClosingFuture$Combiner4$2 0% (0/3) 0% (0/7)
ClosingFuture$Combiner5 0% (0/4) 0% (0/9)
ClosingFuture$Combiner5$1 0% (0/3) 0% (0/8)
ClosingFuture$Combiner5$2 0% (0/3) 0% (0/8)
ClosingFuture$DeferredCloser 0% (0/2) 0% (0/6)
ClosingFuture$Peeker 0% (0/5) 0% (0/20)
ClosingFuture$State 0% (0/1) 0% (0/7)
ClosingFuture$ValueAndCloser 0% (0/3) 0% (0/4)
Total 0% (0/140) 0% (0/324)


1 /* 2  * Copyright (C) 2017 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); 5  * you may not use this file except in compliance with the License. 6  * You may obtain a copy of the License at 7  * 8  * http://www.apache.org/licenses/LICENSE-2.0 9  * 10  * Unless required by applicable law or agreed to in writing, software 11  * distributed under the License is distributed on an "AS IS" BASIS, 12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13  * See the License for the specific language governing permissions and 14  * limitations under the License. 15  */ 16  17 package com.google.common.util.concurrent; 18  19 import static com.google.common.base.Functions.constant; 20 import static com.google.common.base.MoreObjects.toStringHelper; 21 import static com.google.common.base.Preconditions.checkArgument; 22 import static com.google.common.base.Preconditions.checkNotNull; 23 import static com.google.common.base.Preconditions.checkState; 24 import static com.google.common.collect.Lists.asList; 25 import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 26 import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 27 import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 28 import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 29 import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 30 import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 31 import static com.google.common.util.concurrent.Futures.getDone; 32 import static com.google.common.util.concurrent.Futures.immediateFuture; 33 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 34 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 35 import static java.util.logging.Level.FINER; 36 import static java.util.logging.Level.SEVERE; 37 import static java.util.logging.Level.WARNING; 38  39 import com.google.common.annotations.Beta; 40 import com.google.common.annotations.VisibleForTesting; 41 import com.google.common.base.Function; 42 import com.google.common.collect.FluentIterable; 43 import com.google.common.collect.ImmutableList; 44 import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 45 import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 46 import com.google.common.util.concurrent.Futures.FutureCombiner; 47 import com.google.errorprone.annotations.CanIgnoreReturnValue; 48 import com.google.errorprone.annotations.DoNotMock; 49 import com.google.j2objc.annotations.RetainedWith; 50 import java.io.Closeable; 51 import java.util.IdentityHashMap; 52 import java.util.Map; 53 import java.util.concurrent.Callable; 54 import java.util.concurrent.CancellationException; 55 import java.util.concurrent.CountDownLatch; 56 import java.util.concurrent.ExecutionException; 57 import java.util.concurrent.Executor; 58 import java.util.concurrent.Future; 59 import java.util.concurrent.RejectedExecutionException; 60 import java.util.concurrent.atomic.AtomicReference; 61 import java.util.logging.Logger; 62 import org.checkerframework.checker.nullness.qual.Nullable; 63  64 /** 65  * A step in a pipeline of an asynchronous computation. When the last step in the computation is 66  * complete, some objects captured during the computation are closed. 67  * 68  * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 69  * asynchronously-computed intermediate value, or else an exception that indicates the failure or 70  * cancellation of the operation so far. The only way to extract the value or exception from a step 71  * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 72  * "value" of a successful step or the "result" (value or exception) of any step. 73  * 74  * <ol> 75  * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 76  * block or a {@link ListenableFuture}. 77  * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 78  * can be captured for later closing. 79  * <li>There is one last step (the root of the tree), from which you can extract the final result 80  * of the computation. After that result is available (or the computation fails), all objects 81  * captured by any of the steps in the pipeline are closed. 82  * </ol> 83  * 84  * <h3>Starting a pipeline</h3> 85  * 86  * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 87  * callable block} that may capture objects for later closing. To start a pipeline from a {@link 88  * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 89  * #from(ListenableFuture)} instead. 90  * 91  * <h3>Derived steps</h3> 92  * 93  * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 94  * ways similar to {@link FluentFuture}s: 95  * 96  * <ul> 97  * <li>by transforming the value from a successful input step, 98  * <li>by catching the exception from a failed input step, or 99  * <li>by combining the results of several input steps. 100  * </ul> 101  * 102  * Each derivation can capture the next value or any intermediate objects for later closing. 103  * 104  * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 105  * exception, or combine it with others, you cannot do anything else with it, including declare it 106  * to be the last step of the pipeline. 107  * 108  * <h4>Transforming</h4> 109  * 110  * To derive the next step by asynchronously applying a function to an input step's value, call 111  * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 112  * Executor)} on the input step. 113  * 114  * <h4>Catching</h4> 115  * 116  * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 117  * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 118  * 119  * <h4>Combining</h4> 120  * 121  * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 122  * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 123  * 124  * <h3>Cancelling</h3> 125  * 126  * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 127  * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 128  * successfully cancelled step will immediately start closing all objects captured for later closing 129  * by it and by its input steps. 130  * 131  * <h3>Ending a pipeline</h3> 132  * 133  * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 134  * close the captured objects automatically or manually. 135  * 136  * <h4>Automatically closing</h4> 137  * 138  * You can extract a {@link Future} that represents the result of the last step in the pipeline by 139  * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin 140  * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future 141  * completes before closing starts, rather than once it has finished. 142  * 143  * <pre>{@code 144  * FluentFuture<UserName> userName = 145  * ClosingFuture.submit( 146  * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 147  * executor) 148  * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 149  * .transform((closer, result) -> result.get("userName"), directExecutor()) 150  * .catching(DBException.class, e -> "no user", directExecutor()) 151  * .finishToFuture(); 152  * }</pre> 153  * 154  * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 155  * result cursor will both be closed, even if the operation is cancelled or fails. 156  * 157  * <h4>Manually closing</h4> 158  * 159  * If you want to close the captured objects manually, after you've used the final result, call 160  * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 161  * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 162  * 163  * <pre>{@code 164  * ClosingFuture.submit( 165  * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 166  * executor) 167  * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 168  * .transform((closer, result) -> result.get("userName"), directExecutor()) 169  * .catching(DBException.class, e -> "no user", directExecutor()) 170  * .finishToValueAndCloser( 171  * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 172  * 173  * // later 174  * try { // get() will throw if the operation failed or was cancelled. 175  * UserName userName = userNameValueAndCloser.get(); 176  * // do something with userName 177  * } finally { 178  * userNameValueAndCloser.closeAsync(); 179  * } 180  * }</pre> 181  * 182  * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183  * the query result cursor will both be closed, even if the operation is cancelled or fails. 184  * 185  * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186  * automatic-closing approach described above is safer. 187  * 188  * @param <V> the type of the value of this step 189  * @since 30.0 190  */ 191 // TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192 @Beta // @Beta for one release. 193 @DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 194 // TODO(dpb): GWT compatibility. 195 public final class ClosingFuture<V> { 196  197  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 198  199  /** 200  * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 201  * done. 202  */ 203  public static final class DeferredCloser { 204  @RetainedWith private final CloseableList list; 205  206  DeferredCloser(CloseableList list) { 207  this.list = list; 208  } 209  210  /** 211  * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 212  * 213  * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 214  * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 215  * Closeable}. (For more about the flavors, see <a 216  * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 217  * build</a>.) 218  * 219  * <p>Be careful when targeting an older SDK than you are building against (most commonly when 220  * building for Android): Ensure that any object you pass implements the interface not just in 221  * your current SDK version but also at the oldest version you support. For example, <a 222  * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 223  * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 224  * {@code Closeable} with a method reference like {@code cursor::close}. 225  * 226  * <p>Note that this method is still binary-compatible between flavors because the erasure of 227  * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 228  * 229  * @param closeable the object to be closed (see notes above) 230  * @param closingExecutor the object will be closed on this executor 231  * @return the first argument 232  */ 233  @CanIgnoreReturnValue 234  public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose( 235  C closeable, Executor closingExecutor) { 236  checkNotNull(closingExecutor); 237  if (closeable != null) { 238  list.add(closeable, closingExecutor); 239  } 240  return closeable; 241  } 242  } 243  244  /** 245  * An operation that computes a result. 246  * 247  * @param <V> the type of the result 248  */ 249  @FunctionalInterface 250  public interface ClosingCallable<V extends @Nullable Object> { 251  /** 252  * Computes a result, or throws an exception if unable to do so. 253  * 254  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 255  * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 256  * not before this method completes), even if this method throws or the pipeline is cancelled. 257  */ 258  V call(DeferredCloser closer) throws Exception; 259  } 260  261  /** 262  * An operation that computes a {@link ClosingFuture} of a result. 263  * 264  * @param <V> the type of the result 265  * @since 30.1 266  */ 267  @FunctionalInterface 268  public interface AsyncClosingCallable<V extends @Nullable Object> { 269  /** 270  * Computes a result, or throws an exception if unable to do so. 271  * 272  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 273  * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 274  * not before this method completes), even if this method throws or the pipeline is cancelled. 275  */ 276  ClosingFuture<V> call(DeferredCloser closer) throws Exception; 277  } 278  279  /** 280  * A function from an input to a result. 281  * 282  * @param <T> the type of the input to the function 283  * @param <U> the type of the result of the function 284  */ 285  @FunctionalInterface 286  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 287  288  /** 289  * Applies this function to an input, or throws an exception if unable to do so. 290  * 291  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 292  * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 293  * not before this method completes), even if this method throws or the pipeline is cancelled. 294  */ 295  U apply(DeferredCloser closer, T input) throws Exception; 296  } 297  298  /** 299  * A function from an input to a {@link ClosingFuture} of a result. 300  * 301  * @param <T> the type of the input to the function 302  * @param <U> the type of the result of the function 303  */ 304  @FunctionalInterface 305  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 306  /** 307  * Applies this function to an input, or throws an exception if unable to do so. 308  * 309  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 310  * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 311  * not before this method completes), even if this method throws or the pipeline is cancelled. 312  */ 313  ClosingFuture<U> apply(DeferredCloser closer, T input) throws Exception; 314  } 315  316  /** 317  * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 318  * allows the user to close all the closeable objects that were captured during it for later 319  * closing. 320  * 321  * <p>The asynchronous operation will have completed before this object is created. 322  * 323  * @param <V> the type of the value of a successful operation 324  * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 325  */ 326  public static final class ValueAndCloser<V> { 327  328  private final ClosingFuture<? extends V> closingFuture; 329  330  ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 331  this.closingFuture = checkNotNull(closingFuture); 332  } 333  334  /** 335  * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 336  * {@link Future#get()} would. 337  * 338  * <p>Because the asynchronous operation has already completed, this method is synchronous and 339  * returns immediately. 340  * 341  * @throws CancellationException if the computation was cancelled 342  * @throws ExecutionException if the computation threw an exception 343  */ 344  @Nullable 345  public V get() throws ExecutionException { 346  return getDone(closingFuture.future); 347  } 348  349  /** 350  * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 351  * operation on the {@link Executor}s specified by calls to {@link 352  * DeferredCloser#eventuallyClose(Closeable, Executor)}. 353  * 354  * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 355  * closed synchronously. 356  * 357  * <p>Idempotent: objects will be closed at most once. 358  */ 359  public void closeAsync() { 360  closingFuture.close(); 361  } 362  } 363  364  /** 365  * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 366  * ClosingFuture} pipeline. 367  * 368  * @param <V> the type of the final value of a successful pipeline 369  * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 370  */ 371  @FunctionalInterface 372  public interface ValueAndCloserConsumer<V> { 373  374  /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 375  void accept(ValueAndCloser<V> valueAndCloser); 376  } 377  378  /** 379  * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 380  * 381  * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 382  * execution 383  */ 384  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) { 385  return new ClosingFuture<>(callable, executor); 386  } 387  388  /** 389  * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 390  * 391  * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 392  * execution 393  * @since 30.1 394  */ 395  public static <V> ClosingFuture<V> submitAsync( 396  AsyncClosingCallable<V> callable, Executor executor) { 397  return new ClosingFuture<>(callable, executor); 398  } 399  400  /** 401  * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 402  * 403  * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 404  * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 405  * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 406  */ 407  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) { 408  return new ClosingFuture<V>(future); 409  } 410  411  /** 412  * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 413  * 414  * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when 415  * the pipeline is done, even if the pipeline is canceled or fails. 416  * 417  * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 418  * value in order to close it. 419  * 420  * @param future the future to create the {@code ClosingFuture} from. For discussion of the 421  * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable, 422  * Executor)}. 423  * @param closingExecutor the future's result will be closed on this executor 424  * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 425  * underlying value may never be closed if the {@link Future} is canceled after its operation 426  * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 427  * including those that pass them to this method, with {@link #submit(ClosingCallable, 428  * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 429  * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 430  * ClosingFuture#from}. 431  */ 432  @Deprecated 433  public static <C extends @Nullable Object & @Nullable AutoCloseable> 434  ClosingFuture<C> eventuallyClosing( 435  ListenableFuture<C> future, final Executor closingExecutor) { 436  checkNotNull(closingExecutor); 437  final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 438  Futures.addCallback( 439  future, 440  new FutureCallback<@Nullable AutoCloseable>() { 441  @Override 442  public void onSuccess(@Nullable AutoCloseable result) { 443  closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 444  } 445  446  @Override 447  public void onFailure(Throwable t) {} 448  }, 449  directExecutor()); 450  return closingFuture; 451  } 452  453  /** 454  * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 455  * 456  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 457  * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 458  */ 459  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 460  return new Combiner(false, futures); 461  } 462  463  /** 464  * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 465  * 466  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 467  * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 468  */ 469  public static Combiner whenAllComplete( 470  ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 471  return whenAllComplete(asList(future1, moreFutures)); 472  } 473  474  /** 475  * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 476  * all succeed. If any fail, the resulting pipeline will fail. 477  * 478  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 479  * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 480  */ 481  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 482  return new Combiner(true, futures); 483  } 484  485  /** 486  * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 487  * they all succeed. If any fail, the resulting pipeline will fail. 488  * 489  * <p>Calling this method allows you to use lambdas or method references typed with the types of 490  * the input {@link ClosingFuture}s. 491  * 492  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 493  * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 494  */ 495  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed( 496  ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 497  return new Combiner2<>(future1, future2); 498  } 499  500  /** 501  * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 502  * they all succeed. If any fail, the resulting pipeline will fail. 503  * 504  * <p>Calling this method allows you to use lambdas or method references typed with the types of 505  * the input {@link ClosingFuture}s. 506  * 507  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 508  * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 509  */ 510  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed( 511  ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 512  return new Combiner3<>(future1, future2, future3); 513  } 514  515  /** 516  * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 517  * they all succeed. If any fail, the resulting pipeline will fail. 518  * 519  * <p>Calling this method allows you to use lambdas or method references typed with the types of 520  * the input {@link ClosingFuture}s. 521  * 522  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 523  * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 524  */ 525  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed( 526  ClosingFuture<V1> future1, 527  ClosingFuture<V2> future2, 528  ClosingFuture<V3> future3, 529  ClosingFuture<V4> future4) { 530  return new Combiner4<>(future1, future2, future3, future4); 531  } 532  533  /** 534  * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 535  * they all succeed. If any fail, the resulting pipeline will fail. 536  * 537  * <p>Calling this method allows you to use lambdas or method references typed with the types of 538  * the input {@link ClosingFuture}s. 539  * 540  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 541  * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 542  */ 543  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 544  ClosingFuture<V1> future1, 545  ClosingFuture<V2> future2, 546  ClosingFuture<V3> future3, 547  ClosingFuture<V4> future4, 548  ClosingFuture<V5> future5) { 549  return new Combiner5<>(future1, future2, future3, future4, future5); 550  } 551  552  /** 553  * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 554  * all succeed. If any fail, the resulting pipeline will fail. 555  * 556  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 557  * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 558  */ 559  public static Combiner whenAllSucceed( 560  ClosingFuture<?> future1, 561  ClosingFuture<?> future2, 562  ClosingFuture<?> future3, 563  ClosingFuture<?> future4, 564  ClosingFuture<?> future5, 565  ClosingFuture<?> future6, 566  ClosingFuture<?>... moreFutures) { 567  return whenAllSucceed( 568  FluentIterable.of(future1, future2, future3, future4, future5, future6) 569  .append(moreFutures)); 570  } 571  572  private final AtomicReference<State> state = new AtomicReference<>(OPEN); 573  private final CloseableList closeables = new CloseableList(); 574  private final FluentFuture<V> future; 575  576  private ClosingFuture(ListenableFuture<V> future) { 577  this.future = FluentFuture.from(future); 578  } 579  580  private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 581  checkNotNull(callable); 582  TrustedListenableFutureTask<V> task = 583  TrustedListenableFutureTask.create( 584  new Callable<V>() { 585  @Override 586  public V call() throws Exception { 587  return callable.call(closeables.closer); 588  } 589  590  @Override 591  public String toString() { 592  return callable.toString(); 593  } 594  }); 595  executor.execute(task); 596  this.future = task; 597  } 598  599  private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 600  checkNotNull(callable); 601  TrustedListenableFutureTask<V> task = 602  TrustedListenableFutureTask.create( 603  new AsyncCallable<V>() { 604  @Override 605  public ListenableFuture<V> call() throws Exception { 606  CloseableList newCloseables = new CloseableList(); 607  try { 608  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 609  closingFuture.becomeSubsumedInto(closeables); 610  return closingFuture.future; 611  } finally { 612  closeables.add(newCloseables, directExecutor()); 613  } 614  } 615  616  @Override 617  public String toString() { 618  return callable.toString(); 619  } 620  }); 621  executor.execute(task); 622  this.future = task; 623  } 624  625  /** 626  * Returns a future that finishes when this step does. Calling {@code get()} on the returned 627  * future returns {@code null} if the step is successful or throws the same exception that would 628  * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 629  * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 630  * 631  * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 632  * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 633  * a derivation method <i>on the same instance</i>. This is important because calling {@code 634  * statusFuture} alone does not provide a way to close the pipeline. 635  */ 636  public ListenableFuture<?> statusFuture() { 637  return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 638  } 639  640  /** 641  * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 642  * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 643  * when the pipeline is done. 644  * 645  * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 646  * ClosingFuture} will be equivalent to this one. 647  * 648  * <p>If the function throws an exception, that exception is used as the result of the derived 649  * {@code ClosingFuture}. 650  * 651  * <p>Example usage: 652  * 653  * <pre>{@code 654  * ClosingFuture<List<Row>> rowsFuture = 655  * queryFuture.transform((closer, result) -> result.getRows(), executor); 656  * }</pre> 657  * 658  * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 659  * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 660  * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 661  * 662  * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 663  * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 664  * this {@code ClosingFuture}. 665  * 666  * @param function transforms the value of this step to the value of the derived step 667  * @param executor executor to run the function in 668  * @return the derived step 669  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 670  * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 671  * finished} 672  */ 673  public <U> ClosingFuture<U> transform( 674  final ClosingFunction<? super V, U> function, Executor executor) { 675  checkNotNull(function); 676  AsyncFunction<V, U> applyFunction = 677  new AsyncFunction<V, U>() { 678  @Override 679  public ListenableFuture<U> apply(V input) throws Exception { 680  return closeables.applyClosingFunction(function, input); 681  } 682  683  @Override 684  public String toString() { 685  return function.toString(); 686  } 687  }; 688  // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 689  return derive(future.transformAsync(applyFunction, executor)); 690  } 691  692  /** 693  * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 694  * that returns a {@code ClosingFuture} to its value. The function can use a {@link 695  * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 696  * captured by the returned {@link ClosingFuture}). 697  * 698  * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 699  * returned by the function. 700  * 701  * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 702  * ClosingFuture} will be equivalent to this one. 703  * 704  * <p>If the function throws an exception, that exception is used as the result of the derived 705  * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 706  * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 707  * closed. 708  * 709  * <p>Usage guidelines for this method: 710  * 711  * <ul> 712  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 713  * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 714  * Executor)} instead, with a function that returns the next value directly. 715  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 716  * for every closeable object this step creates in order to capture it for later closing. 717  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 718  * ClosingFuture} call {@link #from(ListenableFuture)}. 719  * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 720  * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 721  * {@link #withoutCloser(AsyncFunction)} 722  * </ul> 723  * 724  * <p>Example usage: 725  * 726  * <pre>{@code 727  * // Result.getRowsClosingFuture() returns a ClosingFuture. 728  * ClosingFuture<List<Row>> rowsFuture = 729  * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 730  * 731  * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 732  * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 733  * // Closeable). 734  * ClosingFuture<Integer> rowsFuture2 = 735  * queryFuture.transformAsync( 736  * (closer, result) -> { 737  * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 738  * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 739  * }, 740  * executor); 741  * 742  * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 743  * ClosingFuture<List<Row>> rowsFuture3 = 744  * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 745  * 746  * }</pre> 747  * 748  * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 749  * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 750  * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 751  * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 752  * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 753  * responsible for completing the returned {@code ClosingFuture}.) 754  * 755  * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 756  * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 757  * this {@code ClosingFuture}. 758  * 759  * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 760  * the derived step 761  * @param executor executor to run the function in 762  * @return the derived step 763  * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 764  * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 765  * finished} 766  */ 767  public <U> ClosingFuture<U> transformAsync( 768  final AsyncClosingFunction<? super V, U> function, Executor executor) { 769  checkNotNull(function); 770  AsyncFunction<V, U> applyFunction = 771  new AsyncFunction<V, U>() { 772  @Override 773  public ListenableFuture<U> apply(V input) throws Exception { 774  return closeables.applyAsyncClosingFunction(function, input); 775  } 776  777  @Override 778  public String toString() { 779  return function.toString(); 780  } 781  }; 782  return derive(future.transformAsync(applyFunction, executor)); 783  } 784  785  /** 786  * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 787  * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 788  * {@link ListenableFuture}. 789  * 790  * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 791  * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 792  * meets these conditions: 793  * 794  * <ul> 795  * <li>It does not need to capture any {@link Closeable} objects by calling {@link 796  * DeferredCloser#eventuallyClose(Closeable, Executor)}. 797  * <li>It returns a {@link ListenableFuture}. 798  * </ul> 799  * 800  * <p>Example usage: 801  * 802  * <pre>{@code 803  * // Result.getRowsFuture() returns a ListenableFuture. 804  * ClosingFuture<List<Row>> rowsFuture = 805  * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 806  * }</pre> 807  * 808  * @param function transforms the value of a {@code ClosingFuture} step to a {@link 809  * ListenableFuture} with the value of a derived step 810  */ 811  public static <V, U> AsyncClosingFunction<V, U> withoutCloser( 812  final AsyncFunction<V, U> function) { 813  checkNotNull(function); 814  return new AsyncClosingFunction<V, U>() { 815  @Override 816  public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 817  return ClosingFuture.from(function.apply(input)); 818  } 819  }; 820  } 821  822  /** 823  * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 824  * to its exception if it is an instance of a given exception type. The function can use a {@link 825  * DeferredCloser} to capture objects to be closed when the pipeline is done. 826  * 827  * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 828  * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 829  * one. 830  * 831  * <p>If the function throws an exception, that exception is used as the result of the derived 832  * {@code ClosingFuture}. 833  * 834  * <p>Example usage: 835  * 836  * <pre>{@code 837  * ClosingFuture<QueryResult> queryFuture = 838  * queryFuture.catching( 839  * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 840  * }</pre> 841  * 842  * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 843  * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 844  * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 845  * 846  * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 847  * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 848  * this {@code ClosingFuture}. 849  * 850  * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 851  * type is matched against this step's exception. "This step's exception" means the cause of 852  * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 853  * underlying this step or, if {@code get()} throws a different kind of exception, that 854  * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 855  * prefer more specific types, avoiding {@code Throwable.class} in particular. 856  * @param fallback the function to be called if this step fails with the expected exception type. 857  * The function's argument is this step's exception. "This step's exception" means the cause 858  * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 859  * underlying this step or, if {@code get()} throws a different kind of exception, that 860  * exception itself. 861  * @param executor the executor that runs {@code fallback} if the input fails 862  */ 863  public <X extends Throwable> ClosingFuture<V> catching( 864  Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 865  return catchingMoreGeneric(exceptionType, fallback, executor); 866  } 867  868  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 869  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 870  Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 871  checkNotNull(fallback); 872  AsyncFunction<X, W> applyFallback = 873  new AsyncFunction<X, W>() { 874  @Override 875  public ListenableFuture<W> apply(X exception) throws Exception { 876  return closeables.applyClosingFunction(fallback, exception); 877  } 878  879  @Override 880  public String toString() { 881  return fallback.toString(); 882  } 883  }; 884  // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 885  return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 886  } 887  888  /** 889  * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 890  * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 891  * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 892  * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 893  * 894  * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 895  * ClosingFuture} will be equivalent to the one returned by the function. 896  * 897  * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 898  * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 899  * one. 900  * 901  * <p>If the function throws an exception, that exception is used as the result of the derived 902  * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 903  * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 904  * closed. 905  * 906  * <p>Usage guidelines for this method: 907  * 908  * <ul> 909  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 910  * {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class, 911  * ClosingFunction, Executor)} instead, with a function that returns the next value 912  * directly. 913  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 914  * for every closeable object this step creates in order to capture it for later closing. 915  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 916  * ClosingFuture} call {@link #from(ListenableFuture)}. 917  * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 918  * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 919  * {@link #withoutCloser(AsyncFunction)} 920  * </ul> 921  * 922  * <p>Example usage: 923  * 924  * <pre>{@code 925  * // Fall back to a secondary input stream in case of IOException. 926  * ClosingFuture<InputStream> inputFuture = 927  * firstInputFuture.catchingAsync( 928  * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 929  * } 930  * }</pre> 931  * 932  * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 933  * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 934  * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 935  * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 936  * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 937  * responsible for completing the returned {@code ClosingFuture}.) 938  * 939  * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 940  * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 941  * this {@code ClosingFuture}. 942  * 943  * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 944  * type is matched against this step's exception. "This step's exception" means the cause of 945  * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 946  * underlying this step or, if {@code get()} throws a different kind of exception, that 947  * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 948  * prefer more specific types, avoiding {@code Throwable.class} in particular. 949  * @param fallback the function to be called if this step fails with the expected exception type. 950  * The function's argument is this step's exception. "This step's exception" means the cause 951  * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 952  * underlying this step or, if {@code get()} throws a different kind of exception, that 953  * exception itself. 954  * @param executor the executor that runs {@code fallback} if the input fails 955  */ 956  // TODO(dpb): Should this do something special if the function throws CancellationException or 957  // ExecutionException? 958  public <X extends Throwable> ClosingFuture<V> catchingAsync( 959  Class<X> exceptionType, 960  AsyncClosingFunction<? super X, ? extends V> fallback, 961  Executor executor) { 962  return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 963  } 964  965  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 966  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 967  Class<X> exceptionType, 968  final AsyncClosingFunction<? super X, W> fallback, 969  Executor executor) { 970  checkNotNull(fallback); 971  AsyncFunction<X, W> asyncFunction = 972  new AsyncFunction<X, W>() { 973  @Override 974  public ListenableFuture<W> apply(X exception) throws Exception { 975  return closeables.applyAsyncClosingFunction(fallback, exception); 976  } 977  978  @Override 979  public String toString() { 980  return fallback.toString(); 981  } 982  }; 983  return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 984  } 985  986  /** 987  * Marks this step as the last step in the {@code ClosingFuture} pipeline. 988  * 989  * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 990  * the pipeline is cancelled. 991  * 992  * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 993  * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 994  * rather than once it has finished. 995  * 996  * <p>After calling this method, you may not call {@link 997  * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 998  * derivation method on this {@code ClosingFuture}. 999  * 1000  * @return a {@link Future} that represents the final value or exception of the pipeline 1001  */ 1002  public FluentFuture<V> finishToFuture() { 1003  if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1004  logger.log(FINER, "will close {0}", this); 1005  future.addListener( 1006  new Runnable() { 1007  @Override 1008  public void run() { 1009  checkAndUpdateState(WILL_CLOSE, CLOSING); 1010  close(); 1011  checkAndUpdateState(CLOSING, CLOSED); 1012  } 1013  }, 1014  directExecutor()); 1015  } else { 1016  switch (state.get()) { 1017  case SUBSUMED: 1018  throw new IllegalStateException( 1019  "Cannot call finishToFuture() after deriving another step"); 1020  1021  case WILL_CREATE_VALUE_AND_CLOSER: 1022  throw new IllegalStateException( 1023  "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1024  1025  case WILL_CLOSE: 1026  case CLOSING: 1027  case CLOSED: 1028  throw new IllegalStateException("Cannot call finishToFuture() twice"); 1029  1030  case OPEN: 1031  throw new AssertionError(); 1032  } 1033  } 1034  return future; 1035  } 1036  1037  /** 1038  * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1039  * {@code receiver} will be called with an object that contains the result of the operation. The 1040  * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1041  * 1042  * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1043  * any other derivation method on this {@code ClosingFuture}. 1044  * 1045  * @param consumer a callback whose method will be called (using {@code executor}) when this 1046  * operation is done 1047  */ 1048  public void finishToValueAndCloser( 1049  final ValueAndCloserConsumer<? super V> consumer, Executor executor) { 1050  checkNotNull(consumer); 1051  if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1052  switch (state.get()) { 1053  case SUBSUMED: 1054  throw new IllegalStateException( 1055  "Cannot call finishToValueAndCloser() after deriving another step"); 1056  1057  case WILL_CLOSE: 1058  case CLOSING: 1059  case CLOSED: 1060  throw new IllegalStateException( 1061  "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1062  1063  case WILL_CREATE_VALUE_AND_CLOSER: 1064  throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1065  1066  case OPEN: 1067  break; 1068  } 1069  throw new AssertionError(state); 1070  } 1071  future.addListener( 1072  new Runnable() { 1073  @Override 1074  public void run() { 1075  provideValueAndCloser(consumer, ClosingFuture.this); 1076  } 1077  }, 1078  executor); 1079  } 1080  1081  private static <C, V extends C> void provideValueAndCloser( 1082  ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1083  consumer.accept(new ValueAndCloser<C>(closingFuture)); 1084  } 1085  1086  /** 1087  * Attempts to cancel execution of this step. This attempt will fail if the step has already 1088  * completed, has already been cancelled, or could not be cancelled for some other reason. If 1089  * successful, and this step has not started when {@code cancel} is called, this step should never 1090  * run. 1091  * 1092  * <p>If successful, causes the objects captured by this step (if already started) and its input 1093  * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1094  * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 1095  * 1096  * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1097  * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1098  * cancelled regardless 1099  * @return {@code false} if the step could not be cancelled, typically because it has already 1100  * completed normally; {@code true} otherwise 1101  */ 1102  @CanIgnoreReturnValue 1103  public boolean cancel(boolean mayInterruptIfRunning) { 1104  logger.log(FINER, "cancelling {0}", this); 1105  boolean cancelled = future.cancel(mayInterruptIfRunning); 1106  if (cancelled) { 1107  close(); 1108  } 1109  return cancelled; 1110  } 1111  1112  private void close() { 1113  logger.log(FINER, "closing {0}", this); 1114  closeables.close(); 1115  } 1116  1117  private <U> ClosingFuture<U> derive(FluentFuture<U> future) { 1118  ClosingFuture<U> derived = new ClosingFuture<>(future); 1119  becomeSubsumedInto(derived.closeables); 1120  return derived; 1121  } 1122  1123  private void becomeSubsumedInto(CloseableList otherCloseables) { 1124  checkAndUpdateState(OPEN, SUBSUMED); 1125  otherCloseables.add(closeables, directExecutor()); 1126  } 1127  1128  /** 1129  * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1130  * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1131  * 1132  * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1133  */ 1134  public static final class Peeker { 1135  private final ImmutableList<ClosingFuture<?>> futures; 1136  private volatile boolean beingCalled; 1137  1138  private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1139  this.futures = checkNotNull(futures); 1140  } 1141  1142  /** 1143  * Returns the value of {@code closingFuture}. 1144  * 1145  * @throws ExecutionException if {@code closingFuture} is a failed step 1146  * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1147  * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1148  * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1149  * @throws IllegalStateException if called outside of a call to {@link 1150  * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1151  * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1152  */ 1153  public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1154  throws ExecutionException { 1155  checkState(beingCalled); 1156  checkArgument(futures.contains(closingFuture)); 1157  return Futures.getDone(closingFuture.future); 1158  } 1159  1160  private <V extends @Nullable Object> V call( 1161  CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1162  beingCalled = true; 1163  CloseableList newCloseables = new CloseableList(); 1164  try { 1165  return combiner.call(newCloseables.closer, this); 1166  } finally { 1167  closeables.add(newCloseables, directExecutor()); 1168  beingCalled = false; 1169  } 1170  } 1171  1172  private <V extends @Nullable Object> FluentFuture<V> callAsync( 1173  AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1174  beingCalled = true; 1175  CloseableList newCloseables = new CloseableList(); 1176  try { 1177  ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1178  closingFuture.becomeSubsumedInto(closeables); 1179  return closingFuture.future; 1180  } finally { 1181  closeables.add(newCloseables, directExecutor()); 1182  beingCalled = false; 1183  } 1184  } 1185  } 1186  1187  /** 1188  * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 1189  * 1190  * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1191  * instantiate this class. 1192  * 1193  * <p>Example: 1194  * 1195  * <pre>{@code 1196  * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1197  * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1198  * ListenableFuture<Integer> numberOfDifferentLines = 1199  * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1200  * .call( 1201  * (closer, peeker) -> { 1202  * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1203  * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1204  * return countDifferentLines(file1Reader, file2Reader); 1205  * }, 1206  * executor) 1207  * .closing(executor); 1208  * }</pre> 1209  */ 1210  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1211  @com.google.errorprone.annotations.DoNotMock( 1212  "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1213  public static class Combiner { 1214  1215  private final CloseableList closeables = new CloseableList(); 1216  1217  /** 1218  * An operation that returns a result and may throw an exception. 1219  * 1220  * @param <V> the type of the result 1221  */ 1222  @FunctionalInterface 1223  public interface CombiningCallable<V extends @Nullable Object> { 1224  /** 1225  * Computes a result, or throws an exception if unable to do so. 1226  * 1227  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1228  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1229  * is done (but not before this method completes), even if this method throws or the pipeline 1230  * is cancelled. 1231  * 1232  * @param peeker used to get the value of any of the input futures 1233  */ 1234  V call(DeferredCloser closer, Peeker peeker) throws Exception; 1235  } 1236  1237  /** 1238  * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1239  * 1240  * @param <V> the type of the result 1241  */ 1242  @FunctionalInterface 1243  public interface AsyncCombiningCallable<V extends @Nullable Object> { 1244  /** 1245  * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1246  * 1247  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1248  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1249  * is done (but not before this method completes), even if this method throws or the pipeline 1250  * is cancelled. 1251  * 1252  * @param peeker used to get the value of any of the input futures 1253  */ 1254  ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1255  } 1256  1257  private final boolean allMustSucceed; 1258  protected final ImmutableList<ClosingFuture<?>> inputs; 1259  1260  private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1261  this.allMustSucceed = allMustSucceed; 1262  this.inputs = ImmutableList.copyOf(inputs); 1263  for (ClosingFuture<?> input : inputs) { 1264  input.becomeSubsumedInto(closeables); 1265  } 1266  } 1267  1268  /** 1269  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1270  * combining function to their values. The function can use a {@link DeferredCloser} to capture 1271  * objects to be closed when the pipeline is done. 1272  * 1273  * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1274  * fail, so will the returned step. 1275  * 1276  * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1277  * cancelled. 1278  * 1279  * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1280  * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1281  */ 1282  public <V> ClosingFuture<V> call( 1283  final CombiningCallable<V> combiningCallable, Executor executor) { 1284  Callable<V> callable = 1285  new Callable<V>() { 1286  @Override 1287  public V call() throws Exception { 1288  return new Peeker(inputs).call(combiningCallable, closeables); 1289  } 1290  1291  @Override 1292  public String toString() { 1293  return combiningCallable.toString(); 1294  } 1295  }; 1296  ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1297  derived.closeables.add(closeables, directExecutor()); 1298  return derived; 1299  } 1300  1301  /** 1302  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1303  * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1304  * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1305  * captured by the returned {@link ClosingFuture}). 1306  * 1307  * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1308  * fail, so will the returned step. 1309  * 1310  * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1311  * cancelled. 1312  * 1313  * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1314  * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1315  * 1316  * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1317  * derived step. 1318  * 1319  * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1320  * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1321  * 1322  * <p>Usage guidelines for this method: 1323  * 1324  * <ul> 1325  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1326  * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1327  * Executor)} instead, with a function that returns the next value directly. 1328  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1329  * closer.eventuallyClose()} for every closeable object this step creates in order to 1330  * capture it for later closing. 1331  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1332  * ClosingFuture} call {@link #from(ListenableFuture)}. 1333  * </ul> 1334  * 1335  * <p>The same warnings about doing heavyweight operations within {@link 1336  * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1337  */ 1338  public <V> ClosingFuture<V> callAsync( 1339  final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1340  AsyncCallable<V> asyncCallable = 1341  new AsyncCallable<V>() { 1342  @Override 1343  public ListenableFuture<V> call() throws Exception { 1344  return new Peeker(inputs).callAsync(combiningCallable, closeables); 1345  } 1346  1347  @Override 1348  public String toString() { 1349  return combiningCallable.toString(); 1350  } 1351  }; 1352  ClosingFuture<V> derived = 1353  new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1354  derived.closeables.add(closeables, directExecutor()); 1355  return derived; 1356  } 1357  1358  private FutureCombiner<Object> futureCombiner() { 1359  return allMustSucceed 1360  ? Futures.whenAllSucceed(inputFutures()) 1361  : Futures.whenAllComplete(inputFutures()); 1362  } 1363  1364  private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE = 1365  new Function<ClosingFuture<?>, FluentFuture<?>>() { 1366  @Override 1367  public FluentFuture<?> apply(ClosingFuture<?> future) { 1368  return future.future; 1369  } 1370  }; 1371  1372  private ImmutableList<FluentFuture<?>> inputFutures() { 1373  return FluentIterable.from(inputs).transform(INNER_FUTURE).toList(); 1374  } 1375  } 1376  1377  /** 1378  * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1379  * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1380  * combination. 1381  * 1382  * @param <V1> the type returned by the first future 1383  * @param <V2> the type returned by the second future 1384  */ 1385  public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1386  extends Combiner { 1387  1388  /** 1389  * A function that returns a value when applied to the values of the two futures passed to 1390  * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1391  * 1392  * @param <V1> the type returned by the first future 1393  * @param <V2> the type returned by the second future 1394  * @param <U> the type returned by the function 1395  */ 1396  @FunctionalInterface 1397  public interface ClosingFunction2< 1398  V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1399  1400  /** 1401  * Applies this function to two inputs, or throws an exception if unable to do so. 1402  * 1403  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1404  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1405  * is done (but not before this method completes), even if this method throws or the pipeline 1406  * is cancelled. 1407  */ 1408  U apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception; 1409  } 1410  1411  /** 1412  * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1413  * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1414  * 1415  * @param <V1> the type returned by the first future 1416  * @param <V2> the type returned by the second future 1417  * @param <U> the type returned by the function 1418  */ 1419  @FunctionalInterface 1420  public interface AsyncClosingFunction2< 1421  V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1422  1423  /** 1424  * Applies this function to two inputs, or throws an exception if unable to do so. 1425  * 1426  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1427  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1428  * is done (but not before this method completes), even if this method throws or the pipeline 1429  * is cancelled. 1430  */ 1431  ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception; 1432  } 1433  1434  private final ClosingFuture<V1> future1; 1435  private final ClosingFuture<V2> future2; 1436  1437  private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1438  super(true, ImmutableList.of(future1, future2)); 1439  this.future1 = future1; 1440  this.future2 = future2; 1441  } 1442  1443  /** 1444  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1445  * combining function to their values. The function can use a {@link DeferredCloser} to capture 1446  * objects to be closed when the pipeline is done. 1447  * 1448  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1449  * any of the inputs fail, so will the returned step. 1450  * 1451  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1452  * 1453  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1454  * ExecutionException} will be extracted and used as the failure of the derived step. 1455  */ 1456  public <U extends @Nullable Object> ClosingFuture<U> call( 1457  final ClosingFunction2<V1, V2, U> function, Executor executor) { 1458  return call( 1459  new CombiningCallable<U>() { 1460  @Override 1461  public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1462  return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1463  } 1464  1465  @Override 1466  public String toString() { 1467  return function.toString(); 1468  } 1469  }, 1470  executor); 1471  } 1472  1473  /** 1474  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1475  * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1476  * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1477  * captured by the returned {@link ClosingFuture}). 1478  * 1479  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1480  * any of the inputs fail, so will the returned step. 1481  * 1482  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1483  * 1484  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1485  * ExecutionException} will be extracted and used as the failure of the derived step. 1486  * 1487  * <p>If the function throws any other exception, it will be used as the failure of the derived 1488  * step. 1489  * 1490  * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1491  * the closeable objects in that {@code ClosingFuture} will be closed. 1492  * 1493  * <p>Usage guidelines for this method: 1494  * 1495  * <ul> 1496  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1497  * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1498  * Executor)} instead, with a function that returns the next value directly. 1499  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1500  * closer.eventuallyClose()} for every closeable object this step creates in order to 1501  * capture it for later closing. 1502  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1503  * ClosingFuture} call {@link #from(ListenableFuture)}. 1504  * </ul> 1505  * 1506  * <p>The same warnings about doing heavyweight operations within {@link 1507  * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1508  */ 1509  public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1510  final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1511  return callAsync( 1512  new AsyncCombiningCallable<U>() { 1513  @Override 1514  public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1515  return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1516  } 1517  1518  @Override 1519  public String toString() { 1520  return function.toString(); 1521  } 1522  }, 1523  executor); 1524  } 1525  } 1526  1527  /** 1528  * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1529  * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1530  * ClosingFuture)} to start this combination. 1531  * 1532  * @param <V1> the type returned by the first future 1533  * @param <V2> the type returned by the second future 1534  * @param <V3> the type returned by the third future 1535  */ 1536  public static final class Combiner3< 1537  V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1538  extends Combiner { 1539  /** 1540  * A function that returns a value when applied to the values of the three futures passed to 1541  * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1542  * 1543  * @param <V1> the type returned by the first future 1544  * @param <V2> the type returned by the second future 1545  * @param <V3> the type returned by the third future 1546  * @param <U> the type returned by the function 1547  */ 1548  @FunctionalInterface 1549  public interface ClosingFunction3< 1550  V1 extends @Nullable Object, 1551  V2 extends @Nullable Object, 1552  V3 extends @Nullable Object, 1553  U extends @Nullable Object> { 1554  /** 1555  * Applies this function to three inputs, or throws an exception if unable to do so. 1556  * 1557  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1558  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1559  * is done (but not before this method completes), even if this method throws or the pipeline 1560  * is cancelled. 1561  */ 1562  U apply(DeferredCloser closer, V1 value1, V2 value2, V3 v3) throws Exception; 1563  } 1564  1565  /** 1566  * A function that returns a {@link ClosingFuture} when applied to the values of the three 1567  * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1568  * 1569  * @param <V1> the type returned by the first future 1570  * @param <V2> the type returned by the second future 1571  * @param <V3> the type returned by the third future 1572  * @param <U> the type returned by the function 1573  */ 1574  @FunctionalInterface 1575  public interface AsyncClosingFunction3< 1576  V1 extends @Nullable Object, 1577  V2 extends @Nullable Object, 1578  V3 extends @Nullable Object, 1579  U extends @Nullable Object> { 1580  /** 1581  * Applies this function to three inputs, or throws an exception if unable to do so. 1582  * 1583  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1584  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1585  * is done (but not before this method completes), even if this method throws or the pipeline 1586  * is cancelled. 1587  */ 1588  ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3) 1589  throws Exception; 1590  } 1591  1592  private final ClosingFuture<V1> future1; 1593  private final ClosingFuture<V2> future2; 1594  private final ClosingFuture<V3> future3; 1595  1596  private Combiner3( 1597  ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1598  super(true, ImmutableList.of(future1, future2, future3)); 1599  this.future1 = future1; 1600  this.future2 = future2; 1601  this.future3 = future3; 1602  } 1603  1604  /** 1605  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1606  * combining function to their values. The function can use a {@link DeferredCloser} to capture 1607  * objects to be closed when the pipeline is done. 1608  * 1609  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1610  * ClosingFuture)} and any of the inputs fail, so will the returned step. 1611  * 1612  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1613  * 1614  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1615  * ExecutionException} will be extracted and used as the failure of the derived step. 1616  */ 1617  public <U extends @Nullable Object> ClosingFuture<U> call( 1618  final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1619  return call( 1620  new CombiningCallable<U>() { 1621  @Override 1622  public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1623  return function.apply( 1624  closer, 1625  peeker.getDone(future1), 1626  peeker.getDone(future2), 1627  peeker.getDone(future3)); 1628  } 1629  1630  @Override 1631  public String toString() { 1632  return function.toString(); 1633  } 1634  }, 1635  executor); 1636  } 1637  1638  /** 1639  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1640  * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1641  * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1642  * captured by the returned {@link ClosingFuture}). 1643  * 1644  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1645  * ClosingFuture)} and any of the inputs fail, so will the returned step. 1646  * 1647  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1648  * 1649  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1650  * ExecutionException} will be extracted and used as the failure of the derived step. 1651  * 1652  * <p>If the function throws any other exception, it will be used as the failure of the derived 1653  * step. 1654  * 1655  * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1656  * the closeable objects in that {@code ClosingFuture} will be closed. 1657  * 1658  * <p>Usage guidelines for this method: 1659  * 1660  * <ul> 1661  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1662  * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1663  * Executor)} instead, with a function that returns the next value directly. 1664  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1665  * closer.eventuallyClose()} for every closeable object this step creates in order to 1666  * capture it for later closing. 1667  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1668  * ClosingFuture} call {@link #from(ListenableFuture)}. 1669  * </ul> 1670  * 1671  * <p>The same warnings about doing heavyweight operations within {@link 1672  * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1673  */ 1674  public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1675  final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1676  return callAsync( 1677  new AsyncCombiningCallable<U>() { 1678  @Override 1679  public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1680  return function.apply( 1681  closer, 1682  peeker.getDone(future1), 1683  peeker.getDone(future2), 1684  peeker.getDone(future3)); 1685  } 1686  1687  @Override 1688  public String toString() { 1689  return function.toString(); 1690  } 1691  }, 1692  executor); 1693  } 1694  } 1695  1696  /** 1697  * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1698  * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1699  * ClosingFuture)} to start this combination. 1700  * 1701  * @param <V1> the type returned by the first future 1702  * @param <V2> the type returned by the second future 1703  * @param <V3> the type returned by the third future 1704  * @param <V4> the type returned by the fourth future 1705  */ 1706  public static final class Combiner4< 1707  V1 extends @Nullable Object, 1708  V2 extends @Nullable Object, 1709  V3 extends @Nullable Object, 1710  V4 extends @Nullable Object> 1711  extends Combiner { 1712  /** 1713  * A function that returns a value when applied to the values of the four futures passed to 1714  * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1715  * 1716  * @param <V1> the type returned by the first future 1717  * @param <V2> the type returned by the second future 1718  * @param <V3> the type returned by the third future 1719  * @param <V4> the type returned by the fourth future 1720  * @param <U> the type returned by the function 1721  */ 1722  @FunctionalInterface 1723  public interface ClosingFunction4< 1724  V1 extends @Nullable Object, 1725  V2 extends @Nullable Object, 1726  V3 extends @Nullable Object, 1727  V4 extends @Nullable Object, 1728  U extends @Nullable Object> { 1729  /** 1730  * Applies this function to four inputs, or throws an exception if unable to do so. 1731  * 1732  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1733  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1734  * is done (but not before this method completes), even if this method throws or the pipeline 1735  * is cancelled. 1736  */ 1737  U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) throws Exception; 1738  } 1739  1740  /** 1741  * A function that returns a {@link ClosingFuture} when applied to the values of the four 1742  * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1743  * ClosingFuture)}. 1744  * 1745  * @param <V1> the type returned by the first future 1746  * @param <V2> the type returned by the second future 1747  * @param <V3> the type returned by the third future 1748  * @param <V4> the type returned by the fourth future 1749  * @param <U> the type returned by the function 1750  */ 1751  @FunctionalInterface 1752  public interface AsyncClosingFunction4< 1753  V1 extends @Nullable Object, 1754  V2 extends @Nullable Object, 1755  V3 extends @Nullable Object, 1756  V4 extends @Nullable Object, 1757  U extends @Nullable Object> { 1758  /** 1759  * Applies this function to four inputs, or throws an exception if unable to do so. 1760  * 1761  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1762  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1763  * is done (but not before this method completes), even if this method throws or the pipeline 1764  * is cancelled. 1765  */ 1766  ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) 1767  throws Exception; 1768  } 1769  1770  private final ClosingFuture<V1> future1; 1771  private final ClosingFuture<V2> future2; 1772  private final ClosingFuture<V3> future3; 1773  private final ClosingFuture<V4> future4; 1774  1775  private Combiner4( 1776  ClosingFuture<V1> future1, 1777  ClosingFuture<V2> future2, 1778  ClosingFuture<V3> future3, 1779  ClosingFuture<V4> future4) { 1780  super(true, ImmutableList.of(future1, future2, future3, future4)); 1781  this.future1 = future1; 1782  this.future2 = future2; 1783  this.future3 = future3; 1784  this.future4 = future4; 1785  } 1786  1787  /** 1788  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1789  * combining function to their values. The function can use a {@link DeferredCloser} to capture 1790  * objects to be closed when the pipeline is done. 1791  * 1792  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1793  * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1794  * 1795  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1796  * 1797  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1798  * ExecutionException} will be extracted and used as the failure of the derived step. 1799  */ 1800  public <U extends @Nullable Object> ClosingFuture<U> call( 1801  final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1802  return call( 1803  new CombiningCallable<U>() { 1804  @Override 1805  public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1806  return function.apply( 1807  closer, 1808  peeker.getDone(future1), 1809  peeker.getDone(future2), 1810  peeker.getDone(future3), 1811  peeker.getDone(future4)); 1812  } 1813  1814  @Override 1815  public String toString() { 1816  return function.toString(); 1817  } 1818  }, 1819  executor); 1820  } 1821  1822  /** 1823  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1824  * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1825  * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1826  * captured by the returned {@link ClosingFuture}). 1827  * 1828  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1829  * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1830  * 1831  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1832  * 1833  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1834  * ExecutionException} will be extracted and used as the failure of the derived step. 1835  * 1836  * <p>If the function throws any other exception, it will be used as the failure of the derived 1837  * step. 1838  * 1839  * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1840  * the closeable objects in that {@code ClosingFuture} will be closed. 1841  * 1842  * <p>Usage guidelines for this method: 1843  * 1844  * <ul> 1845  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1846  * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1847  * Executor)} instead, with a function that returns the next value directly. 1848  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1849  * closer.eventuallyClose()} for every closeable object this step creates in order to 1850  * capture it for later closing. 1851  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1852  * ClosingFuture} call {@link #from(ListenableFuture)}. 1853  * </ul> 1854  * 1855  * <p>The same warnings about doing heavyweight operations within {@link 1856  * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1857  */ 1858  public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1859  final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1860  return callAsync( 1861  new AsyncCombiningCallable<U>() { 1862  @Override 1863  public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1864  return function.apply( 1865  closer, 1866  peeker.getDone(future1), 1867  peeker.getDone(future2), 1868  peeker.getDone(future3), 1869  peeker.getDone(future4)); 1870  } 1871  1872  @Override 1873  public String toString() { 1874  return function.toString(); 1875  } 1876  }, 1877  executor); 1878  } 1879  } 1880  1881  /** 1882  * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1883  * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1884  * ClosingFuture, ClosingFuture)} to start this combination. 1885  * 1886  * @param <V1> the type returned by the first future 1887  * @param <V2> the type returned by the second future 1888  * @param <V3> the type returned by the third future 1889  * @param <V4> the type returned by the fourth future 1890  * @param <V5> the type returned by the fifth future 1891  */ 1892  public static final class Combiner5< 1893  V1 extends @Nullable Object, 1894  V2 extends @Nullable Object, 1895  V3 extends @Nullable Object, 1896  V4 extends @Nullable Object, 1897  V5 extends @Nullable Object> 1898  extends Combiner { 1899  /** 1900  * A function that returns a value when applied to the values of the five futures passed to 1901  * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1902  * ClosingFuture)}. 1903  * 1904  * @param <V1> the type returned by the first future 1905  * @param <V2> the type returned by the second future 1906  * @param <V3> the type returned by the third future 1907  * @param <V4> the type returned by the fourth future 1908  * @param <V5> the type returned by the fifth future 1909  * @param <U> the type returned by the function 1910  */ 1911  @FunctionalInterface 1912  public interface ClosingFunction5< 1913  V1 extends @Nullable Object, 1914  V2 extends @Nullable Object, 1915  V3 extends @Nullable Object, 1916  V4 extends @Nullable Object, 1917  V5 extends @Nullable Object, 1918  U extends @Nullable Object> { 1919  /** 1920  * Applies this function to five inputs, or throws an exception if unable to do so. 1921  * 1922  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1923  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1924  * is done (but not before this method completes), even if this method throws or the pipeline 1925  * is cancelled. 1926  */ 1927  U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5) 1928  throws Exception; 1929  } 1930  1931  /** 1932  * A function that returns a {@link ClosingFuture} when applied to the values of the five 1933  * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1934  * ClosingFuture, ClosingFuture)}. 1935  * 1936  * @param <V1> the type returned by the first future 1937  * @param <V2> the type returned by the second future 1938  * @param <V3> the type returned by the third future 1939  * @param <V4> the type returned by the fourth future 1940  * @param <V5> the type returned by the fifth future 1941  * @param <U> the type returned by the function 1942  */ 1943  @FunctionalInterface 1944  public interface AsyncClosingFunction5< 1945  V1 extends @Nullable Object, 1946  V2 extends @Nullable Object, 1947  V3 extends @Nullable Object, 1948  V4 extends @Nullable Object, 1949  V5 extends @Nullable Object, 1950  U extends @Nullable Object> { 1951  /** 1952  * Applies this function to five inputs, or throws an exception if unable to do so. 1953  * 1954  * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1955  * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1956  * is done (but not before this method completes), even if this method throws or the pipeline 1957  * is cancelled. 1958  */ 1959  ClosingFuture<U> apply( 1960  DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5) 1961  throws Exception; 1962  } 1963  1964  private final ClosingFuture<V1> future1; 1965  private final ClosingFuture<V2> future2; 1966  private final ClosingFuture<V3> future3; 1967  private final ClosingFuture<V4> future4; 1968  private final ClosingFuture<V5> future5; 1969  1970  private Combiner5( 1971  ClosingFuture<V1> future1, 1972  ClosingFuture<V2> future2, 1973  ClosingFuture<V3> future3, 1974  ClosingFuture<V4> future4, 1975  ClosingFuture<V5> future5) { 1976  super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1977  this.future1 = future1; 1978  this.future2 = future2; 1979  this.future3 = future3; 1980  this.future4 = future4; 1981  this.future5 = future5; 1982  } 1983  1984  /** 1985  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1986  * combining function to their values. The function can use a {@link DeferredCloser} to capture 1987  * objects to be closed when the pipeline is done. 1988  * 1989  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1990  * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 1991  * returned step. 1992  * 1993  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1994  * 1995  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1996  * ExecutionException} will be extracted and used as the failure of the derived step. 1997  */ 1998  public <U extends @Nullable Object> ClosingFuture<U> call( 1999  final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2000  return call( 2001  new CombiningCallable<U>() { 2002  @Override 2003  public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2004  return function.apply( 2005  closer, 2006  peeker.getDone(future1), 2007  peeker.getDone(future2), 2008  peeker.getDone(future3), 2009  peeker.getDone(future4), 2010  peeker.getDone(future5)); 2011  } 2012  2013  @Override 2014  public String toString() { 2015  return function.toString(); 2016  } 2017  }, 2018  executor); 2019  } 2020  2021  /** 2022  * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2023  * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2024  * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2025  * captured by the returned {@link ClosingFuture}). 2026  * 2027  * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2028  * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2029  * returned step. 2030  * 2031  * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2032  * 2033  * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2034  * ExecutionException} will be extracted and used as the failure of the derived step. 2035  * 2036  * <p>If the function throws any other exception, it will be used as the failure of the derived 2037  * step. 2038  * 2039  * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2040  * the closeable objects in that {@code ClosingFuture} will be closed. 2041  * 2042  * <p>Usage guidelines for this method: 2043  * 2044  * <ul> 2045  * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2046  * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2047  * Executor)} instead, with a function that returns the next value directly. 2048  * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 2049  * closer.eventuallyClose()} for every closeable object this step creates in order to 2050  * capture it for later closing. 2051  * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2052  * ClosingFuture} call {@link #from(ListenableFuture)}. 2053  * </ul> 2054  * 2055  * <p>The same warnings about doing heavyweight operations within {@link 2056  * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2057  */ 2058  public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2059  final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2060  return callAsync( 2061  new AsyncCombiningCallable<U>() { 2062  @Override 2063  public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2064  return function.apply( 2065  closer, 2066  peeker.getDone(future1), 2067  peeker.getDone(future2), 2068  peeker.getDone(future3), 2069  peeker.getDone(future4), 2070  peeker.getDone(future5)); 2071  } 2072  2073  @Override 2074  public String toString() { 2075  return function.toString(); 2076  } 2077  }, 2078  executor); 2079  } 2080  } 2081  2082  @Override 2083  public String toString() { 2084  // TODO(dpb): Better toString, in the style of Futures.transform etc. 2085  return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2086  } 2087  2088  @Override 2089  protected void finalize() { 2090  if (state.get().equals(OPEN)) { 2091  logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this); 2092  FluentFuture<V> unused = finishToFuture(); 2093  } 2094  } 2095  2096  private static void closeQuietly(final AutoCloseable closeable, Executor executor) { 2097  if (closeable == null) { 2098  return; 2099  } 2100  try { 2101  executor.execute( 2102  new Runnable() { 2103  @Override 2104  public void run() { 2105  try { 2106  closeable.close(); 2107  } catch (Exception e) { 2108  logger.log(WARNING, "thrown by close()", e); 2109  } 2110  } 2111  }); 2112  } catch (RejectedExecutionException e) { 2113  if (logger.isLoggable(WARNING)) { 2114  logger.log( 2115  WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2116  } 2117  closeQuietly(closeable, directExecutor()); 2118  } 2119  } 2120  2121  private void checkAndUpdateState(State oldState, State newState) { 2122  checkState( 2123  compareAndUpdateState(oldState, newState), 2124  "Expected state to be %s, but it was %s", 2125  oldState, 2126  newState); 2127  } 2128  2129  private boolean compareAndUpdateState(State oldState, State newState) { 2130  return state.compareAndSet(oldState, newState); 2131  } 2132  2133  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2134  private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2135  implements Closeable { 2136  private final DeferredCloser closer = new DeferredCloser(this); 2137  private volatile boolean closed; 2138  private volatile CountDownLatch whenClosed; 2139  2140  <V, U> ListenableFuture<U> applyClosingFunction( 2141  ClosingFunction<? super V, U> transformation, V input) throws Exception { 2142  // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2143  CloseableList newCloseables = new CloseableList(); 2144  try { 2145  return immediateFuture(transformation.apply(newCloseables.closer, input)); 2146  } finally { 2147  add(newCloseables, directExecutor()); 2148  } 2149  } 2150  2151  <V, U> FluentFuture<U> applyAsyncClosingFunction( 2152  AsyncClosingFunction<V, U> transformation, V input) throws Exception { 2153  // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2154  CloseableList newCloseables = new CloseableList(); 2155  try { 2156  ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2157  closingFuture.becomeSubsumedInto(newCloseables); 2158  return closingFuture.future; 2159  } finally { 2160  add(newCloseables, directExecutor()); 2161  } 2162  } 2163  2164  @Override 2165  public void close() { 2166  if (closed) { 2167  return; 2168  } 2169  synchronized (this) { 2170  if (closed) { 2171  return; 2172  } 2173  closed = true; 2174  } 2175  for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2176  closeQuietly(entry.getKey(), entry.getValue()); 2177  } 2178  clear(); 2179  if (whenClosed != null) { 2180  whenClosed.countDown(); 2181  } 2182  } 2183  2184  void add(@Nullable AutoCloseable closeable, Executor executor) { 2185  checkNotNull(executor); 2186  if (closeable == null) { 2187  return; 2188  } 2189  synchronized (this) { 2190  if (!closed) { 2191  put(closeable, executor); 2192  return; 2193  } 2194  } 2195  closeQuietly(closeable, executor); 2196  } 2197  2198  /** 2199  * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2200  */ 2201  CountDownLatch whenClosedCountDown() { 2202  if (closed) { 2203  return new CountDownLatch(0); 2204  } 2205  synchronized (this) { 2206  if (closed) { 2207  return new CountDownLatch(0); 2208  } 2209  checkState(whenClosed == null); 2210  return whenClosed = new CountDownLatch(1); 2211  } 2212  } 2213  } 2214  2215  /** 2216  * Returns an object that can be used to wait until this objects' deferred closeables have all had 2217  * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2218  */ 2219  @VisibleForTesting 2220  CountDownLatch whenClosedCountDown() { 2221  return closeables.whenClosedCountDown(); 2222  } 2223  2224  /** The state of a {@link CloseableList}. */ 2225  enum State { 2226  /** The {@link CloseableList} has not been subsumed or closed. */ 2227  OPEN, 2228  2229  /** 2230  * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2231  * into any other. 2232  */ 2233  SUBSUMED, 2234  2235  /** 2236  * Some {@link ListenableFuture} has a callback attached that will close the {@link 2237  * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2238  */ 2239  WILL_CLOSE, 2240  2241  /** 2242  * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2243  * {@link CloseableList} may not be subsumed. 2244  */ 2245  CLOSING, 2246  2247  /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2248  CLOSED, 2249  2250  /** 2251  * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2252  * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2253  */ 2254  WILL_CREATE_VALUE_AND_CLOSER, 2255  } 2256 }