并发编程阻塞队列

2015/03/07 JUC

并发编程阻塞队列

BlockingQueue(阻塞队列)

所谓Blocking Queue是指其中的元素数量存在界限,当队列已满时(队列元素数量达到了最大容量的临界值),对队列进行写入操作的线程将被阻塞挂起,当队列为空时(队列元素数量达到了为0的临界值),对队列进行读取的操作线程将被阻塞挂起。实际上,BlockingQueue(LinkedTransferQueue除外)的内部实现主要依赖于显式锁Lock及其与之关联的Condition。BlockingQueue的实现都是线程安全的队列,在高并发的程序开发中,可以不用担心线程安全的问题而直接使用,另外,BlockingQueue在线程池服务(ExecutorService)中主要扮演着提供线程对任务存取容器的角色。

BlockingQueue接口为多线程操作同一个队列提供的4种处理方案为:

  • 抛出异常。在通过add(e)方法向队列尾部添加元素时,如果队列已满,则抛出IllegalStateException。在通过remove()和element()方法,分别从队列头部删除或读取元素时,如果队列为空,则抛出NoSuchElementException。
  • 返回特定值。在通过offer(e)方法向队列尾部添加元素时,如果队列已满,则返回false。在通过poll()和peek()方法,分别从队列头部删除或读取元素时,如果队列为空,则返回null。
  • 线程阻塞。在通过put(e)方法向队列尾部添加元素时,如果队列已满,则当前线程进入阻塞状态,直到队列有剩余的容量来添加元素,才退出阻塞。在通过take()方法从队列头部删除并返回元素时,如果队列为空,则当前线程进入阻塞状态,直到从队列中成功删除并返回元素为止。
  • 超时。超时和以上线程阻塞有一些共同之处。两者的区别在于当线程在特定条件下进入阻塞状态后,如果超过了offer(e,time,unit)和poll(time,unit)方法的参数所设置的时间限制,那么也会退出阻塞状态,分别返回false和null。例如poll(100, TimeUnit.MILLISECONDS)表示设置的时间限制为100毫秒。

在java.util.concurrent包中,BlockingQueue接口主要有以下实现类:

  • LinkedBlockingQueue类:默认情况下,LinkedBlockingQueue的容量是没有上限的(确切地说,默认的容量为Integer.MAX_VALUE),但是也可以选择指定其最大容量。它是基于链表的队列,此队列按FIFO(先进先出)的原则存取元素。
  • ArrayBlockingQueue类:它的ArrayBlockingQueue(int capacity, boolean fair)构造方法允许指定队列的容量,并可以选择是否采用公平策略。如果参数fair被设置为true,那么等待时间最长的线程会优先访问队列(其底层实现是通过将ReentrantLock设置为使用公平策略来达到这种公平性的)。通常,公平性以降低运行性能为代价,所以只有在确实非常需要的时候才使用这种公平机制。ArrayBlockingQueue是基于数组的队列,此队列按FIFO(先进先出)的原则存取元素。
  • PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级顺序来删除,该队列的容量没有上限。
  • DelayQueue:在这个队列中存放的是延期元素,也就是说这些元素必须实现java.util.concurrent.Delayed接口。只有延迟期满的元素才能被取出或删除。当一个元素的getDelay(TimeUnit unit)方法返回一个小于或等于零的值时,则表示延迟期满。DelayQueue队列的容量没有上限。

ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组结构实现的FIFO阻塞队列,在构造该阻塞队列时需要指定队列中最大元素的数量(容量)。当队列已满时,若再次进行数据写入操作,则线程将会进入阻塞,一直等待直到其他线程对元素进行消费。当队列为空时,对该队列的消费线程将会进入阻塞,直到有其他线程写入数据。该阻塞队列中提供了不同形式的读写方法。

阻塞式写方法

