Coverage Summary for Class: Queues (com.google.common.collect)

Class Class, % Method, % Line, %
Queues 0% (0/1) 0% (0/23) 0% (0/79)


1 /* 2  * Copyright (C) 2011 The Guava Authors 3  * 4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5  * in compliance with the License. You may obtain a copy of the License at 6  * 7  * http://www.apache.org/licenses/LICENSE-2.0 8  * 9  * Unless required by applicable law or agreed to in writing, software distributed under the License 10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11  * or implied. See the License for the specific language governing permissions and limitations under 12  * the License. 13  */ 14  15 package com.google.common.collect; 16  17 import com.google.common.annotations.Beta; 18 import com.google.common.annotations.GwtCompatible; 19 import com.google.common.annotations.GwtIncompatible; 20 import com.google.common.base.Preconditions; 21 import com.google.errorprone.annotations.CanIgnoreReturnValue; 22 import java.util.ArrayDeque; 23 import java.util.Collection; 24 import java.util.Deque; 25 import java.util.PriorityQueue; 26 import java.util.Queue; 27 import java.util.concurrent.ArrayBlockingQueue; 28 import java.util.concurrent.BlockingQueue; 29 import java.util.concurrent.ConcurrentLinkedQueue; 30 import java.util.concurrent.LinkedBlockingDeque; 31 import java.util.concurrent.LinkedBlockingQueue; 32 import java.util.concurrent.PriorityBlockingQueue; 33 import java.util.concurrent.SynchronousQueue; 34 import java.util.concurrent.TimeUnit; 35  36 /** 37  * Static utility methods pertaining to {@link Queue} and {@link Deque} instances. Also see this 38  * class's counterparts {@link Lists}, {@link Sets}, and {@link Maps}. 39  * 40  * @author Kurt Alfred Kluever 41  * @since 11.0 42  */ 43 @GwtCompatible(emulated = true) 44 public final class Queues { 45  private Queues() {} 46  47  // ArrayBlockingQueue 48  49  /** 50  * Creates an empty {@code ArrayBlockingQueue} with the given (fixed) capacity and nonfair access 51  * policy. 52  */ 53  @GwtIncompatible // ArrayBlockingQueue 54  public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) { 55  return new ArrayBlockingQueue<E>(capacity); 56  } 57  58  // ArrayDeque 59  60  /** 61  * Creates an empty {@code ArrayDeque}. 62  * 63  * @since 12.0 64  */ 65  public static <E> ArrayDeque<E> newArrayDeque() { 66  return new ArrayDeque<E>(); 67  } 68  69  /** 70  * Creates an {@code ArrayDeque} containing the elements of the specified iterable, in the order 71  * they are returned by the iterable's iterator. 72  * 73  * @since 12.0 74  */ 75  public static <E> ArrayDeque<E> newArrayDeque(Iterable<? extends E> elements) { 76  if (elements instanceof Collection) { 77  return new ArrayDeque<E>((Collection<? extends E>) elements); 78  } 79  ArrayDeque<E> deque = new ArrayDeque<E>(); 80  Iterables.addAll(deque, elements); 81  return deque; 82  } 83  84  // ConcurrentLinkedQueue 85  86  /** Creates an empty {@code ConcurrentLinkedQueue}. */ 87  @GwtIncompatible // ConcurrentLinkedQueue 88  public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() { 89  return new ConcurrentLinkedQueue<E>(); 90  } 91  92  /** 93  * Creates a {@code ConcurrentLinkedQueue} containing the elements of the specified iterable, in 94  * the order they are returned by the iterable's iterator. 95  */ 96  @GwtIncompatible // ConcurrentLinkedQueue 97  public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue( 98  Iterable<? extends E> elements) { 99  if (elements instanceof Collection) { 100  return new ConcurrentLinkedQueue<E>((Collection<? extends E>) elements); 101  } 102  ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>(); 103  Iterables.addAll(queue, elements); 104  return queue; 105  } 106  107  // LinkedBlockingDeque 108  109  /** 110  * Creates an empty {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}. 111  * 112  * @since 12.0 113  */ 114  @GwtIncompatible // LinkedBlockingDeque 115  public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque() { 116  return new LinkedBlockingDeque<E>(); 117  } 118  119  /** 120  * Creates an empty {@code LinkedBlockingDeque} with the given (fixed) capacity. 121  * 122  * @throws IllegalArgumentException if {@code capacity} is less than 1 123  * @since 12.0 124  */ 125  @GwtIncompatible // LinkedBlockingDeque 126  public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(int capacity) { 127  return new LinkedBlockingDeque<E>(capacity); 128  } 129  130  /** 131  * Creates a {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}, containing 132  * the elements of the specified iterable, in the order they are returned by the iterable's 133  * iterator. 134  * 135  * @since 12.0 136  */ 137  @GwtIncompatible // LinkedBlockingDeque 138  public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(Iterable<? extends E> elements) { 139  if (elements instanceof Collection) { 140  return new LinkedBlockingDeque<E>((Collection<? extends E>) elements); 141  } 142  LinkedBlockingDeque<E> deque = new LinkedBlockingDeque<E>(); 143  Iterables.addAll(deque, elements); 144  return deque; 145  } 146  147  // LinkedBlockingQueue 148  149  /** Creates an empty {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}. */ 150  @GwtIncompatible // LinkedBlockingQueue 151  public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() { 152  return new LinkedBlockingQueue<E>(); 153  } 154  155  /** 156  * Creates an empty {@code LinkedBlockingQueue} with the given (fixed) capacity. 157  * 158  * @throws IllegalArgumentException if {@code capacity} is less than 1 159  */ 160  @GwtIncompatible // LinkedBlockingQueue 161  public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) { 162  return new LinkedBlockingQueue<E>(capacity); 163  } 164  165  /** 166  * Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}, containing 167  * the elements of the specified iterable, in the order they are returned by the iterable's 168  * iterator. 169  * 170  * @param elements the elements that the queue should contain, in order 171  * @return a new {@code LinkedBlockingQueue} containing those elements 172  */ 173  @GwtIncompatible // LinkedBlockingQueue 174  public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) { 175  if (elements instanceof Collection) { 176  return new LinkedBlockingQueue<E>((Collection<? extends E>) elements); 177  } 178  LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>(); 179  Iterables.addAll(queue, elements); 180  return queue; 181  } 182  183  // LinkedList: see {@link com.google.common.collect.Lists} 184  185  // PriorityBlockingQueue 186  187  /** 188  * Creates an empty {@code PriorityBlockingQueue} with the ordering given by its elements' natural 189  * ordering. 190  * 191  * @since 11.0 (but the bound of {@code E} was changed from {@code Object} to {@code Comparable} 192  * in 15.0) 193  */ 194  @GwtIncompatible // PriorityBlockingQueue 195  public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue() { 196  return new PriorityBlockingQueue<E>(); 197  } 198  199  /** 200  * Creates a {@code PriorityBlockingQueue} containing the given elements. 201  * 202  * <p><b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue}, 203  * this priority queue will be ordered according to the same ordering. 204  * 205  * @since 11.0 (but the bound of {@code E} was changed from {@code Object} to {@code Comparable} 206  * in 15.0) 207  */ 208  @GwtIncompatible // PriorityBlockingQueue 209  public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue( 210  Iterable<? extends E> elements) { 211  if (elements instanceof Collection) { 212  return new PriorityBlockingQueue<E>((Collection<? extends E>) elements); 213  } 214  PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>(); 215  Iterables.addAll(queue, elements); 216  return queue; 217  } 218  219  // PriorityQueue 220  221  /** 222  * Creates an empty {@code PriorityQueue} with the ordering given by its elements' natural 223  * ordering. 224  * 225  * @since 11.0 (but the bound of {@code E} was changed from {@code Object} to {@code Comparable} 226  * in 15.0) 227  */ 228  public static <E extends Comparable> PriorityQueue<E> newPriorityQueue() { 229  return new PriorityQueue<E>(); 230  } 231  232  /** 233  * Creates a {@code PriorityQueue} containing the given elements. 234  * 235  * <p><b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue}, 236  * this priority queue will be ordered according to the same ordering. 237  * 238  * @since 11.0 (but the bound of {@code E} was changed from {@code Object} to {@code Comparable} 239  * in 15.0) 240  */ 241  public static <E extends Comparable> PriorityQueue<E> newPriorityQueue( 242  Iterable<? extends E> elements) { 243  if (elements instanceof Collection) { 244  return new PriorityQueue<E>((Collection<? extends E>) elements); 245  } 246  PriorityQueue<E> queue = new PriorityQueue<E>(); 247  Iterables.addAll(queue, elements); 248  return queue; 249  } 250  251  // SynchronousQueue 252  253  /** Creates an empty {@code SynchronousQueue} with nonfair access policy. */ 254  @GwtIncompatible // SynchronousQueue 255  public static <E> SynchronousQueue<E> newSynchronousQueue() { 256  return new SynchronousQueue<E>(); 257  } 258  259  /** 260  * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested {@code 261  * numElements} elements are not available, it will wait for them up to the specified timeout. 262  * 263  * @param q the blocking queue to be drained 264  * @param buffer where to add the transferred elements 265  * @param numElements the number of elements to be waited for 266  * @param timeout how long to wait before giving up 267  * @return the number of elements transferred 268  * @throws InterruptedException if interrupted while waiting 269  * @since 28.0 270  */ 271  @Beta 272  @CanIgnoreReturnValue 273  @GwtIncompatible // BlockingQueue 274  public static <E> int drain( 275  BlockingQueue<E> q, Collection<? super E> buffer, int numElements, java.time.Duration timeout) 276  throws InterruptedException { 277  // TODO(b/126049426): Consider using saturateToNanos(timeout) instead. 278  return drain(q, buffer, numElements, timeout.toNanos(), TimeUnit.NANOSECONDS); 279  } 280  281  /** 282  * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested {@code 283  * numElements} elements are not available, it will wait for them up to the specified timeout. 284  * 285  * @param q the blocking queue to be drained 286  * @param buffer where to add the transferred elements 287  * @param numElements the number of elements to be waited for 288  * @param timeout how long to wait before giving up, in units of {@code unit} 289  * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter 290  * @return the number of elements transferred 291  * @throws InterruptedException if interrupted while waiting 292  */ 293  @Beta 294  @CanIgnoreReturnValue 295  @GwtIncompatible // BlockingQueue 296  @SuppressWarnings("GoodTime") // should accept a java.time.Duration 297  public static <E> int drain( 298  BlockingQueue<E> q, 299  Collection<? super E> buffer, 300  int numElements, 301  long timeout, 302  TimeUnit unit) 303  throws InterruptedException { 304  Preconditions.checkNotNull(buffer); 305  /* 306  * This code performs one System.nanoTime() more than necessary, and in return, the time to 307  * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make 308  * the timeout arbitrarily inaccurate, given a queue that is slow to drain). 309  */ 310  long deadline = System.nanoTime() + unit.toNanos(timeout); 311  int added = 0; 312  while (added < numElements) { 313  // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple 314  // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once) 315  added += q.drainTo(buffer, numElements - added); 316  if (added < numElements) { // not enough elements immediately available; will have to poll 317  E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); 318  if (e == null) { 319  break; // we already waited enough, and there are no more elements in sight 320  } 321  buffer.add(e); 322  added++; 323  } 324  } 325  return added; 326  } 327  328  /** 329  * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, Duration)}, but with a 330  * different behavior in case it is interrupted while waiting. In that case, the operation will 331  * continue as usual, and in the end the thread's interruption status will be set (no {@code 332  * InterruptedException} is thrown). 333  * 334  * @param q the blocking queue to be drained 335  * @param buffer where to add the transferred elements 336  * @param numElements the number of elements to be waited for 337  * @param timeout how long to wait before giving up 338  * @return the number of elements transferred 339  * @since 28.0 340  */ 341  @Beta 342  @CanIgnoreReturnValue 343  @GwtIncompatible // BlockingQueue 344  public static <E> int drainUninterruptibly( 345  BlockingQueue<E> q, 346  Collection<? super E> buffer, 347  int numElements, 348  java.time.Duration timeout) { 349  // TODO(b/126049426): Consider using saturateToNanos(timeout) instead. 350  return drainUninterruptibly(q, buffer, numElements, timeout.toNanos(), TimeUnit.NANOSECONDS); 351  } 352  353  /** 354  * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)}, but 355  * with a different behavior in case it is interrupted while waiting. In that case, the operation 356  * will continue as usual, and in the end the thread's interruption status will be set (no {@code 357  * InterruptedException} is thrown). 358  * 359  * @param q the blocking queue to be drained 360  * @param buffer where to add the transferred elements 361  * @param numElements the number of elements to be waited for 362  * @param timeout how long to wait before giving up, in units of {@code unit} 363  * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter 364  * @return the number of elements transferred 365  */ 366  @Beta 367  @CanIgnoreReturnValue 368  @GwtIncompatible // BlockingQueue 369  @SuppressWarnings("GoodTime") // should accept a java.time.Duration 370  public static <E> int drainUninterruptibly( 371  BlockingQueue<E> q, 372  Collection<? super E> buffer, 373  int numElements, 374  long timeout, 375  TimeUnit unit) { 376  Preconditions.checkNotNull(buffer); 377  long deadline = System.nanoTime() + unit.toNanos(timeout); 378  int added = 0; 379  boolean interrupted = false; 380  try { 381  while (added < numElements) { 382  // we could rely solely on #poll, but #drainTo might be more efficient when there are 383  // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once) 384  added += q.drainTo(buffer, numElements - added); 385  if (added < numElements) { // not enough elements immediately available; will have to poll 386  E e; // written exactly once, by a successful (uninterrupted) invocation of #poll 387  while (true) { 388  try { 389  e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); 390  break; 391  } catch (InterruptedException ex) { 392  interrupted = true; // note interruption and retry 393  } 394  } 395  if (e == null) { 396  break; // we already waited enough, and there are no more elements in sight 397  } 398  buffer.add(e); 399  added++; 400  } 401  } 402  } finally { 403  if (interrupted) { 404  Thread.currentThread().interrupt(); 405  } 406  } 407  return added; 408  } 409  410  /** 411  * Returns a synchronized (thread-safe) queue backed by the specified queue. In order to guarantee 412  * serial access, it is critical that <b>all</b> access to the backing queue is accomplished 413  * through the returned queue. 414  * 415  * <p>It is imperative that the user manually synchronize on the returned queue when accessing the 416  * queue's iterator: 417  * 418  * <pre>{@code 419  * Queue<E> queue = Queues.synchronizedQueue(MinMaxPriorityQueue.<E>create()); 420  * ... 421  * queue.add(element); // Needn't be in synchronized block 422  * ... 423  * synchronized (queue) { // Must synchronize on queue! 424  * Iterator<E> i = queue.iterator(); // Must be in synchronized block 425  * while (i.hasNext()) { 426  * foo(i.next()); 427  * } 428  * } 429  * }</pre> 430  * 431  * <p>Failure to follow this advice may result in non-deterministic behavior. 432  * 433  * <p>The returned queue will be serializable if the specified queue is serializable. 434  * 435  * @param queue the queue to be wrapped in a synchronized view 436  * @return a synchronized view of the specified queue 437  * @since 14.0 438  */ 439  public static <E> Queue<E> synchronizedQueue(Queue<E> queue) { 440  return Synchronized.queue(queue, null); 441  } 442  443  /** 444  * Returns a synchronized (thread-safe) deque backed by the specified deque. In order to guarantee 445  * serial access, it is critical that <b>all</b> access to the backing deque is accomplished 446  * through the returned deque. 447  * 448  * <p>It is imperative that the user manually synchronize on the returned deque when accessing any 449  * of the deque's iterators: 450  * 451  * <pre>{@code 452  * Deque<E> deque = Queues.synchronizedDeque(Queues.<E>newArrayDeque()); 453  * ... 454  * deque.add(element); // Needn't be in synchronized block 455  * ... 456  * synchronized (deque) { // Must synchronize on deque! 457  * Iterator<E> i = deque.iterator(); // Must be in synchronized block 458  * while (i.hasNext()) { 459  * foo(i.next()); 460  * } 461  * } 462  * }</pre> 463  * 464  * <p>Failure to follow this advice may result in non-deterministic behavior. 465  * 466  * <p>The returned deque will be serializable if the specified deque is serializable. 467  * 468  * @param deque the deque to be wrapped in a synchronized view 469  * @return a synchronized view of the specified deque 470  * @since 15.0 471  */ 472  public static <E> Deque<E> synchronizedDeque(Deque<E> deque) { 473  return Synchronized.deque(deque, null); 474  } 475 }