Coverage Summary for Class: ServiceManager (com.google.common.util.concurrent)

Class Method, % Line, %
ServiceManager 0% (0/17) 0% (0/51)
ServiceManager$1 0% (0/3) 0% (0/3)
ServiceManager$2 0% (0/3) 0% (0/3)
ServiceManager$EmptyServiceManagerWarning 0% (0/1) 0% (0/1)
ServiceManager$FailedService 0% (0/1) 0% (0/3)
ServiceManager$Listener 0% (0/1) 0% (0/1)
ServiceManager$NoOpService 0% (0/3) 0% (0/3)
ServiceManager$ServiceListener 0% (0/6) 0% (0/26)
ServiceManager$ServiceManagerState 0% (0/16) 0% (0/112)
ServiceManager$ServiceManagerState$1 0% (0/2) 0% (0/2)
ServiceManager$ServiceManagerState$2 0% (0/3) 0% (0/3)
ServiceManager$ServiceManagerState$AwaitHealthGuard 0% (0/2) 0% (0/6)
ServiceManager$ServiceManagerState$StoppedGuard 0% (0/2) 0% (0/3)
Total 0% (0/60) 0% (0/217)


1 /* 2  * Copyright (C) 2012 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.util.concurrent; 16  17 import static com.google.common.base.Preconditions.checkArgument; 18 import static com.google.common.base.Preconditions.checkNotNull; 19 import static com.google.common.base.Preconditions.checkState; 20 import static com.google.common.base.Predicates.equalTo; 21 import static com.google.common.base.Predicates.in; 22 import static com.google.common.base.Predicates.instanceOf; 23 import static com.google.common.base.Predicates.not; 24 import static com.google.common.util.concurrent.Internal.toNanosSaturated; 25 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 26 import static com.google.common.util.concurrent.Service.State.FAILED; 27 import static com.google.common.util.concurrent.Service.State.NEW; 28 import static com.google.common.util.concurrent.Service.State.RUNNING; 29 import static com.google.common.util.concurrent.Service.State.STARTING; 30 import static com.google.common.util.concurrent.Service.State.STOPPING; 31 import static com.google.common.util.concurrent.Service.State.TERMINATED; 32 import static java.util.concurrent.TimeUnit.MILLISECONDS; 33  34 import com.google.common.annotations.GwtIncompatible; 35 import com.google.common.base.Function; 36 import com.google.common.base.MoreObjects; 37 import com.google.common.base.Stopwatch; 38 import com.google.common.collect.Collections2; 39 import com.google.common.collect.ImmutableCollection; 40 import com.google.common.collect.ImmutableList; 41 import com.google.common.collect.ImmutableMap; 42 import com.google.common.collect.ImmutableSet; 43 import com.google.common.collect.ImmutableSetMultimap; 44 import com.google.common.collect.Lists; 45 import com.google.common.collect.Maps; 46 import com.google.common.collect.MultimapBuilder; 47 import com.google.common.collect.Multimaps; 48 import com.google.common.collect.Multiset; 49 import com.google.common.collect.Ordering; 50 import com.google.common.collect.SetMultimap; 51 import com.google.common.util.concurrent.Service.State; 52 import com.google.errorprone.annotations.CanIgnoreReturnValue; 53 import com.google.errorprone.annotations.concurrent.GuardedBy; 54 import com.google.j2objc.annotations.J2ObjCIncompatible; 55 import com.google.j2objc.annotations.WeakOuter; 56 import java.lang.ref.WeakReference; 57 import java.time.Duration; 58 import java.util.Collections; 59 import java.util.EnumSet; 60 import java.util.List; 61 import java.util.Map; 62 import java.util.Map.Entry; 63 import java.util.concurrent.Executor; 64 import java.util.concurrent.TimeUnit; 65 import java.util.concurrent.TimeoutException; 66 import java.util.logging.Level; 67 import java.util.logging.Logger; 68  69 /** 70  * A manager for monitoring and controlling a set of {@linkplain Service services}. This class 71  * provides methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and 72  * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}. 73  * Additionally, users can monitor state transitions with the {@linkplain Listener listener} 74  * mechanism. 75  * 76  * <p>While it is recommended that service lifecycles be managed via this class, state transitions 77  * initiated via other mechanisms do not impact the correctness of its methods. For example, if the 78  * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked 79  * when appropriate and {@link #awaitHealthy} will still work as expected. 80  * 81  * <p>Here is a simple example of how to use a {@code ServiceManager} to start a server. 82  * 83  * <pre>{@code 84  * class Server { 85  * public static void main(String[] args) { 86  * Set<Service> services = ...; 87  * ServiceManager manager = new ServiceManager(services); 88  * manager.addListener(new Listener() { 89  * public void stopped() {} 90  * public void healthy() { 91  * // Services have been initialized and are healthy, start accepting requests... 92  * } 93  * public void failure(Service service) { 94  * // Something failed, at this point we could log it, notify a load balancer, or take 95  * // some other action. For now we will just exit. 96  * System.exit(1); 97  * } 98  * }, 99  * MoreExecutors.directExecutor()); 100  * 101  * Runtime.getRuntime().addShutdownHook(new Thread() { 102  * public void run() { 103  * // Give the services 5 seconds to stop to ensure that we are responsive to shutdown 104  * // requests. 105  * try { 106  * manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS); 107  * } catch (TimeoutException timeout) { 108  * // stopping timed out 109  * } 110  * } 111  * }); 112  * manager.startAsync(); // start all the services asynchronously 113  * } 114  * } 115  * }</pre> 116  * 117  * <p>This class uses the ServiceManager's methods to start all of its services, to respond to 118  * service failure and to ensure that when the JVM is shutting down all the services are stopped. 119  * 120  * @author Luke Sandberg 121  * @since 14.0 122  */ 123 @GwtIncompatible 124 @ElementTypesAreNonnullByDefault 125 public final class ServiceManager implements ServiceManagerBridge { 126  private static final Logger logger = Logger.getLogger(ServiceManager.class.getName()); 127  private static final ListenerCallQueue.Event<Listener> HEALTHY_EVENT = 128  new ListenerCallQueue.Event<Listener>() { 129  @Override 130  public void call(Listener listener) { 131  listener.healthy(); 132  } 133  134  @Override 135  public String toString() { 136  return "healthy()"; 137  } 138  }; 139  private static final ListenerCallQueue.Event<Listener> STOPPED_EVENT = 140  new ListenerCallQueue.Event<Listener>() { 141  @Override 142  public void call(Listener listener) { 143  listener.stopped(); 144  } 145  146  @Override 147  public String toString() { 148  return "stopped()"; 149  } 150  }; 151  152  /** 153  * A listener for the aggregate state changes of the services that are under management. Users 154  * that need to listen to more fine-grained events (such as when each particular {@linkplain 155  * Service service} starts, or terminates), should attach {@linkplain Service.Listener service 156  * listeners} to each individual service. 157  * 158  * @author Luke Sandberg 159  * @since 15.0 (present as an interface in 14.0) 160  */ 161  public abstract static class Listener { 162  /** 163  * Called when the service initially becomes healthy. 164  * 165  * <p>This will be called at most once after all the services have entered the {@linkplain 166  * State#RUNNING running} state. If any services fail during start up or {@linkplain 167  * State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other services have 168  * started {@linkplain State#RUNNING running} then this method will not be called. 169  */ 170  public void healthy() {} 171  172  /** 173  * Called when the all of the component services have reached a terminal state, either 174  * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}. 175  */ 176  public void stopped() {} 177  178  /** 179  * Called when a component service has {@linkplain State#FAILED failed}. 180  * 181  * @param service The service that failed. 182  */ 183  public void failure(Service service) {} 184  } 185  186  /** 187  * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener 188  * service listeners}. This is extracted into its own object so that {@link ServiceListener} could 189  * be made {@code static} and its instances can be safely constructed and added in the {@link 190  * ServiceManager} constructor without having to close over the partially constructed {@link 191  * ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}). 192  */ 193  private final ServiceManagerState state; 194  195  private final ImmutableList<Service> services; 196  197  /** 198  * Constructs a new instance for managing the given services. 199  * 200  * @param services The services to manage 201  * @throws IllegalArgumentException if not all services are {@linkplain State#NEW new} or if there 202  * are any duplicate services. 203  */ 204  public ServiceManager(Iterable<? extends Service> services) { 205  ImmutableList<Service> copy = ImmutableList.copyOf(services); 206  if (copy.isEmpty()) { 207  // Having no services causes the manager to behave strangely. Notably, listeners are never 208  // fired. To avoid this we substitute a placeholder service. 209  logger.log( 210  Level.WARNING, 211  "ServiceManager configured with no services. Is your application configured properly?", 212  new EmptyServiceManagerWarning()); 213  copy = ImmutableList.<Service>of(new NoOpService()); 214  } 215  this.state = new ServiceManagerState(copy); 216  this.services = copy; 217  WeakReference<ServiceManagerState> stateReference = new WeakReference<>(state); 218  for (Service service : copy) { 219  service.addListener(new ServiceListener(service, stateReference), directExecutor()); 220  // We check the state after adding the listener as a way to ensure that our listener was added 221  // to a NEW service. 222  checkArgument(service.state() == NEW, "Can only manage NEW services, %s", service); 223  } 224  // We have installed all of our listeners and after this point any state transition should be 225  // correct. 226  this.state.markReady(); 227  } 228  229  /** 230  * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given 231  * executor. The listener will not have previous state changes replayed, so it is suggested that 232  * listeners are added before any of the managed services are {@linkplain Service#startAsync 233  * started}. 234  * 235  * <p>{@code addListener} guarantees execution ordering across calls to a given listener but not 236  * across calls to multiple listeners. Specifically, a given listener will have its callbacks 237  * invoked in the same order as the underlying service enters those states. Additionally, at most 238  * one of the listener's callbacks will execute at once. However, multiple listeners' callbacks 239  * may execute concurrently, and listeners may execute in an order different from the one in which 240  * they were registered. 241  * 242  * <p>RuntimeExceptions thrown by a listener will be caught and logged. Any exception thrown 243  * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException}) will be caught and 244  * logged. 245  * 246  * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 247  * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 248  * documentation. 249  * 250  * @param listener the listener to run when the manager changes state 251  * @param executor the executor in which the listeners callback methods will be run. 252  */ 253  public void addListener(Listener listener, Executor executor) { 254  state.addListener(listener, executor); 255  } 256  257  /** 258  * Initiates service {@linkplain Service#startAsync startup} on all the services being managed. It 259  * is only valid to call this method if all of the services are {@linkplain State#NEW new}. 260  * 261  * @return this 262  * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the 263  * method is called. 264  */ 265  @CanIgnoreReturnValue 266  public ServiceManager startAsync() { 267  for (Service service : services) { 268  State state = service.state(); 269  checkState(state == NEW, "Service %s is %s, cannot start it.", service, state); 270  } 271  for (Service service : services) { 272  try { 273  state.tryStartTiming(service); 274  service.startAsync(); 275  } catch (IllegalStateException e) { 276  // This can happen if the service has already been started or stopped (e.g. by another 277  // service or listener). Our contract says it is safe to call this method if 278  // all services were NEW when it was called, and this has already been verified above, so we 279  // don't propagate the exception. 280  logger.log(Level.WARNING, "Unable to start Service " + service, e); 281  } 282  } 283  return this; 284  } 285  286  /** 287  * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}. The manager 288  * will become healthy after all the component services have reached the {@linkplain State#RUNNING 289  * running} state. 290  * 291  * @throws IllegalStateException if the service manager reaches a state from which it cannot 292  * become {@linkplain #isHealthy() healthy}. 293  */ 294  public void awaitHealthy() { 295  state.awaitHealthy(); 296  } 297  298  /** 299  * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more 300  * than the given time. The manager will become healthy after all the component services have 301  * reached the {@linkplain State#RUNNING running} state. 302  * 303  * @param timeout the maximum time to wait 304  * @throws TimeoutException if not all of the services have finished starting within the deadline 305  * @throws IllegalStateException if the service manager reaches a state from which it cannot 306  * become {@linkplain #isHealthy() healthy}. 307  * @since 28.0 308  */ 309  public void awaitHealthy(Duration timeout) throws TimeoutException { 310  awaitHealthy(toNanosSaturated(timeout), TimeUnit.NANOSECONDS); 311  } 312  313  /** 314  * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more 315  * than the given time. The manager will become healthy after all the component services have 316  * reached the {@linkplain State#RUNNING running} state. 317  * 318  * @param timeout the maximum time to wait 319  * @param unit the time unit of the timeout argument 320  * @throws TimeoutException if not all of the services have finished starting within the deadline 321  * @throws IllegalStateException if the service manager reaches a state from which it cannot 322  * become {@linkplain #isHealthy() healthy}. 323  */ 324  @SuppressWarnings("GoodTime") // should accept a java.time.Duration 325  public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException { 326  state.awaitHealthy(timeout, unit); 327  } 328  329  /** 330  * Initiates service {@linkplain Service#stopAsync shutdown} if necessary on all the services 331  * being managed. 332  * 333  * @return this 334  */ 335  @CanIgnoreReturnValue 336  public ServiceManager stopAsync() { 337  for (Service service : services) { 338  service.stopAsync(); 339  } 340  return this; 341  } 342  343  /** 344  * Waits for the all the services to reach a terminal state. After this method returns all 345  * services will either be {@linkplain Service.State#TERMINATED terminated} or {@linkplain 346  * Service.State#FAILED failed}. 347  */ 348  public void awaitStopped() { 349  state.awaitStopped(); 350  } 351  352  /** 353  * Waits for the all the services to reach a terminal state for no more than the given time. After 354  * this method returns all services will either be {@linkplain Service.State#TERMINATED 355  * terminated} or {@linkplain Service.State#FAILED failed}. 356  * 357  * @param timeout the maximum time to wait 358  * @throws TimeoutException if not all of the services have stopped within the deadline 359  * @since 28.0 360  */ 361  public void awaitStopped(Duration timeout) throws TimeoutException { 362  awaitStopped(toNanosSaturated(timeout), TimeUnit.NANOSECONDS); 363  } 364  365  /** 366  * Waits for the all the services to reach a terminal state for no more than the given time. After 367  * this method returns all services will either be {@linkplain Service.State#TERMINATED 368  * terminated} or {@linkplain Service.State#FAILED failed}. 369  * 370  * @param timeout the maximum time to wait 371  * @param unit the time unit of the timeout argument 372  * @throws TimeoutException if not all of the services have stopped within the deadline 373  */ 374  @SuppressWarnings("GoodTime") // should accept a java.time.Duration 375  public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException { 376  state.awaitStopped(timeout, unit); 377  } 378  379  /** 380  * Returns true if all services are currently in the {@linkplain State#RUNNING running} state. 381  * 382  * <p>Users who want more detailed information should use the {@link #servicesByState} method to 383  * get detailed information about which services are not running. 384  */ 385  public boolean isHealthy() { 386  for (Service service : services) { 387  if (!service.isRunning()) { 388  return false; 389  } 390  } 391  return true; 392  } 393  394  /** 395  * Provides a snapshot of the current state of all the services under management. 396  * 397  * <p>N.B. This snapshot is guaranteed to be consistent, i.e. the set of states returned will 398  * correspond to a point in time view of the services. 399  * 400  * @since 29.0 (present with return type {@code ImmutableMultimap} since 14.0) 401  */ 402  @Override 403  public ImmutableSetMultimap<State, Service> servicesByState() { 404  return state.servicesByState(); 405  } 406  407  /** 408  * Returns the service load times. This value will only return startup times for services that 409  * have finished starting. 410  * 411  * @return Map of services and their corresponding startup time in millis, the map entries will be 412  * ordered by startup time. 413  */ 414  public ImmutableMap<Service, Long> startupTimes() { 415  return state.startupTimes(); 416  } 417  418  /** 419  * Returns the service load times. This value will only return startup times for services that 420  * have finished starting. 421  * 422  * @return Map of services and their corresponding startup time, the map entries will be ordered 423  * by startup time. 424  * @since NEXT 425  */ 426  @J2ObjCIncompatible 427  public ImmutableMap<Service, Duration> startupDurations() { 428  return ImmutableMap.copyOf(Maps.transformValues(startupTimes(), Duration::ofMillis)); 429  } 430  431  @Override 432  public String toString() { 433  return MoreObjects.toStringHelper(ServiceManager.class) 434  .add("services", Collections2.filter(services, not(instanceOf(NoOpService.class)))) 435  .toString(); 436  } 437  438  /** 439  * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be 440  * accessed by instances of {@link ServiceListener}. 441  */ 442  private static final class ServiceManagerState { 443  final Monitor monitor = new Monitor(); 444  445  @GuardedBy("monitor") 446  final SetMultimap<State, Service> servicesByState = 447  MultimapBuilder.enumKeys(State.class).linkedHashSetValues().build(); 448  449  @GuardedBy("monitor") 450  final Multiset<State> states = servicesByState.keys(); 451  452  @GuardedBy("monitor") 453  final Map<Service, Stopwatch> startupTimers = Maps.newIdentityHashMap(); 454  455  /** 456  * These two booleans are used to mark the state as ready to start. 457  * 458  * <p>{@link #ready}: is set by {@link #markReady} to indicate that all listeners have been 459  * correctly installed 460  * 461  * <p>{@link #transitioned}: is set by {@link #transitionService} to indicate that some 462  * transition has been performed. 463  * 464  * <p>Together, they allow us to enforce that all services have their listeners installed prior 465  * to any service performing a transition, then we can fail in the ServiceManager constructor 466  * rather than in a Service.Listener callback. 467  */ 468  @GuardedBy("monitor") 469  boolean ready; 470  471  @GuardedBy("monitor") 472  boolean transitioned; 473  474  final int numberOfServices; 475  476  /** 477  * Controls how long to wait for all the services to either become healthy or reach a state from 478  * which it is guaranteed that it can never become healthy. 479  */ 480  final Monitor.Guard awaitHealthGuard = new AwaitHealthGuard(); 481  482  @WeakOuter 483  final class AwaitHealthGuard extends Monitor.Guard { 484  AwaitHealthGuard() { 485  super(ServiceManagerState.this.monitor); 486  } 487  488  @Override 489  @GuardedBy("ServiceManagerState.this.monitor") 490  public boolean isSatisfied() { 491  // All services have started or some service has terminated/failed. 492  return states.count(RUNNING) == numberOfServices 493  || states.contains(STOPPING) 494  || states.contains(TERMINATED) 495  || states.contains(FAILED); 496  } 497  } 498  499  /** Controls how long to wait for all services to reach a terminal state. */ 500  final Monitor.Guard stoppedGuard = new StoppedGuard(); 501  502  @WeakOuter 503  final class StoppedGuard extends Monitor.Guard { 504  StoppedGuard() { 505  super(ServiceManagerState.this.monitor); 506  } 507  508  @Override 509  @GuardedBy("ServiceManagerState.this.monitor") 510  public boolean isSatisfied() { 511  return states.count(TERMINATED) + states.count(FAILED) == numberOfServices; 512  } 513  } 514  515  /** The listeners to notify during a state transition. */ 516  final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 517  518  /** 519  * It is implicitly assumed that all the services are NEW and that they will all remain NEW 520  * until all the Listeners are installed and {@link #markReady()} is called. It is our caller's 521  * responsibility to only call {@link #markReady()} if all services were new at the time this 522  * method was called and when all the listeners were installed. 523  */ 524  ServiceManagerState(ImmutableCollection<Service> services) { 525  this.numberOfServices = services.size(); 526  servicesByState.putAll(NEW, services); 527  } 528  529  /** 530  * Attempts to start the timer immediately prior to the service being started via {@link 531  * Service#startAsync()}. 532  */ 533  void tryStartTiming(Service service) { 534  monitor.enter(); 535  try { 536  Stopwatch stopwatch = startupTimers.get(service); 537  if (stopwatch == null) { 538  startupTimers.put(service, Stopwatch.createStarted()); 539  } 540  } finally { 541  monitor.leave(); 542  } 543  } 544  545  /** 546  * Marks the {@link State} as ready to receive transitions. Returns true if no transitions have 547  * been observed yet. 548  */ 549  void markReady() { 550  monitor.enter(); 551  try { 552  if (!transitioned) { 553  // nothing has transitioned since construction, good. 554  ready = true; 555  } else { 556  // This should be an extremely rare race condition. 557  List<Service> servicesInBadStates = Lists.newArrayList(); 558  for (Service service : servicesByState().values()) { 559  if (service.state() != NEW) { 560  servicesInBadStates.add(service); 561  } 562  } 563  throw new IllegalArgumentException( 564  "Services started transitioning asynchronously before " 565  + "the ServiceManager was constructed: " 566  + servicesInBadStates); 567  } 568  } finally { 569  monitor.leave(); 570  } 571  } 572  573  void addListener(Listener listener, Executor executor) { 574  listeners.addListener(listener, executor); 575  } 576  577  void awaitHealthy() { 578  monitor.enterWhenUninterruptibly(awaitHealthGuard); 579  try { 580  checkHealthy(); 581  } finally { 582  monitor.leave(); 583  } 584  } 585  586  void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException { 587  monitor.enter(); 588  try { 589  if (!monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) { 590  throw new TimeoutException( 591  "Timeout waiting for the services to become healthy. The " 592  + "following services have not started: " 593  + Multimaps.filterKeys(servicesByState, in(ImmutableSet.of(NEW, STARTING)))); 594  } 595  checkHealthy(); 596  } finally { 597  monitor.leave(); 598  } 599  } 600  601  void awaitStopped() { 602  monitor.enterWhenUninterruptibly(stoppedGuard); 603  monitor.leave(); 604  } 605  606  void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException { 607  monitor.enter(); 608  try { 609  if (!monitor.waitForUninterruptibly(stoppedGuard, timeout, unit)) { 610  throw new TimeoutException( 611  "Timeout waiting for the services to stop. The following " 612  + "services have not stopped: " 613  + Multimaps.filterKeys(servicesByState, not(in(EnumSet.of(TERMINATED, FAILED))))); 614  } 615  } finally { 616  monitor.leave(); 617  } 618  } 619  620  ImmutableSetMultimap<State, Service> servicesByState() { 621  ImmutableSetMultimap.Builder<State, Service> builder = ImmutableSetMultimap.builder(); 622  monitor.enter(); 623  try { 624  for (Entry<State, Service> entry : servicesByState.entries()) { 625  if (!(entry.getValue() instanceof NoOpService)) { 626  builder.put(entry); 627  } 628  } 629  } finally { 630  monitor.leave(); 631  } 632  return builder.build(); 633  } 634  635  ImmutableMap<Service, Long> startupTimes() { 636  List<Entry<Service, Long>> loadTimes; 637  monitor.enter(); 638  try { 639  loadTimes = Lists.newArrayListWithCapacity(startupTimers.size()); 640  // N.B. There will only be an entry in the map if the service has started 641  for (Entry<Service, Stopwatch> entry : startupTimers.entrySet()) { 642  Service service = entry.getKey(); 643  Stopwatch stopwatch = entry.getValue(); 644  if (!stopwatch.isRunning() && !(service instanceof NoOpService)) { 645  loadTimes.add(Maps.immutableEntry(service, stopwatch.elapsed(MILLISECONDS))); 646  } 647  } 648  } finally { 649  monitor.leave(); 650  } 651  Collections.sort( 652  loadTimes, 653  Ordering.natural() 654  .onResultOf( 655  new Function<Entry<Service, Long>, Long>() { 656  @Override 657  public Long apply(Entry<Service, Long> input) { 658  return input.getValue(); 659  } 660  })); 661  return ImmutableMap.copyOf(loadTimes); 662  } 663  664  /** 665  * Updates the state with the given service transition. 666  * 667  * <p>This method performs the main logic of ServiceManager in the following steps. 668  * 669  * <ol> 670  * <li>Update the {@link #servicesByState()} 671  * <li>Update the {@link #startupTimers} 672  * <li>Based on the new state queue listeners to run 673  * <li>Run the listeners (outside of the lock) 674  * </ol> 675  */ 676  void transitionService(final Service service, State from, State to) { 677  checkNotNull(service); 678  checkArgument(from != to); 679  monitor.enter(); 680  try { 681  transitioned = true; 682  if (!ready) { 683  return; 684  } 685  // Update state. 686  checkState( 687  servicesByState.remove(from, service), 688  "Service %s not at the expected location in the state map %s", 689  service, 690  from); 691  checkState( 692  servicesByState.put(to, service), 693  "Service %s in the state map unexpectedly at %s", 694  service, 695  to); 696  // Update the timer 697  Stopwatch stopwatch = startupTimers.get(service); 698  if (stopwatch == null) { 699  // This means the service was started by some means other than ServiceManager.startAsync 700  stopwatch = Stopwatch.createStarted(); 701  startupTimers.put(service, stopwatch); 702  } 703  if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) { 704  // N.B. if we miss the STARTING event then we may never record a startup time. 705  stopwatch.stop(); 706  if (!(service instanceof NoOpService)) { 707  logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch}); 708  } 709  } 710  // Queue our listeners 711  712  // Did a service fail? 713  if (to == FAILED) { 714  enqueueFailedEvent(service); 715  } 716  717  if (states.count(RUNNING) == numberOfServices) { 718  // This means that the manager is currently healthy. N.B. If other threads call isHealthy 719  // they are not guaranteed to get 'true', because any service could fail right now. 720  enqueueHealthyEvent(); 721  } else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) { 722  enqueueStoppedEvent(); 723  } 724  } finally { 725  monitor.leave(); 726  // Run our executors outside of the lock 727  dispatchListenerEvents(); 728  } 729  } 730  731  void enqueueStoppedEvent() { 732  listeners.enqueue(STOPPED_EVENT); 733  } 734  735  void enqueueHealthyEvent() { 736  listeners.enqueue(HEALTHY_EVENT); 737  } 738  739  void enqueueFailedEvent(final Service service) { 740  listeners.enqueue( 741  new ListenerCallQueue.Event<Listener>() { 742  @Override 743  public void call(Listener listener) { 744  listener.failure(service); 745  } 746  747  @Override 748  public String toString() { 749  return "failed({service=" + service + "})"; 750  } 751  }); 752  } 753  754  /** Attempts to execute all the listeners in {@link #listeners}. */ 755  void dispatchListenerEvents() { 756  checkState( 757  !monitor.isOccupiedByCurrentThread(), 758  "It is incorrect to execute listeners with the monitor held."); 759  listeners.dispatch(); 760  } 761  762  @GuardedBy("monitor") 763  void checkHealthy() { 764  if (states.count(RUNNING) != numberOfServices) { 765  IllegalStateException exception = 766  new IllegalStateException( 767  "Expected to be healthy after starting. The following services are not running: " 768  + Multimaps.filterKeys(servicesByState, not(equalTo(RUNNING)))); 769  for (Service service : servicesByState.get(State.FAILED)) { 770  exception.addSuppressed(new FailedService(service)); 771  } 772  throw exception; 773  } 774  } 775  } 776  777  /** 778  * A {@link Service} that wraps another service and times how long it takes for it to start and 779  * also calls the {@link ServiceManagerState#transitionService(Service, State, State)}, to record 780  * the state transitions. 781  */ 782  private static final class ServiceListener extends Service.Listener { 783  final Service service; 784  // We store the state in a weak reference to ensure that if something went wrong while 785  // constructing the ServiceManager we don't pointlessly keep updating the state. 786  final WeakReference<ServiceManagerState> state; 787  788  ServiceListener(Service service, WeakReference<ServiceManagerState> state) { 789  this.service = service; 790  this.state = state; 791  } 792  793  @Override 794  public void starting() { 795  ServiceManagerState state = this.state.get(); 796  if (state != null) { 797  state.transitionService(service, NEW, STARTING); 798  if (!(service instanceof NoOpService)) { 799  logger.log(Level.FINE, "Starting {0}.", service); 800  } 801  } 802  } 803  804  @Override 805  public void running() { 806  ServiceManagerState state = this.state.get(); 807  if (state != null) { 808  state.transitionService(service, STARTING, RUNNING); 809  } 810  } 811  812  @Override 813  public void stopping(State from) { 814  ServiceManagerState state = this.state.get(); 815  if (state != null) { 816  state.transitionService(service, from, STOPPING); 817  } 818  } 819  820  @Override 821  public void terminated(State from) { 822  ServiceManagerState state = this.state.get(); 823  if (state != null) { 824  if (!(service instanceof NoOpService)) { 825  logger.log( 826  Level.FINE, 827  "Service {0} has terminated. Previous state was: {1}", 828  new Object[] {service, from}); 829  } 830  state.transitionService(service, from, TERMINATED); 831  } 832  } 833  834  @Override 835  public void failed(State from, Throwable failure) { 836  ServiceManagerState state = this.state.get(); 837  if (state != null) { 838  // Log before the transition, so that if the process exits in response to server failure, 839  // there is a higher likelihood that the cause will be in the logs. 840  boolean log = !(service instanceof NoOpService); 841  /* 842  * We have already exposed startup exceptions to the user in the form of suppressed 843  * exceptions. We don't need to log those exceptions again. 844  */ 845  log &= from != State.STARTING; 846  if (log) { 847  logger.log( 848  Level.SEVERE, 849  "Service " + service + " has failed in the " + from + " state.", 850  failure); 851  } 852  state.transitionService(service, from, FAILED); 853  } 854  } 855  } 856  857  /** 858  * A {@link Service} instance that does nothing. This is only useful as a placeholder to ensure 859  * that the {@link ServiceManager} functions properly even when it is managing no services. 860  * 861  * <p>The use of this class is considered an implementation detail of ServiceManager and as such 862  * it is excluded from {@link #servicesByState}, {@link #startupTimes}, {@link #toString} and all 863  * logging statements. 864  */ 865  private static final class NoOpService extends AbstractService { 866  @Override 867  protected void doStart() { 868  notifyStarted(); 869  } 870  871  @Override 872  protected void doStop() { 873  notifyStopped(); 874  } 875  } 876  877  /** This is never thrown but only used for logging. */ 878  private static final class EmptyServiceManagerWarning extends Throwable {} 879  880  private static final class FailedService extends Throwable { 881  FailedService(Service service) { 882  super( 883  service.toString(), 884  service.failureCause(), 885  false /* don't enable suppression */, 886  false /* don't calculate a stack trace. */); 887  } 888  } 889 }