Coverage Summary for Class: AbstractService (com.google.common.util.concurrent)

Class Method, % Line, %
AbstractService 0% (0/28) 0% (0/140)
AbstractService$1 0% (0/3) 0% (0/3)
AbstractService$2 0% (0/3) 0% (0/3)
AbstractService$3 0% (0/3) 0% (0/3)
AbstractService$4 0% (0/3) 0% (0/3)
AbstractService$5 0% (0/3) 0% (0/3)
AbstractService$6 0% (0/1) 0% (0/1)
AbstractService$HasReachedRunningGuard 0% (0/2) 0% (0/3)
AbstractService$IsStartableGuard 0% (0/2) 0% (0/3)
AbstractService$IsStoppableGuard 0% (0/2) 0% (0/3)
AbstractService$IsStoppedGuard 0% (0/2) 0% (0/3)
AbstractService$StateSnapshot 0% (0/4) 0% (0/12)
Total 0% (0/56) 0% (0/180)


1 /* 2  * Copyright (C) 2009 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.base.Preconditions.checkArgument; 18 import static com.google.common.base.Preconditions.checkNotNull; 19 import static com.google.common.base.Preconditions.checkState; 20 import static com.google.common.util.concurrent.Service.State.FAILED; 21 import static com.google.common.util.concurrent.Service.State.NEW; 22 import static com.google.common.util.concurrent.Service.State.RUNNING; 23 import static com.google.common.util.concurrent.Service.State.STARTING; 24 import static com.google.common.util.concurrent.Service.State.STOPPING; 25 import static com.google.common.util.concurrent.Service.State.TERMINATED; 26 import static java.util.Objects.requireNonNull; 27  28 import com.google.common.annotations.Beta; 29 import com.google.common.annotations.GwtIncompatible; 30 import com.google.common.util.concurrent.Monitor.Guard; 31 import com.google.common.util.concurrent.Service.State; // javadoc needs this 32 import com.google.errorprone.annotations.CanIgnoreReturnValue; 33 import com.google.errorprone.annotations.ForOverride; 34 import com.google.errorprone.annotations.concurrent.GuardedBy; 35 import com.google.j2objc.annotations.WeakOuter; 36 import java.time.Duration; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeoutException; 40 import javax.annotation.CheckForNull; 41  42 /** 43  * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 44  * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 45  * callbacks. Its subclasses must manage threads manually; consider {@link 46  * AbstractExecutionThreadService} if you need only a single execution thread. 47  * 48  * @author Jesse Wilson 49  * @author Luke Sandberg 50  * @since 1.0 51  */ 52 @GwtIncompatible 53 @ElementTypesAreNonnullByDefault 54 public abstract class AbstractService implements Service { 55  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 56  new ListenerCallQueue.Event<Listener>() { 57  @Override 58  public void call(Listener listener) { 59  listener.starting(); 60  } 61  62  @Override 63  public String toString() { 64  return "starting()"; 65  } 66  }; 67  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 68  new ListenerCallQueue.Event<Listener>() { 69  @Override 70  public void call(Listener listener) { 71  listener.running(); 72  } 73  74  @Override 75  public String toString() { 76  return "running()"; 77  } 78  }; 79  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 80  stoppingEvent(STARTING); 81  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 82  stoppingEvent(RUNNING); 83  84  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 85  terminatedEvent(NEW); 86  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 87  terminatedEvent(STARTING); 88  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 89  terminatedEvent(RUNNING); 90  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 91  terminatedEvent(STOPPING); 92  93  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 94  return new ListenerCallQueue.Event<Listener>() { 95  @Override 96  public void call(Listener listener) { 97  listener.terminated(from); 98  } 99  100  @Override 101  public String toString() { 102  return "terminated({from = " + from + "})"; 103  } 104  }; 105  } 106  107  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 108  return new ListenerCallQueue.Event<Listener>() { 109  @Override 110  public void call(Listener listener) { 111  listener.stopping(from); 112  } 113  114  @Override 115  public String toString() { 116  return "stopping({from = " + from + "})"; 117  } 118  }; 119  } 120  121  private final Monitor monitor = new Monitor(); 122  123  private final Guard isStartable = new IsStartableGuard(); 124  125  @WeakOuter 126  private final class IsStartableGuard extends Guard { 127  IsStartableGuard() { 128  super(AbstractService.this.monitor); 129  } 130  131  @Override 132  public boolean isSatisfied() { 133  return state() == NEW; 134  } 135  } 136  137  private final Guard isStoppable = new IsStoppableGuard(); 138  139  @WeakOuter 140  private final class IsStoppableGuard extends Guard { 141  IsStoppableGuard() { 142  super(AbstractService.this.monitor); 143  } 144  145  @Override 146  public boolean isSatisfied() { 147  return state().compareTo(RUNNING) <= 0; 148  } 149  } 150  151  private final Guard hasReachedRunning = new HasReachedRunningGuard(); 152  153  @WeakOuter 154  private final class HasReachedRunningGuard extends Guard { 155  HasReachedRunningGuard() { 156  super(AbstractService.this.monitor); 157  } 158  159  @Override 160  public boolean isSatisfied() { 161  return state().compareTo(RUNNING) >= 0; 162  } 163  } 164  165  private final Guard isStopped = new IsStoppedGuard(); 166  167  @WeakOuter 168  private final class IsStoppedGuard extends Guard { 169  IsStoppedGuard() { 170  super(AbstractService.this.monitor); 171  } 172  173  @Override 174  public boolean isSatisfied() { 175  return state().compareTo(TERMINATED) >= 0; 176  } 177  } 178  179  /** The listeners to notify during a state transition. */ 180  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 181  182  /** 183  * The current state of the service. This should be written with the lock held but can be read 184  * without it because it is an immutable object in a volatile field. This is desirable so that 185  * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 186  * without grabbing the lock. 187  * 188  * <p>To update this field correctly the lock must be held to guarantee that the state is 189  * consistent. 190  */ 191  private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 192  193  /** Constructor for use by subclasses. */ 194  protected AbstractService() {} 195  196  /** 197  * This method is called by {@link #startAsync} to initiate service startup. The invocation of 198  * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 199  * or after it has returned. If startup fails, the invocation should cause a call to {@link 200  * #notifyFailed(Throwable)} instead. 201  * 202  * <p>This method should return promptly; prefer to do work on a different thread where it is 203  * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 204  * called multiple times. 205  */ 206  @ForOverride 207  protected abstract void doStart(); 208  209  /** 210  * This method should be used to initiate service shutdown. The invocation of this method should 211  * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 212  * returned. If shutdown fails, the invocation should cause a call to {@link 213  * #notifyFailed(Throwable)} instead. 214  * 215  * <p>This method should return promptly; prefer to do work on a different thread where it is 216  * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 217  * called multiple times. 218  * 219  * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 220  * invoked immediately. Instead, it will be deferred until after the service is {@link 221  * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 222  */ 223  @ForOverride 224  protected abstract void doStop(); 225  226  /** 227  * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 228  * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 229  * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 230  * 231  * <p>This method should return promptly; prefer to do work on a different thread where it is 232  * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 233  * called multiple times. 234  * 235  * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 236  * external state observable by the caller of {@link #stopAsync}. 237  * 238  * @since 27.0 239  */ 240  @Beta 241  @ForOverride 242  protected void doCancelStart() {} 243  244  @CanIgnoreReturnValue 245  @Override 246  public final Service startAsync() { 247  if (monitor.enterIf(isStartable)) { 248  try { 249  snapshot = new StateSnapshot(STARTING); 250  enqueueStartingEvent(); 251  doStart(); 252  } catch (Throwable startupFailure) { 253  notifyFailed(startupFailure); 254  } finally { 255  monitor.leave(); 256  dispatchListenerEvents(); 257  } 258  } else { 259  throw new IllegalStateException("Service " + this + " has already been started"); 260  } 261  return this; 262  } 263  264  @CanIgnoreReturnValue 265  @Override 266  public final Service stopAsync() { 267  if (monitor.enterIf(isStoppable)) { 268  try { 269  State previous = state(); 270  switch (previous) { 271  case NEW: 272  snapshot = new StateSnapshot(TERMINATED); 273  enqueueTerminatedEvent(NEW); 274  break; 275  case STARTING: 276  snapshot = new StateSnapshot(STARTING, true, null); 277  enqueueStoppingEvent(STARTING); 278  doCancelStart(); 279  break; 280  case RUNNING: 281  snapshot = new StateSnapshot(STOPPING); 282  enqueueStoppingEvent(RUNNING); 283  doStop(); 284  break; 285  case STOPPING: 286  case TERMINATED: 287  case FAILED: 288  // These cases are impossible due to the if statement above. 289  throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 290  } 291  } catch (Throwable shutdownFailure) { 292  notifyFailed(shutdownFailure); 293  } finally { 294  monitor.leave(); 295  dispatchListenerEvents(); 296  } 297  } 298  return this; 299  } 300  301  @Override 302  public final void awaitRunning() { 303  monitor.enterWhenUninterruptibly(hasReachedRunning); 304  try { 305  checkCurrentState(RUNNING); 306  } finally { 307  monitor.leave(); 308  } 309  } 310  311  /** @since 28.0 */ 312  @Override 313  public final void awaitRunning(Duration timeout) throws TimeoutException { 314  Service.super.awaitRunning(timeout); 315  } 316  317  @Override 318  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 319  if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 320  try { 321  checkCurrentState(RUNNING); 322  } finally { 323  monitor.leave(); 324  } 325  } else { 326  // It is possible due to races the we are currently in the expected state even though we 327  // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 328  // even check the guard. I don't think we care too much about this use case but it could lead 329  // to a confusing error message. 330  throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 331  } 332  } 333  334  @Override 335  public final void awaitTerminated() { 336  monitor.enterWhenUninterruptibly(isStopped); 337  try { 338  checkCurrentState(TERMINATED); 339  } finally { 340  monitor.leave(); 341  } 342  } 343  344  /** @since 28.0 */ 345  @Override 346  public final void awaitTerminated(Duration timeout) throws TimeoutException { 347  Service.super.awaitTerminated(timeout); 348  } 349  350  @Override 351  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 352  if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 353  try { 354  checkCurrentState(TERMINATED); 355  } finally { 356  monitor.leave(); 357  } 358  } else { 359  // It is possible due to races the we are currently in the expected state even though we 360  // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 361  // even check the guard. I don't think we care too much about this use case but it could lead 362  // to a confusing error message. 363  throw new TimeoutException( 364  "Timed out waiting for " 365  + this 366  + " to reach a terminal state. " 367  + "Current state: " 368  + state()); 369  } 370  } 371  372  /** Checks that the current state is equal to the expected state. */ 373  @GuardedBy("monitor") 374  private void checkCurrentState(State expected) { 375  State actual = state(); 376  if (actual != expected) { 377  if (actual == FAILED) { 378  // Handle this specially so that we can include the failureCause, if there is one. 379  throw new IllegalStateException( 380  "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 381  failureCause()); 382  } 383  throw new IllegalStateException( 384  "Expected the service " + this + " to be " + expected + ", but was " + actual); 385  } 386  } 387  388  /** 389  * Implementing classes should invoke this method once their service has started. It will cause 390  * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 391  * 392  * @throws IllegalStateException if the service is not {@link State#STARTING}. 393  */ 394  protected final void notifyStarted() { 395  monitor.enter(); 396  try { 397  // We have to examine the internal state of the snapshot here to properly handle the stop 398  // while starting case. 399  if (snapshot.state != STARTING) { 400  IllegalStateException failure = 401  new IllegalStateException( 402  "Cannot notifyStarted() when the service is " + snapshot.state); 403  notifyFailed(failure); 404  throw failure; 405  } 406  407  if (snapshot.shutdownWhenStartupFinishes) { 408  snapshot = new StateSnapshot(STOPPING); 409  // We don't call listeners here because we already did that when we set the 410  // shutdownWhenStartupFinishes flag. 411  doStop(); 412  } else { 413  snapshot = new StateSnapshot(RUNNING); 414  enqueueRunningEvent(); 415  } 416  } finally { 417  monitor.leave(); 418  dispatchListenerEvents(); 419  } 420  } 421  422  /** 423  * Implementing classes should invoke this method once their service has stopped. It will cause 424  * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 425  * State#TERMINATED}. 426  * 427  * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 428  * State#STARTING}, or {@link State#RUNNING}. 429  */ 430  protected final void notifyStopped() { 431  monitor.enter(); 432  try { 433  State previous = state(); 434  switch (previous) { 435  case NEW: 436  case TERMINATED: 437  case FAILED: 438  throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 439  case RUNNING: 440  case STARTING: 441  case STOPPING: 442  snapshot = new StateSnapshot(TERMINATED); 443  enqueueTerminatedEvent(previous); 444  break; 445  } 446  } finally { 447  monitor.leave(); 448  dispatchListenerEvents(); 449  } 450  } 451  452  /** 453  * Invoke this method to transition the service to the {@link State#FAILED}. The service will 454  * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 455  * or otherwise cannot be started nor stopped. 456  */ 457  protected final void notifyFailed(Throwable cause) { 458  checkNotNull(cause); 459  460  monitor.enter(); 461  try { 462  State previous = state(); 463  switch (previous) { 464  case NEW: 465  case TERMINATED: 466  throw new IllegalStateException("Failed while in state:" + previous, cause); 467  case RUNNING: 468  case STARTING: 469  case STOPPING: 470  snapshot = new StateSnapshot(FAILED, false, cause); 471  enqueueFailedEvent(previous, cause); 472  break; 473  case FAILED: 474  // Do nothing 475  break; 476  } 477  } finally { 478  monitor.leave(); 479  dispatchListenerEvents(); 480  } 481  } 482  483  @Override 484  public final boolean isRunning() { 485  return state() == RUNNING; 486  } 487  488  @Override 489  public final State state() { 490  return snapshot.externalState(); 491  } 492  493  /** @since 14.0 */ 494  @Override 495  public final Throwable failureCause() { 496  return snapshot.failureCause(); 497  } 498  499  /** @since 13.0 */ 500  @Override 501  public final void addListener(Listener listener, Executor executor) { 502  listeners.addListener(listener, executor); 503  } 504  505  @Override 506  public String toString() { 507  return getClass().getSimpleName() + " [" + state() + "]"; 508  } 509  510  /** 511  * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 512  * #monitor}. 513  */ 514  private void dispatchListenerEvents() { 515  if (!monitor.isOccupiedByCurrentThread()) { 516  listeners.dispatch(); 517  } 518  } 519  520  private void enqueueStartingEvent() { 521  listeners.enqueue(STARTING_EVENT); 522  } 523  524  private void enqueueRunningEvent() { 525  listeners.enqueue(RUNNING_EVENT); 526  } 527  528  private void enqueueStoppingEvent(final State from) { 529  if (from == State.STARTING) { 530  listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 531  } else if (from == State.RUNNING) { 532  listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 533  } else { 534  throw new AssertionError(); 535  } 536  } 537  538  private void enqueueTerminatedEvent(final State from) { 539  switch (from) { 540  case NEW: 541  listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 542  break; 543  case STARTING: 544  listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 545  break; 546  case RUNNING: 547  listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 548  break; 549  case STOPPING: 550  listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 551  break; 552  case TERMINATED: 553  case FAILED: 554  throw new AssertionError(); 555  } 556  } 557  558  private void enqueueFailedEvent(final State from, final Throwable cause) { 559  // can't memoize this one due to the exception 560  listeners.enqueue( 561  new ListenerCallQueue.Event<Listener>() { 562  @Override 563  public void call(Listener listener) { 564  listener.failed(from, cause); 565  } 566  567  @Override 568  public String toString() { 569  return "failed({from = " + from + ", cause = " + cause + "})"; 570  } 571  }); 572  } 573  574  /** 575  * An immutable snapshot of the current state of the service. This class represents a consistent 576  * snapshot of the state and therefore it can be used to answer simple queries without needing to 577  * grab a lock. 578  */ 579  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 580  private static final class StateSnapshot { 581  /** 582  * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 583  */ 584  final State state; 585  586  /** If true, the user requested a shutdown while the service was still starting up. */ 587  final boolean shutdownWhenStartupFinishes; 588  589  /** 590  * The exception that caused this service to fail. This will be {@code null} unless the service 591  * has failed. 592  */ 593  @CheckForNull final Throwable failure; 594  595  StateSnapshot(State internalState) { 596  this(internalState, false, null); 597  } 598  599  StateSnapshot( 600  State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) { 601  checkArgument( 602  !shutdownWhenStartupFinishes || internalState == STARTING, 603  "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 604  internalState); 605  checkArgument( 606  (failure != null) == (internalState == FAILED), 607  "A failure cause should be set if and only if the state is failed. Got %s and %s " 608  + "instead.", 609  internalState, 610  failure); 611  this.state = internalState; 612  this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 613  this.failure = failure; 614  } 615  616  /** @see Service#state() */ 617  State externalState() { 618  if (shutdownWhenStartupFinishes && state == STARTING) { 619  return STOPPING; 620  } else { 621  return state; 622  } 623  } 624  625  /** @see Service#failureCause() */ 626  Throwable failureCause() { 627  checkState( 628  state == FAILED, 629  "failureCause() is only valid if the service has failed, service is %s", 630  state); 631  // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 632  return requireNonNull(failure); 633  } 634  } 635 }