在ArrayBlockingQueue中提供了两个阻塞式写方法,分别如下(在该队列中,无论是阻塞式写方法还是非阻塞式写方法,都不允许写入null)。

  • void put(E e):向队列的尾部插入新的数据,当队列已满时调用该方法的线程会进入阻塞,直到有其他线程对该线程执行了中断操作,或者队列中的元素被其他线程消费。
    // 构造只有两个元素容量的ArrayBlockingQueue
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
          try
          {
          queue.put("first");
          queue.put("second");
          // 执行put将会使得当前线程进入阻塞
          queue.put("third");
          } catch (InterruptedException e)
          {
          e.printStackTrace();
          }
    
  • boolean offer(E e, long timeout, TimeUnit unit):向队列尾部写入新的数据,当队列已满时执行该方法的线程在指定的时间单位内将进入阻塞,直到到了指定的超时时间后,或者在此期间有其他线程对队列数据进行了消费。当然了,对由于执行该方法而进入阻塞的线程执行中断操作也可以使当前线程退出阻塞。该方法的返回值boolean为true时表示写入数据成功,为false时表示写入数据失败。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    try
    {
      queue.offer("first", 10, TimeUnit.SECONDS);
      queue.offer("second", 10, TimeUnit.SECONDS);
      // 该方法会进入阻塞,10秒之后当前线程将会退出阻塞,并且对third数据的写入将会失败
      queue.offer("third", 10, TimeUnit.SECONDS);
    } catch (InterruptedException e)
    {
      e.printStackTrace();
    }
    

非阻塞式写方法

