了解什么是阻塞队列!

阻塞队列的作用

BlockingQueue是一个接口:

public interface BlockingQueue<E> extends Queue<E>{...}

继承Queue接口,是队列的一种。BlockingQueue是线程安全的。

因为阻塞队列是线程安全的,那么可以通过队列往另一个线程传递数据。

并发队列关系图


Java提供的线程安全的队列中,BlockingQueue接口的实现类都是阻塞队列。非阻塞并发队列的典型就是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用CAS保证线程安全。

除此之外还有一个Deque接口,这是双端队列(double-ended-queue)。

阻塞队列的特点

主要突出在“阻塞”两字上。阻塞功能使得生产者和消费者能力平衡。

take

用于获取并移除队列的头结点。当队列里没有数据时,会阻塞线程,一旦队列中有了数据就立刻解除阻塞!

put

用于添加数据,队列满了,阻塞当前线程。

容量

LinkedBlockingQueue的上限是Interger.MAX_VALUE。还有些的队列是有界的,比如ArrayBlockingQueue(不会扩容)。

队列的常用的八个方法

  1. 抛出异常:add、remove、element
  2. 返回结果但是不抛异常:offer、poll、peek
  3. 阻塞:put、take

add、remove、element

  • add
    数量超出队列上限抛出异常java.lang.IllegalStateException:Queue full
  • remove
    队列为空,无法remove,抛出异常java.util.NoSuchElementException
  • element
    该方法是只返回头结点,但是不删除!空队列抛出异常java.util.NoSuchElementException

offer、poll、peek

这一组并不是抛出异常,而是返回提示信息

  • offer
    用来插入一个元素,并返回值来提示是否插入成功。
  • poll
    和remove对应,队列为空只会返回null,不会抛出异常
  • peek
    和element对应,返回null。
  • 带超时时间的offer和poll
offer(E e, long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)

put、take

同上!

总结

常见的阻塞队列

BlockingQueue实现类都在JUC包下!

ArrayBlockingQueue

典型的有界队列,使用ReentrantLock实现线程安全,内部用数组存储元素。

ArrayBlockingQueue(int capacity, boolean fair)

通过构造函数传入容量(后续不能扩容)和是否公平。

LinkedBlockingQueue

顾名思义,使用链表的BlockingQueue,如果不指定大小,则就是最大值integer.MAX_VALUE

SynchronousQueue

这是一个容量为0的队列,因此它意味着每次取数据或者放数据都到阻塞吗,等有线程来放或者取!

SynchronousQueue的容量是0不是1,它不存储数据,所做的只有直接传递(direct handoff)

当然由于容量是0,很多方法就变得返回值就是null或者抛出异常。

PriorityBlockingQueue(重要)

优先队列,无界。通过自定义类实现compareTo方法指定排序规则,或者初始化传入Comparator。
因为无界,所以put方法永远不会阻塞。

DelayQueue(特殊)

具有延迟功能。但是放入的元素必须实现Delayed接口。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

delayed接口继承Comparable。因此拥有比较和排序功能。getDelay方法返回“还剩下多长时间的延迟时间才会执行”,返回0或者负数表示任务已经过期。

元素根据延迟时间放入队列不同位置,越靠近队列头代表越早过期!

DelayQueue内部使用PriorityQueue进行排序

阻塞和非阻塞队列的并发安全原理

以ArrayBlockingQueue为例分析阻塞队列线程安全的原理。

ArrayBlockingQueu源码

属性:

// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;


// 以下3个是控制并发用的工具
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

读写都需要先获取ReentrantLock。读操作,队列为空时会进入读线程专属的notEmpty的condition中。

put

public void put(E e) throws InterruptedException {
    //检查插入元素是否为空
    checkNotNull(e);
    //上锁
    final ReentrantLock lock = this.lock;
    //这是可中断的锁
    lock.lockInterruptibly();
    try {
        //检查是否满了count就是队列界
        while (count == items.length)
        //condition
        notFull.await();
        //    插入
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

非阻塞队列ConcurrentLinkedQueue

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

while循环+CAS+native方法(乐观锁)的方式实现线程安全!

阻塞队列的选择

在之前的线程池中,会传入阻塞队列!
参考文章:


线程池对阻塞队列的选择

归纳

  • 功能:比如排序选择PriorityBlockingQueue
  • 容量:容量固定选择ArrayBlockingQueue,无界的LinkedBlockingQueue
  • 扩容
  • 内存结构:链表和数组的对比
  • 性能:LinkedBlockingQueue有两把锁,粒度更细。SynchronousQueue只需要直接传递,性能更高!
Last modification:April 17th, 2020 at 12:56 am