什么是CyclicBarrier
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,它也是AQS多线程同步操作的一个具体实现。
CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,但CyclicBarrier与CountDownLatch最主要的区别就在于,CyclicBarrier的等待是可以循环的,等待完之后还可以重新适用。
CyclicBarrier怎么使用
通过上图我们可以看到,CyclicBarrier主要有两个构造方法,如下:
1.CyclicBarrier的构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
CyclicBarrier有两个构造函数,第一个只接受一个参数parties,parties 表示需要统一行动的线程个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
第二个参数叫做barrierAction,其中barrierAction是一个Runnable,这个参数的意思是,线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
2.CyclicBarrier的核心方法
CyclicBarrier的核心方法是await(),有两种方式:一个是带时间参数的,一个是不带时间参数的,await本质上调用了lock.newCondition().await()方法。
1.await()方法
//非定时等待 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } //定时等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
- 线程调用 await() 表示自己已经到达栅栏
- BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法。
2.dowait方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //检查当前栅栏是否被打翻 if (g.broken) { throw new BrokenBarrierException(); } //检查当前线程是否被中断 if (Thread.interrupted()) { //如果当前线程被中断会做以下三件事 //1.打翻当前栅栏 //2.唤醒拦截的所有线程 //3.抛出中断异常 breakBarrier(); throw new InterruptedException(); } //每次都将计数器的值减1 int index = --count; //计数器的值减为0则需唤醒所有线程并转换到下一代 if (index == 0) { boolean ranAction = false; try { //唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; //唤醒所有线程并转到下一代 nextGeneration(); return 0; } finally { //确保在任务未成功执行时能将所有线程唤醒 if (!ranAction) { breakBarrier(); } } } //如果计数器不为0则执行此循环 for (;;) { try { //根据传入的参数来决定是定时等待还是非定时等待 if (!timed) { trip.await(); }else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作 Thread.currentThread().interrupt(); } } //如果线程因为打翻栅栏操作而被唤醒则抛出异常 if (g.broken) { throw new BrokenBarrierException(); } //如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) { return index; } //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
CyclicBarrier的使用源码案例
举个报旅行团旅行的例子,出发时,导游会在机场收了护照和签证,办理集体出境手续,所以,要等大家都到齐才能出发,出发前再把护照和签证发到大家手里。
这就是典型的CyclicBarrier的使用场景,到齐了后再安排任务,源码案例如下:
1.旅行任务
/** * 旅行 * @author mikechen */ public class TravelTask implements Runnable{ private CyclicBarrier cyclicBarrier; private String name; private int arriveTime;//赶到的时间 public TravelTask(CyclicBarrier cyclicBarrier,String name,int arriveTime){ this.cyclicBarrier = cyclicBarrier; this.name = name; this.arriveTime = arriveTime; } @Override public void run() { try { //模拟达到需要花的时间 Thread.sleep(arriveTime * 1000); System.out.println(name +"到达集合点"); cyclicBarrier.await(); System.out.println(name +"开始旅行啦~~"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
2.导游
/** * 导游:负责都到达目的地时,跟游客发放护照和签证 * @author mikechen */ public class TourGuideTask implements Runnable{ @Override public void run() { System.out.println("****导游分发护照签证****"); try { //模拟发护照签证需要2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.客户端
public class Client { public static void main(String[] args) throws Exception{ CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new TourGuideTask()); Executor executor = Executors.newFixedThreadPool(3); executor.execute(new TravelTask(cyclicBarrier,"mikechen01",5)); executor.execute(new TravelTask(cyclicBarrier,"mikechne02",3)); executor.execute(new TravelTask(cyclicBarrier,"mikechen03",1)); }
执行结果如下:
CyclicBarrier的实现原理
而 CyclicBarrier 基于 Condition 来实现的。因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。
//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
//静态内部类Generation
private static class Generation {
boolean broken = false;
}
上面贴出了CyclicBarrier所有的成员变量,可以看到CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。
count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。
CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。
barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。
我用一图来描绘下 CyclicBarrier 里面的一些概念:
CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
- CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,所有的线程才会返回各自执行自己的工作。
- 在控制多个线程同时运行上,CountDownLatch 可以不限线程数量,而 CyclicBarrier 是固定线程数。
- CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
- CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
mikechen睿哥
mikechen睿哥,十余年BAT架构经验,资深技术专家,就职于阿里、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》