当队列已满时写入数据,如果不想使得当前线程进入阻塞,那么就可以使用非阻塞式的写操作方法。

  • boolean add(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,但是该方法会抛出队列已满的异常。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    // 写入元素成功
    assert queue.add("first");
    assert queue.add("second");
    try
    {
      // 写入失败,抛出异常
      queue.add("third");
    } catch (Exception e)
    {
      // 断言异常
      assert e instanceof IllegalStateException;
    }
    
  • boolean offer(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,并且会立即返回false。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    assert queue.offer("first");
    assert queue.offer("second");
    // 写入失败
    assert !queue.offer("third");
    // 第三次offer操作失败,此时队列的size为2
    assert queue.size() == 2;
    

阻塞式读方法

ArrayBlockingQueue中提供了两个阻塞式读方法,分别如下。

  • E take():从队列头部获取数据,并且该数据会从队列头部移除,当队列为空时执行take方法的线程将进入阻塞,直到有其他线程写入新的数据,或者当前线程被执行了中断操作。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    assert queue.offer("first");
    assert queue.offer("second");
    try
    {
      // 由于是队列,因此这里的断言语句也遵从FIFO,第一个被take出来的数据是first
      assert queue.take().equals("first");
      assert queue.take().equals("second");
      // 进入阻塞
      queue.take();
    } catch (InterruptedException e)
    {
      e.printStackTrace();
    }
    
  • E poll(long timeout, TimeUnit unit):从队列头部获取数据并且该数据会从队列头部移除,如果队列中没有任何元素时则执行该方法,当前线程会阻塞指定的时间,直到在此期间有新的数据写入,或者阻塞的当前线程被其他线程中断,当线程由于超时退出阻塞时,返回值为null。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    assert queue.offer("first");
    assert queue.offer("second");
    try
    {
      // FIFO
      assert queue.poll(10, TimeUnit.SECONDS).equals("first");
      assert queue.poll(10, TimeUnit.SECONDS).equals("second");
      // 10秒以后线程退出阻塞,并且返回null值。
      assert queue.poll(10, TimeUnit.SECONDS) == null;
    } catch (InterruptedException e)
    {
      e.printStackTrace();
    }
    

非阻塞式读方法

当队列为空时读取数据,如果不想使得当前线程进入阻塞,那么就可以使用非阻塞式的读操作方法。

  • E poll():从队列头部获取数据并且该数据会从队列头部移除,当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    assert queue.offer("first");
    assert queue.offer("second");
    // FIFO
    assert queue.poll().equals("first");
    assert queue.poll().equals("second");
    // 队列为空,立即返回但是结果为null
    assert queue.poll() == null;
    
  • E peek():peek的操作类似于debug操作(仅仅debug队列头部元素,本书的第6章将讲解针对Stream的操作,大家将从中学习到针对整个Stream数据元素的peek操作),它直接从队列头部获取一个数据,但是并不能从队列头部移除数据,当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    assert queue.offer("first");
    assert queue.offer("second");
    // 第一次peek,从队列头部读取数据
    assert queue.peek().equals("first");
    // 第二次peek,从队列头部读取数据,同第一次
    assert queue.peek().equals("first");
    // 清除数据,队列为空
    queue.clear();
    // peek操作返回结果为null
    assert queue.peek() == null;
    

生产者消费者

高并发多线程的环境下对共享资源的访问,在绝大多数情况下都可以通过生产者消费者模式进行理论化概括化,ArrayBlocking Queue在高并发的环境中同时读写的代码如下。

// 定义阻塞队列
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 启动11个生产数据的线程,向队列的尾部写入数据
IntStream.rangeClosed(0, 10)
.boxed()
.map(i -> new Thread("P-Thread-" + i)
{
    @Override
    public void run()
    {
        while (true)
        {
            try
            {
                String data = String.valueOf(System.currentTimeMillis());
                queue.put(data);
                System.out.println(currentThread() + " produce data: " + data);
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
            } catch (InterruptedException e)
            {
                System.out.println("Received the interrupt SIGNAL.");
                break;
            }
        }
    }
}).forEach(Thread::start);

// 定义11个消费线程,从队列的头部移除数据
IntStream.rangeClosed(0, 10)
.boxed()
.map(i -> new Thread("C-Thread-" + i)
{
    @Override
    public void run()
    {
        while (true)
        {
            try
            {
                String data = queue.take();
                System.out.println(currentThread() + " consume data: " + data);
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
            } catch (InterruptedException e)
            {
                System.out.println("Received the interrupt SIGNAL.");
                break;
            }
        }
    }
}).forEach(Thread::start);

在上面的程序中,有22个针对queue的操作线程,我们并未提供对共享数据queue的线程安全保护措施,甚至没有进行任何临界值的判断与线程的挂起/唤醒动作,这一切都由该阻塞队列内部实现,因此开发者再也无需实现类似的队列,进行不同类型线程的数据交换和通信,运行上面的代码将会看到生产者与消费者在不断地交替输出。

除此之外,该阻塞队列还提供了一些其他方法,比如drainTo()排干队列中的数据到某个集合、remainingCapacity()获取剩余容量等,ArrayBlockingQueue除了实现了BlockingQueue定义的所有接口方法之外它还是Collection接口的实现类。

PriorityBlockingQueue

PriorityBlockingQueue优先级阻塞队列是一个“无边界”阻塞队列,该队列会根据某种规则(Comparator)对插入队列尾部的元素进行排序,因此该队列将不会遵循FIFO(first-in-first-out)的约束。虽然PriorityBlockingQueue同ArrayBlockingQueue都实现自同样的接口,拥有同样的方法,但是大多数方法的实现确实具有很大的差别,PriorityBlockingQueue也是线程安全的类,适用于高并发多线程的情况下。

排序且无边界的队列

只要应用程序的内存足够使用,理论上,PriorityBlockingQueue存放数据的数量是“无边界”的,在PriorityBlockingQueue内部维护了一个Object的数组,随着数据量的不断增多,该数组也会进行动态地扩容。在构造PriorityBlockingQueue时虽然提供了一个整数类型的参数,但是该参数所代表的含义与ArrayBlockingQueue完全不同,前者是构造PriorityBlockingQueue的初始容量,后者指定的整数类型参数则是ArrayBlockingQueue的最大容量。

// 创建PriorityBlockingQueue,并且制定初始容量为2
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
// remainingCapacity()方法的返回始终都是Integer.MAX_VALUE0x7fffffff
assert queue.remainingCapacity() == 0x7fffffff;
// 写入4个元素进入队列
queue.offer(1);
queue.offer(10);
queue.offer(14);
queue.offer(3);
// 元素的size为4
assert queue.size() == 4;

通过上面的代码片段,我们更能理解构造PriorityBlockingQueue时指定的整数类型参数其作用只不过是队列的初始化容量,并不代表它最多能存放2个数据元素,同时方法remainingCapacity()的返回值被hard code(硬编码)为Integer.MAX_VALUE。

根据我们的理解,既然是优先级排序队列,为何在构造PriorityBlockingQueue时并未指定任何数据排序相关的接口呢?事实上,如果没有显示地指定Comparator,那么它将只支持实现了Comparable接口的数据类型。在上例中,Integer类型是Comparable的子类,因此我们并不需要指定Comparator,默认情况下,优先级最小的数据元素将被放在队列头部,优先级最大的数据元素将被放在队列尾部。

assert queue.poll() == 1;
assert queue.poll() == 3;
assert queue.poll() == 10;
assert queue.poll() == 14;

如果在创建PriorityBlockingQueue队列的时候既没有指定Comparator,同时数据元素也不是Comparable接口的子类,那么这种情况下,会出现类型转换的运行时异常。

...省略
// PriorityBlockingQueue源码
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    // 强制类型转换,如果不是Comparable接口子类,转换时将会出现异常
    Comparable<? super T> key = (Comparable<? super T>) x;
    ...省略
    array[k] = key;
}
...省略

不存在阻塞写方法

由于PriorityBlockingQueue是“无边界”的队列,因此将不存在对队列上限临界值的控制,在PriorityBlockingQueue中,添加数据元素的所有方法都等价于offer方法,从队列的尾部添加数据,但是该数据会根据排序规则对数据进行排序。

...省略
public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never  block
}

