SequentialExecutor.java

/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.IDLE;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUED;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.QUEUING;
import static com.google.common.util.concurrent.SequentialExecutor.WorkerRunningState.RUNNING;
import static java.lang.System.identityHashCode;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.j2objc.annotations.RetainedWith;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;

/**
 * Executor ensuring that all Runnables submitted are executed in order, using the provided
 * Executor, and sequentially such that no two will ever be running at the same time.
 *
 * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
 *
 * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
 * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
 * continues. See {@link QueueWorker#workOnQueue} for details.
 *
 * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
 * If an {@code Error} is thrown, the error will propagate and execution will stop until it is
 * restarted by a call to {@link #execute}.
 */
@GwtIncompatible
@ElementTypesAreNonnullByDefault
final class SequentialExecutor implements Executor {
  private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName());

  enum WorkerRunningState {
    /** Runnable is not running and not queued for execution */
    IDLE,
    /** Runnable is not running, but is being queued for execution */
    QUEUING,
    /** runnable has been submitted but has not yet begun execution */
    QUEUED,
    RUNNING,
  }

  /** Underlying executor that all submitted Runnable objects are run on. */
  private final Executor executor;

  @GuardedBy("queue")
  private final Deque<Runnable> queue = new ArrayDeque<>();

  /** see {@link WorkerRunningState} */
  @GuardedBy("queue")
  private WorkerRunningState workerRunningState = IDLE;

  /**
   * This counter prevents an ABA issue where a thread may successfully schedule the worker, the
   * worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
   * worker, and then the first thread's call to delegate.execute() returns. Without this counter,
   * it would observe the QUEUING state and set it to QUEUED, and the worker would never be
   * scheduled again for future submissions.
   */
  @GuardedBy("queue")
  private long workerRunCount = 0;

  @RetainedWith private final QueueWorker worker = new QueueWorker();

  /** Use {@link MoreExecutors#newSequentialExecutor} */
  SequentialExecutor(Executor executor) {
    this.executor = Preconditions.checkNotNull(executor);
  }

  /**
   * Adds a task to the queue and makes sure a worker thread is running.
   *
   * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
   * execution of tasks will stop until a call to this method is made.
   */
  @Override
  public void execute(final Runnable task) {
    checkNotNull(task);
    final Runnable submittedTask;
    final long oldRunCount;
    synchronized (queue) {
      // If the worker is already running (or execute() on the delegate returned successfully, and
      // the worker has yet to start) then we don't need to start the worker.
      if (workerRunningState == RUNNING || workerRunningState == QUEUED) {
        queue.add(task);
        return;
      }

      oldRunCount = workerRunCount;

      // If the worker is not yet running, the delegate Executor might reject our attempt to start
      // it. To preserve FIFO order and failure atomicity of rejected execution when the same
      // Runnable is executed more than once, allocate a wrapper that we know is safe to remove by
      // object identity.
      // A data structure that returned a removal handle from add() would allow eliminating this
      // allocation.
      submittedTask =
          new Runnable() {
            @Override
            public void run() {
              task.run();
            }

            @Override
            public String toString() {
              return task.toString();
            }
          };
      queue.add(submittedTask);
      workerRunningState = QUEUING;
    }

    try {
      executor.execute(worker);
    } catch (RuntimeException | Error t) {
      synchronized (queue) {
        boolean removed =
            (workerRunningState == IDLE || workerRunningState == QUEUING)
                && queue.removeLastOccurrence(submittedTask);
        // If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But
        // that's handled by the log check that catches RuntimeExceptions in the queue worker.
        if (!(t instanceof RejectedExecutionException) || removed) {
          throw t;
        }
      }
      return;
    }

    /*
     * This is an unsynchronized read! After the read, the function returns immediately or acquires
     * the lock to check again. Since an IDLE state was observed inside the preceding synchronized
     * block, and reference field assignment is atomic, this may save reacquiring the lock when
     * another thread or the worker task has cleared the count and set the state.
     *
     * <p>When {@link #executor} is a directExecutor(), the value written to
     * {@code workerRunningState} will be available synchronously, and behaviour will be
     * deterministic.
     */
    @SuppressWarnings("GuardedBy")
    boolean alreadyMarkedQueued = workerRunningState != QUEUING;
    if (alreadyMarkedQueued) {
      return;
    }
    synchronized (queue) {
      if (workerRunCount == oldRunCount && workerRunningState == QUEUING) {
        workerRunningState = QUEUED;
      }
    }
  }

  /** Worker that runs tasks from {@link #queue} until it is empty. */
  private final class QueueWorker implements Runnable {
    @CheckForNull Runnable task;

    @Override
    public void run() {
      try {
        workOnQueue();
      } catch (Error e) {
        synchronized (queue) {
          workerRunningState = IDLE;
        }
        throw e;
        // The execution of a task has ended abnormally.
        // We could have tasks left in the queue, so should perhaps try to restart a worker,
        // but then the Error will get delayed if we are using a direct (same thread) executor.
      }
    }

    /**
     * Continues executing tasks from {@link #queue} until it is empty.
     *
     * <p>The thread's interrupt bit is cleared before execution of each task.
     *
     * <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
     * #queue}, the Executor will complete its tasks, and then restore the interruption. This means
     * that once the Thread returns to the Executor that this Executor composes, the interruption
     * will still be present. If the composed Executor is an ExecutorService, it can respond to
     * shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue.
     */
    private void workOnQueue() {
      boolean interruptedDuringTask = false;
      boolean hasSetRunning = false;
      try {
        while (true) {
          synchronized (queue) {
            // Choose whether this thread will run or not after acquiring the lock on the first
            // iteration
            if (!hasSetRunning) {
              if (workerRunningState == RUNNING) {
                // Don't want to have two workers pulling from the queue.
                return;
              } else {
                // Increment the run counter to avoid the ABA problem of a submitter marking the
                // thread as QUEUED after it already ran and exhausted the queue before returning
                // from execute().
                workerRunCount++;
                workerRunningState = RUNNING;
                hasSetRunning = true;
              }
            }
            task = queue.poll();
            if (task == null) {
              workerRunningState = IDLE;
              return;
            }
          }
          // Remove the interrupt bit before each task. The interrupt is for the "current task" when
          // it is sent, so subsequent tasks in the queue should not be caused to be interrupted
          // by a previous one in the queue being interrupted.
          interruptedDuringTask |= Thread.interrupted();
          try {
            task.run();
          } catch (RuntimeException e) {
            log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
          } finally {
            task = null;
          }
        }
      } finally {
        // Ensure that if the thread was interrupted at all while processing the task queue, it
        // is returned to the delegate Executor interrupted so that it may handle the
        // interruption if it likes.
        if (interruptedDuringTask) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @SuppressWarnings("GuardedBy")
    @Override
    public String toString() {
      Runnable currentlyRunning = task;
      if (currentlyRunning != null) {
        return "SequentialExecutorWorker{running=" + currentlyRunning + "}";
      }
      return "SequentialExecutorWorker{state=" + workerRunningState + "}";
    }
  }

  @Override
  public String toString() {
    return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}";
  }
}