阻塞队列
它是Java util.concurrent
包下重要的数据结构,BlockingQueue
提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue
实现的。
Java
中的阻塞队列接口BlockingQueue
继承自Queue
接口:
1 | public interface BlockingQueue<E> extends Queue<E> { |
阻塞队列的对元素的增删查操作主要就是上述的这些方法。
阻塞队列的实现类
ArrayBlockingQueue
:ArrayBlockingQueue
是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。而且ArrayBlockingQueue
中的元素存在公平访问与非公平访问的区别DelayQueue
:DelayQueue
对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现java.util.concurrent.Delayed
接口。LinkedBlockingQueue
:LinkedBlockingQueue
内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用Integer.MAX_VALUE
作为上限。PriorityBlockingQueue
:PriorityBlockingQueue
是一个无界的并发队列。它使用了和类java.util.PriorityQueue
一样的排序规则。你无法向这个队列中插入null
值。所有插入到PriorityBlockingQueue
的元素必须实现java.lang.Comparable
接口。因此该队列中元素的排序就取决于你自己的Comparable
实现。SynchronousQueue
:SynchronousQueue
是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。
ArrayBlockingQueue底层分析
ArrayBlockingQueue
内部的阻塞队列是通过重入锁ReenterLock
和Condition
条件队列实现的。
先看下内部成员变量
1 | public class ArrayBlockingQueue<E> extends AbstractQueue<E> |
ArrayBlockingQueue
内部是通过数组对象items
来存储所有的数据,通过一个ReentrantLock
来同时控制添加线程与移除线程的并发访问。对于notEmpty
条件对象则是用于存放等待或唤醒调用take
方法的线程,告诉他们队列已有元素,可以执行获取操作。同理notFull
条件对象是用于等待或唤醒调用put
方法的线程,告诉它们,队列未满,可以执行添加元素的操作。takeIndex
代表的是下一个方法(take,poll,peek,remove)
被调用时获取数组元素的索引,putIndex
则代表下一个方法(put, offer, or add)
被调用时元素添加到数组中的索引。
1 | //add方法实现,间接调用了offer(e) |
LinkedBlockingQueue和ArrayBlockingQueue差异
队列大小有所不同,
ArrayBlockingQueue
是有界的初始化必须指定大小,而LinkedBlockingQueue
可以是有界的也可以是无界的(Integer.MAX_VALUE)
,对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。数据存储容器不同,
ArrayBlockingQueue
采用的是数组作为数据存储容器,而LinkedBlockingQueue
采用的则是以Node
节点作为连接对象的链表。由于
ArrayBlockingQueue
采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue
则会生成一个额外的Node
对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC
可能存在较大影响。两者的实现队列添加或移除的锁不一样,
ArrayBlockingQueue
实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock
锁,而LinkedBlockingQueue
实现的队列中的锁是分离的,其添加采用的是putLock
,移除采用的则是takeLock
,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
消费者生产者模式
生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。但是消费者生产者不一定非要用阻塞队列实现,可以看其实现的几种方式。
参考:消费者生产者模式实现
wait()和notify()方法的实现
这也是最简单最基础的实现,缓冲区满和为空时都调用wait()
方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。
1 | public class Test1 { |
可重入锁ReentrantLock的实现
通过对lock
的lock()
方法和unlock()
方法实现了对锁的显示控制,
1 | import java.util.concurrent.locks.Condition; |
阻塞队列BlockingQueue的实现
使用take()
和put()
方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象。
1 | import java.util.concurrent.ArrayBlockingQueue; |