public void put(E e) {
    offer(e); // never  block
}
...省略

优先级队列读方法

优先级队列添加元素的方法不存在阻塞(由于是“无边界”的),但是针对优先级队列元素的读方法则与ArrayBlockingQueue类似

LinkedBlockingQueue

ArrayBlockingQueue是基于数组实现的FIFO“有边界”队列,PriorityBlockingQueue也是基于数组实现的,但它是“无边界”的优先级队列,由于存在对数据元素的排序规则,因此PriorityBlockingQueue并不能提供FIFO的约束担保(当然,如果想要使其具备FIFO的特性,需要约束PriorityBlockingQueue的排序规则为R,并且对其写入数据的顺序也为R,这样就可以保证FIFO),LinkedBlockingQueue是“可选边界”基于链表实现的FIFO队列。截至目前,阻塞队列都是通过显式锁Lock进行共享数据的同步,以及与Lock关联的Condition进行线程间通知,因此该队列也适用于高并发的多线程环境中,是线程安全的类。

LinkedBlockingQueue队列的边界可选性是通过构造函数来决定的,当我们在创建LinkedBlockingQueue对象时,使用的是默认的构造函数,那么该队列的最大容量将为Integer的最大值(所谓的“无边界”),当然开发者可以通过指定队列最大容量(有边界)的方式创建队列。

// 无参构造函数
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
// LinkedBlockingQueue"无边界"
assert queue.remainingCapacity() == Integer.MAX_VALUE;

// 构造LinkedBlockingQueue时指定边界
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
assert queue.remainingCapacity() == 10;

在使用方式上,LinkedBlockingQueue与ArrayBlockingQueue极其相似。

DelayQueue

DelayQueue也是一个实现了BlockingQueue接口的“无边界”阻塞队列,但是该队列却是非常有意思和特殊的一个队列(存入DelayQueue中的数据元素会被延迟单位时间后才能消费),在DelayQueue中,元素也会根据优先级进行排序,这种排序可以是基于数据元素过期时间而进行的(比如,你可以将最快过期的数据元素排到队列头部,最晚过期的数据元素排到队尾)。

对于存入DelayQueue中的元素是有一定要求的:元素类型必须是Delayed接口的子类,存入DelayQueue中的元素需要重写getDelay(TimeUnit unit)方法用于计算该元素距离过期的剩余时间,如果在消费DelayQueue时发现并没有任何一个元素到达过期时间,那么对该队列的读取操作会立即返回null值,或者使得消费线程进入阻塞。

DelayQueue的基本使用

从前文的描述中我们可以得知,DelayQueue中的元素都必须是Delayed接口的子类,该接口继承自Comparable接口,并且定义了一个唯一的接口方法getDelay,如下所示。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

所以,首先需要实现Delayed接口,并且重写getDelay方法和compareTo方法。

// 继承自Delayed接口
class DelayedEntry implements Delayed
{
    // 元素数据内容
    private final String value;
    // 用于计算失效时间
    private final long time;

