在Java并发编程里阻塞队列是一个比较重要的知识点,大家熟知的线程池也使用到了阻塞队列,下面我就来详解阻塞队列@mikechen
什么是阻塞队列
队列比较好理解,数据结构中我们都接触过,先进先出的一种数据结构,那什么是阻塞队列呢?从名字可以看出阻塞队列其实也就是队列的一种特殊情况。
举个例子来说明一下吧,我们去餐馆吃饭,一个接一个的下单,这时候就是一个普通的队列,万一这家店生意好,餐馆挤满了人,这时候肯定不能把顾客赶出去,于是餐馆就在旁边设置了一个休息等待区,这就是一个阻塞队列了,我们使用一张图来演示一下:
为什么需要阻塞队列
在多线程领域所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒。
使用阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了。
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
阻塞队列的特点
阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能。
阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。
实现阻塞最重要的两个方法是 take 方法和 put 方法,下面分别谈谈:
take 方法
take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程如图所示:
put 方法
put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。过程如图所示:
以上过程中的阻塞和解除阻塞,都是 BlockingQueue 完成的,不需要我们自己处理。
是否有界(容量有多大)
此外,阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。
无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
阻塞队列应用场景
阻塞队列应用最广泛的是生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。
简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。
阻塞队列有哪些?
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayBlockingQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列,只存一个元素
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
加粗的是常用的阻塞队列,下面主要谈谈常用的阻塞队列:
1.ArrayBlockingQueue
ArrayBlockingQueue 是最典型的有界队列,其内部是用数组存储元素的,利用 ReentrantLock 实现线程安全。
在创建它的时候就需要指定它的容量,之后也不可以再扩容了,在构造函数中我们同样可以指定是否是公平的,代码如下
ArrayBlockingQueue(int capacity, boolean fair)
第一个参数是容量,第二个参数是是否公平,正如 ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能。
如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。
2.LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列,先进先出,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁(takeLock和putLock)的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,即LinkedBlockingQueue是读写分离的,读写操作可以并行执行。入队时,从队尾入队,由last指针记录;出队时,由队首出队,由head指针记录。
3.SynchronousQueue
SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入。
同理,每次放数据的时候也会阻塞,直到有消费者来取
需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。
由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
阻塞队列核心方法
BlockQueue接口继承自collection接口,的主要实现方法比较多,我们分类来看一下:
方法就这些,根据插入和取出两种类型的操作,具体分为下面几些类型:
- 抛出异常:这时候插入和取出在不能立即被执行的时候就会抛出异常。
- 特殊值:插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false)
- 阻塞:插入和取出操作在不能被立即执行时会阻塞线程,直到条件成熟,被其他线程唤醒
- 超时:插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值。
阻塞队列的原理
我们以ArrayBlockingQueue为例,以下源码均来自jdk1.8。还是以变量、构造函数、普通函数的顺序来看:
1、变量
变量的作用基本上就是这样,我们再来接着看构造函数
2、构造函数
上面的这些其实都是为了给其他操作做铺垫。
3、put函数
首先检查是否为空,从这个方法中我们可以看到,首先检查队列是否为空,然后获取锁,判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。当被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。
我们按照这个源码来看,真正实现插入操作的是enqueue,我们跟进去看看:
就几行代码,就是一个正常的移动数组插入的过程,不过最后还要再通知一下队列,插入了元素,此时的队列就不为空了。
4、take元素
还是看源码
take的这个操作根据put反过来看就可以,真正实现的是dequeue,跟进去看看:
取出的时候也是一样,数组少一个元素,数量少一,最后通过队列不为空,其他的就不详述了。
最后我们看看使用,我们举一个生产者消费者的例子。
阻塞队列使用案例
生产者消费者模式的实现方式超级多,比如volatile、CAS、AtomicInteger等等,这次我们就使用阻塞队列来实现一下:
1.生产者:生成数据放入到队列中,供消费者消费。
/** * @Description: 生产者 * @author: mikechen */ public class Producer implements Runnable { private final BlockingQueue queue; public Producer(BlockingQueue q) { this.queue = q; } @Override public void run() { try { while (true) { //将生产的对象放入阻塞队列中,供消费者消费 queue.put(produce()); Thread.sleep(3000); } } catch (Exception e) { e.printStackTrace(); } } /** * 生产方法 * * @return */ public Object produce() { double num = Math.random(); System.out.println(Thread.currentThread().getName() + "生产了随机数 " + num); return num; } }
2.消费者:从队列中获取数据,进行消费
/** * @Description: 消费者 * @author: mikechen */ public class Consumer implements Runnable { private final BlockingQueue queue; public Consumer(BlockingQueue q) { this.queue = q; } @Override public void run() { try { while (true) { //从阻塞队列中取出元素并进行消费 consume(queue.take()); } } catch (Exception e) { e.printStackTrace(); } } /** * 消费方法 * * @param o */ public void consume(Object o) { System.out.println(Thread.currentThread().getName() + "消费者消费了" + o.toString()); } }
3.主函数调用方法
/** * @Description: 使用BlockingQueue实现的简单生产者-消费者模式 * @author: mikechen */ public class Main { public static void main(String[] args) { //阻塞队列 BlockingQueue queue = new LinkedBlockingQueue(); //实例化生产者 Producer producer = new Producer(queue); //实例化消费者1 Consumer consumer1 = new Consumer(queue); //实例化消费者2 Consumer consumer2 = new Consumer(queue); //启动生产者线程 new Thread(producer).start(); //启动消费者1线程 new Thread(consumer1).start(); //启动消费者2线程 new Thread(consumer2).start(); } }
陈睿mikechen
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》