阻塞队列以及生产者消费者的实现

阻塞队列

它是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

Java中的阻塞队列接口BlockingQueue继承自Queue接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface BlockingQueue<E> extends Queue<E> {
//将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
//在成功时返回 true,如果此队列已满,则抛IllegalStateException。
boolean add(E e);
//将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 将指定的元素插入此队列的尾部,如果该队列已满,
//则在到达指定的等待时间之前等待可用的空间,该方法可中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。
void put(E e) throws InterruptedException;
//获取并移除此队列的头部,如果没有元素则等待(阻塞),
//直到有元素将唤醒等待线程执行该操作
E take() throws InterruptedException;
//获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//从此队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o);
}
//除了上述方法还有继承自Queue接口的方法
//获取但不移除此队列的头元素,没有则跑异常NoSuchElementException
E element();

//获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek();

//获取并移除此队列的头,如果此队列为空,则返回 null。
E poll();

上述方法呢,对应的操作如果没有办法执行,也会报不一样的错误。主要有抛异常,特定值,阻塞,超时这四种,可以看下面的介绍
/*
插入方法:
add(E e) : 添加成功返回true,失败抛IllegalStateException异常
offer(E e) : 成功返回 true,如果此队列已满,则返回 false。
put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞

删除方法:
remove(Object o) :移除指定元素,成功返回true,失败返回false
poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
take():获取并移除此队列头元素,若没有元素则一直阻塞。

检查方法
element() :获取但不移除此队列的头元素,没有元素则抛异常
peek() :获取但不移除此队列的头;若队列为空,则返回 null。
*/

阻塞队列的对元素的增删查操作主要就是上述的这些方法。

阻塞队列的实现类

  • ArrayBlockingQueueArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。而且ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别
  • DelayQueueDelayQueue对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed接口。
  • LinkedBlockingQueueLinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
  • PriorityBlockingQueuePriorityBlockingQueue是一个无界的并发队列。它使用了和类 java.util.PriorityQueue一样的排序规则。你无法向这个队列中插入 null值。所有插入到 PriorityBlockingQueue的元素必须实现java.lang.Comparable接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
  • SynchronousQueueSynchronousQueue是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

ArrayBlockingQueue底层分析

ArrayBlockingQueue内部的阻塞队列是通过重入锁ReenterLockCondition条件队列实现的。

先看下内部成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;

/**获取数据的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;

/**添加数据的索引,主要用于 put, offer, or add 方法*/
int putIndex;

/** 队列元素的个数 */
int count;

/** 控制并非访问的锁 */
final ReentrantLock lock;

/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
private final Condition notEmpty;

/**notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
private final Condition notFull;

// 迭代器
transient Itrs itrs = null;
}

ArrayBlockingQueue内部是通过数组对象items来存储所有的数据,通过一个ReentrantLock来同时控制添加线程与移除线程的并发访问。对于notEmpty条件对象则是用于存放等待或唤醒调用take方法的线程,告诉他们队列已有元素,可以执行获取操作。同理notFull条件对象是用于等待或唤醒调用put方法的线程,告诉它们,队列未满,可以执行添加元素的操作。takeIndex代表的是下一个方法(take,poll,peek,remove)被调用时获取数组元素的索引,putIndex则代表下一个方法(put, offer, or add)被调用时元素添加到数组中的索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//add方法实现,间接调用了offer(e)
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

//offer方法
public boolean offer(E e) {
checkNotNull(e);//检查元素是否为null
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
if (count == items.length)//判断队列是否满
return false;
else {
enqueue(e);//添加元素到队列
return true;
}
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
//获取当前数组
final Object[] items = this.items;
//通过putIndex索引对数组进行赋值
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//队列中元素数量加1
//唤醒调用take()方法的线程,执行元素获取操作。
notEmpty.signal();
}
//put方法,阻塞时可中断,put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);//如果队列没有满直接添加。。
} finally {
lock.unlock();
}
}
// 其余方法分析,可以看 https://blog.csdn.net/javazejian/article/details/77410889

LinkedBlockingQueue和ArrayBlockingQueue差异

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。

  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。

  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

消费者生产者模式

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。但是消费者生产者不一定非要用阻塞队列实现,可以看其实现的几种方式。

参考:消费者生产者模式实现

wait()和notify()方法的实现

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Test1 {
private static Integer count = 0;
private static final Integer FULL = 10;
private static String LOCK = "lock";

public static void main(String[] args) {
Test1 test1 = new Test1();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (LOCK) {
while (count == FULL) {
try {
LOCK.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
LOCK.notifyAll();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (LOCK) {
while (count == 0) {
try {
LOCK.wait();
} catch (Exception e) {
}
}
count--;
System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
LOCK.notifyAll();
}
}
}
}
}

可重入锁ReentrantLock的实现

通过对locklock()方法和unlock()方法实现了对锁的显示控制,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test2 {
private static Integer count = 0;
private static final Integer FULL = 10;
//创建一个锁对象
private Lock lock = new ReentrantLock();
//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
Test2 test2 = new Test2();
new Thread(test2.new Producer()).start();
new Thread(test2.new Consumer()).start();
new Thread(test2.new Producer()).start();
new Thread(test2.new Consumer()).start();
new Thread(test2.new Producer()).start();
new Thread(test2.new Consumer()).start();
new Thread(test2.new Producer()).start();
new Thread(test2.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
//获取锁
lock.lock();
try {
while (count == FULL) {
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
//唤醒消费者
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
lock.lock();
try {
while (count == 0) {
try {
notEmpty.await();
} catch (Exception e) {
e.printStackTrace();
}
}
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
notFull.signal();
} finally {
lock.unlock();
}
}
}
}
}

阻塞队列BlockingQueue的实现

使用take()put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test3 {
private static Integer count = 0;
//创建一个阻塞队列
final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
Test3 test3 = new Test3();
new Thread(test3.new Producer()).start();
new Thread(test3.new Consumer()).start();
new Thread(test3.new Producer()).start();
new Thread(test3.new Consumer()).start();
new Thread(test3.new Producer()).start();
new Thread(test3.new Consumer()).start();
new Thread(test3.new Producer()).start();
new Thread(test3.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
try {
blockingQueue.put(1);
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
blockingQueue.take();
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}