    private DelayedEntry(String value, long delayTime)
    {
        this.value = value;
    // 该元素可在(当前时间+delayTime)毫秒后消费,也就是说延迟消费delayTime毫秒
        this.time = delayTime + System.currentTimeMillis();
    }
    // 重写getDelay方法,返回当前元素的延迟时间还剩余(remaining)多少个时间单位
    @Override
    public long getDelay(TimeUnit unit)
    {
        long delta = time - System.currentTimeMillis();
        return unit.convert(delta, TimeUnit.MILLISECONDS);
    }

    public String getValue()
    {
        return value;
    }
    // 重写compareTo方法,根据我们所实现的代码可以看出,队列头部的元素是最早即将失效的数据元素
    @Override
    public int compareTo(Delayed o)
    {
        if (this.time < ((DelayedEntry) o).time)
        {
            return -1;
        } else if (this.time > ((DelayedEntry) o).time)
        {
            return 1;
        } else
            return 0;
    }
    @Override
    public String toString()
    {
        return "DelayedEntry{" +
                "value='" + value + '\" +
                ", time=" + time +
                '}';
    }
}

在DelayQueue中,每一个元素都必须是Delayed接口的子类,在上面的代码中,我们实现的DelayedEntry就是Delayed的子类,现在我们可以在DelayQueue中正常地存取DelayedEntry了。

// 定义DelayQueue,无需指定容量,因为DelayQueue是一个"无边界"的阻塞队列
DelayQueue<DelayedEntry> delayQueue = new DelayQueue<>();
// 存入数据A,数据A将在10000毫秒后过期,或者说会被延期10000毫秒后处理
delayQueue.put(new DelayedEntry("A", 10 * 1000L));
// 存入数据A,数据B将在5000毫秒后过期,或者说会被延期5000毫秒后处理
delayQueue.put(new DelayedEntry("B", 5 * 1000L));
// 记录时间戳
final long timestamp = System.currentTimeMillis();
// 非阻塞读方法,立即返回null,原因是当前AB元素不会有一个到达过期时间
assert delayQueue.poll() == null;


// take方法会阻塞5000毫秒左右,因为此刻队列中最快达到过期条件的数据B只能在5000毫秒以后
DelayedEntry value = delayQueue.take();
// 断言队列头部的元素为B
assert value.getValue().equals("B");
// 耗时5000毫秒或以上
assert (System.currentTimeMillis() - timestamp) >= 5_000L;

// 再次执行take操作
value = delayQueue.take();
// 断言队列头部的元素为A
assert value.getValue().equals("A");
// 耗时在10000毫秒或以上
assert (System.currentTimeMillis() - timestamp) >= 10_000L;

读取DelayQueue中的数据

DelayQueue队列区别于我们之前学习过的队列,其中之一就是存入该队列的元素必须是Delayed的子类,除此之外队列中的数据元素会被延迟(Delay)消费,这也正是延迟队列名称的由来。与PriorityBlockingQueue一样,DelayQueue中有关增加元素的所有方法都等价于offer(E e),并不存在针对队列临界值上限的控制,因此也不存在阻塞写的情况(多线程争抢导致的线程阻塞另当别论)但是对该队列中数据元素的消费(延迟消费)则有别于本节中接触过的其他阻塞队列。

  • remainingCapacity()方法始终返回Integer.MAX_VALUE
    DelayQueue<DelayedEntry> delayQueue = new DelayQueue<>();
    assert delayQueue.size() == 0;
    assert delayQueue.remainingCapacity() == Integer.MAX_VALUE;
    delayQueue.put(new DelayedEntry("A", 10 * 1000L));
    delayQueue.put(new DelayedEntry("B", 5 * 1000L));
    assert delayQueue.size() == 2;
    assert delayQueue.remainingCapacity() == Integer.MAX_VALUE;
    
  • peek():非阻塞读方法,立即返回但并不移除DelayQueue的头部元素,当队列为空时返回null。 ```java DelayQueue delayQueue = new DelayQueue<>(); // 队列为空时,peek方法立即返回null assert delayQueue.peek()==null; delayQueue.put(new DelayedEntry("A", 10 * 1000L)); delayQueue.put(new DelayedEntry("B", 5 * 1000L));

// 队列不为空时,peek方法不会出现延迟,而且立即返回队列头部的元素,但不移除 assert delayQueue.peek().getValue().equals(“B”);


- poll():非阻塞读方法,当队列为空或者队列头部元素还未到达过期时间时返回值为null,否则将会从队列头部立即将元素移除并返回。
```java
DelayQueue<DelayedEntry> delayQueue = new DelayQueue<>();
// 队列为空,立即返回null
assert delayQueue.poll() == null;

