
什么是CyclicBarrier
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,它也是AQS多线程同步操作的一个具体实现。

CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,但CyclicBarrier与CountDownLatch最主要的区别就在于,CyclicBarrier的等待是可以循环的,等待完之后还可以重新适用。
CyclicBarrier怎么使用

通过上图我们可以看到,CyclicBarrier主要有两个构造方法,如下:
1.CyclicBarrier的构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
CyclicBarrier有两个构造函数,第一个只接受一个参数parties,parties 表示需要统一行动的线程个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
第二个参数叫做barrierAction,其中barrierAction是一个Runnable,这个参数的意思是,线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
2.CyclicBarrier的核心方法
CyclicBarrier的核心方法是await(),有两种方式:一个是带时间参数的,一个是不带时间参数的,await本质上调用了lock.newCondition().await()方法。
1.await()方法
//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
//定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
- 线程调用 await() 表示自己已经到达栅栏
- BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法。
2.dowait方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//检查当前栅栏是否被打翻
if (g.broken) {
throw new BrokenBarrierException();
}
//检查当前线程是否被中断
if (Thread.interrupted()) {
//如果当前线程被中断会做以下三件事
//1.打翻当前栅栏
//2.唤醒拦截的所有线程
//3.抛出中断异常
breakBarrier();
throw new InterruptedException();
}
//每次都将计数器的值减1
int index = --count;
//计数器的值减为0则需唤醒所有线程并转换到下一代
if (index == 0) {
boolean ranAction = false;
try {
//唤醒所有线程前先执行指定的任务
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
//唤醒所有线程并转到下一代
nextGeneration();
return 0;
} finally {
//确保在任务未成功执行时能将所有线程唤醒
if (!ranAction) {
breakBarrier();
}
}
}
//如果计数器不为0则执行此循环
for (;;) {
try {
//根据传入的参数来决定是定时等待还是非定时等待
if (!timed) {
trip.await();
}else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
//若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
Thread.currentThread().interrupt();
}
}
//如果线程因为打翻栅栏操作而被唤醒则抛出异常
if (g.broken) {
throw new BrokenBarrierException();
}
//如果线程因为换代操作而被唤醒则返回计数器的值
if (g != generation) {
return index;
}
//如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
CyclicBarrier的使用源码案例
举个报旅行团旅行的例子,出发时,导游会在机场收了护照和签证,办理集体出境手续,所以,要等大家都到齐才能出发,出发前再把护照和签证发到大家手里。
这就是典型的CyclicBarrier的使用场景,到齐了后再安排任务,源码案例如下:
1.旅行任务
/**
* 旅行
* @author mikechen
*/
public class TravelTask implements Runnable{
private CyclicBarrier cyclicBarrier;
private String name;
private int arriveTime;//赶到的时间
public TravelTask(CyclicBarrier cyclicBarrier,String name,int arriveTime){
this.cyclicBarrier = cyclicBarrier;
this.name = name;
this.arriveTime = arriveTime;
}
@Override
public void run() {
try {
//模拟达到需要花的时间
Thread.sleep(arriveTime * 1000);
System.out.println(name +"到达集合点");
cyclicBarrier.await();
System.out.println(name +"开始旅行啦~~");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
2.导游
/**
* 导游:负责都到达目的地时,跟游客发放护照和签证
* @author mikechen
*/
public class TourGuideTask implements Runnable{
@Override
public void run() {
System.out.println("****导游分发护照签证****");
try {
//模拟发护照签证需要2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.客户端
public class Client {
public static void main(String[] args) throws Exception{
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new TourGuideTask());
Executor executor = Executors.newFixedThreadPool(3);
executor.execute(new TravelTask(cyclicBarrier,"mikechen01",5));
executor.execute(new TravelTask(cyclicBarrier,"mikechne02",3));
executor.execute(new TravelTask(cyclicBarrier,"mikechen03",1));
}
执行结果如下:

CyclicBarrier的实现原理
而 CyclicBarrier 基于 Condition 来实现的。因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。
//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
//静态内部类Generation
private static class Generation {
boolean broken = false;
}
上面贴出了CyclicBarrier所有的成员变量,可以看到CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。
count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。
CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。
barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。
我用一图来描绘下 CyclicBarrier 里面的一些概念:

CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
- CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,所有的线程才会返回各自执行自己的工作。
- 在控制多个线程同时运行上,CountDownLatch 可以不限线程数量,而 CyclicBarrier 是固定线程数。
- CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
- CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
mikechen睿哥
10年+大厂架构经验,资深技术专家,就职于阿里巴巴、淘宝、百度等一线互联网大厂。