阻塞队列BlockingQueue(7大阻塞队列及原理详解)

阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen

Java并发编程里阻塞队列是一个比较重要的知识点,大家熟知的线程池也使用到了阻塞队列,下面我就来详解阻塞队列@mikechen

什么是阻塞队列

队列比较好理解,数据结构中我们都接触过,先进先出的一种数据结构,那什么是阻塞队列呢?从名字可以看出阻塞队列其实也就是队列的一种特殊情况。

举个例子来说明一下吧,我们去餐馆吃饭,一个接一个的下单,这时候就是一个普通的队列,万一这家店生意好,餐馆挤满了人,这时候肯定不能把顾客赶出去,于是餐馆就在旁边设置了一个休息等待区,这就是一个阻塞队列了,我们使用一张图来演示一下:
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen

 

为什么需要阻塞队列

在多线程领域所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒。

使用阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了。

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

 

阻塞队列的特点

阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能。

阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。

实现阻塞最重要的两个方法是 take 方法和 put 方法,下面分别谈谈:

take 方法

take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程如图所示:

阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen

put 方法

put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。过程如图所示:

阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen

以上过程中的阻塞和解除阻塞,都是 BlockingQueue 完成的,不需要我们自己处理。

是否有界(容量有多大)

此外,阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。

无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。

但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

阻塞队列应用场景

阻塞队列应用最广泛的是生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。

阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen

简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。

 

阻塞队列有哪些?

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
  3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  4. DelayBlockingQueue:使用优先级队列实现的延迟无界阻塞队列
  5. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列,只存一个元素
  6. LinkedTransferQueue:由链表结构组成的无界阻塞队列
  7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列

加粗的是常用的阻塞队列,下面主要谈谈常用的阻塞队列:

1.ArrayBlockingQueue

ArrayBlockingQueue 是最典型的有界队列,其内部是用数组存储元素的,利用 ReentrantLock 实现线程安全。

在创建它的时候就需要指定它的容量,之后也不可以再扩容了,在构造函数中我们同样可以指定是否是公平的,代码如下

ArrayBlockingQueue(int capacity, boolean fair)

第一个参数是容量,第二个参数是是否公平,正如 ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能。

如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。

2.LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,先进先出,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁(takeLock和putLock)的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,即LinkedBlockingQueue是读写分离的,读写操作可以并行执行。入队时,从队尾入队,由last指针记录;出队时,由队首出队,由head指针记录。

 

3.SynchronousQueue

SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入。

同理,每次放数据的时候也会阻塞,直到有消费者来取

需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。

由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

 

阻塞队列核心方法

BlockQueue接口继承自collection接口,的主要实现方法比较多,我们分类来看一下:
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
方法就这些,根据插入和取出两种类型的操作,具体分为下面几些类型:
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen

  • 抛出异常:这时候插入和取出在不能立即被执行的时候就会抛出异常。
  • 特殊值:插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false)
  • 阻塞:插入和取出操作在不能被立即执行时会阻塞线程,直到条件成熟,被其他线程唤醒
  • 超时:插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值。

 

阻塞队列的原理

我们以ArrayBlockingQueue为例,以下源码均来自jdk1.8。还是以变量、构造函数、普通函数的顺序来看:

1、变量
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
变量的作用基本上就是这样,我们再来接着看构造函数

2、构造函数
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
上面的这些其实都是为了给其他操作做铺垫。

3、put函数
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
首先检查是否为空,从这个方法中我们可以看到,首先检查队列是否为空,然后获取锁,判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。当被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。

我们按照这个源码来看,真正实现插入操作的是enqueue,我们跟进去看看:
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
就几行代码,就是一个正常的移动数组插入的过程,不过最后还要再通知一下队列,插入了元素,此时的队列就不为空了。

4、take元素

还是看源码
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
take的这个操作根据put反过来看就可以,真正实现的是dequeue,跟进去看看:
阻塞队列BlockingQueue(7大阻塞队列及原理详解)-mikechen
取出的时候也是一样,数组少一个元素,数量少一,最后通过队列不为空,其他的就不详述了。

最后我们看看使用,我们举一个生产者消费者的例子。

阻塞队列使用案例

生产者消费者模式的实现方式超级多,比如volatile、CAS、AtomicInteger等等,这次我们就使用阻塞队列来实现一下:

1.生产者:生成数据放入到队列中,供消费者消费。

/**
 * @Description: 生产者
 * @author: mikechen
 */
public class Producer implements Runnable {

    private final BlockingQueue queue;

    public Producer(BlockingQueue q) {
        this.queue = q;
    }

    @Override
    public void run() {
        try {
            while (true) {
                //将生产的对象放入阻塞队列中,供消费者消费
                queue.put(produce());
                Thread.sleep(3000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 生产方法
     *
     * @return
     */
    public Object produce() {
        double num = Math.random();
        System.out.println(Thread.currentThread().getName() + "生产了随机数 " + num);
        return num;
    }
}

 

2.消费者:从队列中获取数据,进行消费

/**
 * @Description: 消费者
 * @author: mikechen
 */
public class Consumer implements Runnable {

    private final BlockingQueue queue;

    public Consumer(BlockingQueue q) {
        this.queue = q;
    }

    @Override
    public void run() {
        try {
            while (true) {
                //从阻塞队列中取出元素并进行消费
                consume(queue.take());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 消费方法
     *
     * @param o
     */
    public void consume(Object o) {
        System.out.println(Thread.currentThread().getName() + "消费者消费了" + o.toString());
    }
}

3.主函数调用方法

/**
 * @Description: 使用BlockingQueue实现的简单生产者-消费者模式
 * @author: mikechen
 */
public class Main {
    public static void main(String[] args) {
        //阻塞队列
        BlockingQueue queue = new LinkedBlockingQueue();
        //实例化生产者
        Producer producer = new Producer(queue);
        //实例化消费者1
        Consumer consumer1 = new Consumer(queue);
        //实例化消费者2
        Consumer consumer2 = new Consumer(queue);
        //启动生产者线程
        new Thread(producer).start();
        //启动消费者1线程
        new Thread(consumer1).start();
        //启动消费者2线程
        new Thread(consumer2).start();

    }
}

作者简介

陈睿|mikechen,10年+大厂架构经验,BAT资深面试官,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

👇阅读更多mikechen架构文章👇

阿里架构 |双11秒杀 |分布式架构 |负载均衡 |单点登录 |微服务 |云原生 |高并发 |架构师

以上

关注作者「mikechen」公众号,获取更多技术干货!

后台回复架构,即可获取《阿里架构师进阶专题全部合集》,后台回复面试即可获取《史上最全阿里Java面试题总结

评论交流
    说说你的看法