// 队列中存入数据
delayQueue.put(new DelayedEntry("A", 10 * 1000L));
delayQueue.put(new DelayedEntry("B", 5 * 1000L));
// 队列不为空,但是队头元素并未达到超时时间,立即返回null
assert delayQueue.poll() == null;

// 休眠5秒,使得头部元素到达超时时间
TimeUnit.SECONDS.sleep(5);
// 立即返回元素B
assert delayQueue.poll().getValue().equals("B");
  • poll(long timeout, TimeUnit unit):最大阻塞单位时间,当达到阻塞时间后,此刻为空或者队列头部元素还未达到过期时间时返回值为null,否则将会立即从队列头部将元素移除并返回。 ```java DelayQueue delayQueue = new DelayQueue<>(); // 队列为空,该方法会阻塞10毫秒并且返回null assert delayQueue.poll(10, TimeUnit.MILLISECONDS) == null;

delayQueue.put(new DelayedEntry(“A”, 10 * 1000L)); delayQueue.put(new DelayedEntry(“B”, 5 * 1000L));

// 队列不为空,但是队头元素在10毫秒内不会达到过期时间 assert delayQueue.poll(10, TimeUnit.MILLISECONDS) == null; // 休眠5秒 TimeUnit.SECONDS.sleep(5);

// 移除并返回B assert delayQueue.poll(10, TimeUnit.MILLISECONDS) .getValue().equals(“B”);


- take():阻塞式的读取方法,该方法会一直阻塞到队列中有元素,并且队列中的头部元素已达到过期时间,然后将其从队列中移除并返回。

## SynchronousQueue
SynchronousQueue也是实现自BlockingQueue的一个阻塞队列,每一次对其的写入操作必须等待(阻塞)其他线程进行对应的移除操作,SynchronousQueue的内部并不会涉及容量、获取size,就连peek方法的返回值永远都将会是null,除此之外还有更多的方法在SynchronousQueue中也都未提供对应的支持(列举如下),因此在使用的过程中需要引起注意,否则会使得程序的运行出现不符合预期的错误。
- clear():清空队列的方法在SynchronousQueue中不起任何作用。
- contains(Object o):永远返回false。
- containsAll(Collection<?> c):等价于c是否为空的判断。
- isEmpty():永远返回true。
- iterator():返回一个空的迭代器。
- peek():永远返回null。
- remainingCapacity():始终返回0。
- remove(Object o):不做任何删除,并且始终返回false。
- removeAll(Collection<?> c):不做任何删除,始终返回false。
- retainAll(Collection<?> c):始终返回false。
- size():返回值始终为0。
- spliterator():返回一个空的Spliterator(关于Spliterator,我们会在本书的第6章的6.3.2节“Spliterator详解”一节进行详细介绍)。
- toArray()及toArray(T[] a)方法同样也不支持。

看起来好多方法在SynchronousQueue中都不提供对应的支持,那么SynchronousQueue是一个怎样的队列呢?简单来说,我们可以借助于SynchronousQueue在两个线程间进行线程安全的数据交换。

尽管SynchronousQueue是一个队列,但是它的主要作用在于在两个线程之间进行数据交换,区别于Exchanger的主要地方在于(站在使用的角度)SynchronousQueue所涉及的一对线程一个更加专注于数据的生产,另一个更加专注于数据的消费(各司其职),而Exchanger则更加强调一对线程数据的交换。打开Exchanger的官方文档,可以看到如下的一句话:
```java
An Exchanger may be viewed as a bidirectional form of a {@link SynchronousQueue}. Exchanger 可以看作一个双向的SynchronousQueue

SynchronousQueue在日常的开发使用中并不是很常见,即使在JDK内部,该队列也仅用于ExecutorService中的Cache Thread Pool创建,本节只是简单了解一下SynchronousQueue的基本使用方法即可。

// 定义String类型的SynchronousQueue
SynchronousQueue<String> queue = new SynchronousQueue<>();

// 启动两个线程,向queue中写入数据
IntStream.rangeClosed(0, 1).forEach(i ->
        new Thread(() ->
        {
            try
            {
        // 若没有对应的数据消费线程,则put方法将会导致当前线程进入阻塞
                queue.put(currentThread().getName());
                System.out.println(currentThread() + " put element " + currentThread().getName());
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }).start()
);

// 启动两个线程从queue中消费数据
IntStream.rangeClosed(0, 1).forEach(i ->
        new Thread(() ->
        {
            try
            {
        // 若没有对应的数据生产线程,则take方法将会导致当前线程进入阻塞
                String value = queue.take();
                System.out.println(currentThread() + " take " + value);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }).start()
);
// 运行上面的程序将得到如下所示的输出结果
Thread[Thread-2,5,main] take Thread-0
Thread[Thread-0,5,main] put element Thread-0
Thread[Thread-3,5,main] take Thread-1
Thread[Thread-1,5,main] put element Thread-1

LinkedBlockingDeque

LinkedBlockingDeque是一个基于链表实现的双向(Double Ended Queue,Deque)阻塞队列,双向队列支持在队尾写入数据,读取移除数据;在队头写入数据,读取移除数据。LinkedBlockingDeque实现自BlockingDeque(BlockingDeque又是BlockingQueue的子接口),并且支持可选“边界”,与LinkedBlockingQueue一样,对边界的指定在构造LinkedBlockingDeque时就已经确定了。

既然是双向队列,那么LinkedBlockingDeque所提供的操作方法要比单向队列丰富很多,为了节省篇幅,此处将不再展开介绍,对逐个方法进行讲述,读者可以通过阅读JDK官方文档或者其他方式进行学习和掌握,官方文档地址如下。

https:// docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingDeque.html

LinkedTransferQueue

TransferQueue是一个继承了BlockingQueue的接口,并且增加了若干新的方法。LinkedTransferQueue是TransferQueue接口的实现类,其定义为一个无界的队列,具有FIFO的特性。

继承自BlockingQueue的方法在使用方法上与本节中学过的其他BlockingQueue并没有太大的区别(SynchronousQueue除外),因此我们只介绍继承自TransferQueue的方法,看看TransferQueue为其赋予了怎样的新特性。

transfer方法

当某个线程执行了transfer方法后将会进入阻塞,直到有其他线程对transfer的数据元素进行了poll或者take,否则当前线程会将该数据元素插入队列尾部,并且等待其他线程对其进行消费。这段文字描述包含了一些非常苛刻的要求,首先,LinkedTransferQueue是一个队列,是可以存放无限(Integer.MAX_VALUE)数据元素的队列,因此允许同时有多个线程将数据元素插入队列尾部;其次当线程A通过transfer方法将元素E插入队列尾部时,即使此时此刻有其他线程也对该队列进行着消费操作,如果元素E未被消费,那么线程A同样也会进入阻塞直到元素E被其他线程消费。下面看一个简单的代码片段了解一下transfer方法的特性。

// 定义LinkedTransferQueue
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// 通过不同的方法在队列尾部插入三个数据元素
queue.add("hello");
queue.offer("world");
queue.put("Java");
// 此时该队列的数据元素为(队尾)Java->world->hello
new Thread(() ->
{
    try
    {
        // 创建匿名线程,并且执行transfer方法
        queue.transfer("Alex");
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    System.out.println("current thread exit.");
}).start();
// 此刻队列的数据元素为(队尾)Alex->Java->world->hello
TimeUnit.SECONDS.sleep(2);
// 执行take方法从队列头部移除消费元素hello,但是匿名线程仍旧被阻塞
System.out.println(queue.take());
// 在队尾插入新的数据元素(队尾)Scala->Alex->Java->world
queue.put("Scala");
// 执行poll方法从队列头部移除消费元素world,匿名线程继续被阻塞
System.out.println(queue.poll());
// 执行take方法从队列头部移除消费元素Java,匿名线程继续阻塞中
System.out.println(queue.take());
// 执行take方法从队列头部移除消费元素Alex,匿名线程退出阻塞
System.out.println(queue.take());

上面程序的运行与我们在代码注释中的分析完全一致,这就是transfer方法的主要特性,非常类似于SynchronousQueue的put方法,但是不同于SynchronousQueue的地方在于LinkedTransferQueue存在容量,允许无限多个数据元素的插入,而前者则不支持。

tryTransfer方法

与transfer方法不同的是,tryTransfer方法并不会使得执行线程进入阻塞,如果当前并没有线程等待对元素E的消费(poll或者take),那么执行tryTransfer方法会立即返回失败,并且元素E也不会插入队列的尾部(transfer不成功),否则返回成功。

LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
queue.add("hello");
queue.offer("world");
new Thread(() ->
{
    // 立即返回false
    assert !queue.tryTransfer("Alex");
    System.out.println("current thread exit.");
}).start();
TimeUnit.SECONDS.sleep(2);
// Alex并未插入至队尾
assert queue.size() == 2;

tryTransfer还有一个重载方法,支持最大超时时间的设定,在设定的最大超时时间内,如果没有其他线程对transfer的数据元素进行消费,那么元素E将不会被插入队列尾部,并且退出阻塞,如果在单位时间内有其他线程消费transfer的元素数据,则返回成功并退出阻塞。

LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
queue.add("hello");
queue.offer("world");
new Thread(() ->
{
    try
    {
    // 在单位时间(3秒)内如果有其他线程对Alex进行消费,则退出阻塞,成功插入队尾
        assert queue.tryTransfer("Alex", 3, TimeUnit.SECONDS);
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    System.out.println("current thread exit.");
}).start();
TimeUnit.SECONDS.sleep(2);
assert queue.take().equals("hello");
assert queue.take().equals("world");
// 主线程成功消费数据元素Alex
assert queue.take().equals("Alex");

其他monitor方法

在TransferQueue中还提供了两个与monitor相关的方法,主要用于获取当前是否有消费者线程在等待消费TransferQueue中的数据。

LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// 启动三个线程消费queue中的元素(从头部开始)
for (int i = 0; i < 3; i++)
{
    new Thread(() ->
    {
        try
        {
            System.out.println(queue.take());
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread() + " consume data over.");
    }).start();
}
// 休眠1秒,确保3个线程均已启动且阻塞
TimeUnit.SECONDS.sleep(1);
// 断言正在等待消费的线程以及数量
assert queue.hasWaitingConsumer();
assert queue.getWaitingConsumerCount() == 3;
// 插入一条数据至队列
queue.offer("test");
assert queue.hasWaitingConsumer();
assert queue.getWaitingConsumerCount() == 2;

关于LinkedTransferQueue的使用就介绍这么多了,对比前文中所介绍过的其他阻塞队列,LinkedTransferQueue更像是一个集成了LinkedBlockingQueue和SynchronousQueue特性的阻塞队列,它们所具备的特点在LinkedTransferQueue中都可以得到体现,通过学习我们实际上可以看出,LinkedTransferQueue相比较于SynchronousQueue可以存储更多的元素数据,在支持LinkedBlockingQueue所有方法的同时又有比它更好的性能表现,因为在LinkedTransferQueue中没有使用到锁,同步操作均是由CAS算法和LockSupport提供的。

BlockingQueue总结

7种BlockingQueue,每一种BlockingQueue都有是线程安全的队列,非常适合应用于高并发多线程的应用程序中,虽然每一种阻塞队列在使用和实现上都有各自不同的特点,但是它们也存在着诸多的共性。比如,它们都有阻塞式的操作方法;它们都存在边界的概念(不管是有边界、无边界还是可选边界);它们都只允许非空的元素数据存入等。在使用中,程序开发者需要根据它们所具备的特性做出正确的选择,在合理的地方解决与之对应的问题。

Search

    微信好友

    博士的沙漏

    Table of Contents