CyclicBarrier原理及用法详解(含使用场景案例)

CyclicBarrier原理及用法详解(含使用场景案例)-mikechen的互联网架构

什么是CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,它也是AQS多线程同步操作的一个具体实现。

CyclicBarrier原理及用法详解(含使用场景案例)-mikechen的互联网架构

CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,但CyclicBarrier与CountDownLatch最主要的区别就在于,CyclicBarrier的等待是可以循环的,等待完之后还可以重新适用。

 

CyclicBarrier怎么使用

CyclicBarrier原理及用法详解(含使用场景案例)-mikechen的互联网架构

通过上图我们可以看到,CyclicBarrier主要有两个构造方法,如下:

1.CyclicBarrier的构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

CyclicBarrier有两个构造函数,第一个只接受一个参数parties,parties 表示需要统一行动的线程个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

第二个参数叫做barrierAction,其中barrierAction是一个Runnable,这个参数的意思是,线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

2.CyclicBarrier的核心方法

CyclicBarrier的核心方法是await(),有两种方式:一个是带时间参数的,一个是不带时间参数的,await本质上调用了lock.newCondition().await()方法。

1.await()方法

//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}

//定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}
  • 线程调用 await() 表示自己已经到达栅栏
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法。

2.dowait方法

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    //检查当前栅栏是否被打翻
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    //检查当前线程是否被中断
    if (Thread.interrupted()) {
      //如果当前线程被中断会做以下三件事
      //1.打翻当前栅栏
      //2.唤醒拦截的所有线程
      //3.抛出中断异常
      breakBarrier();
      throw new InterruptedException();
    }
    //每次都将计数器的值减1
    int index = --count;
    //计数器的值减为0则需唤醒所有线程并转换到下一代
    if (index == 0) {
      boolean ranAction = false;
      try {
        //唤醒所有线程前先执行指定的任务
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        //唤醒所有线程并转到下一代
        nextGeneration();
        return 0;
      } finally {
        //确保在任务未成功执行时能将所有线程唤醒
        if (!ranAction) {
          breakBarrier();
        }
      }
    }

    //如果计数器不为0则执行此循环
    for (;;) {
      try {
        //根据传入的参数来决定是定时等待还是非定时等待
        if (!timed) {
          trip.await();
        }else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
          Thread.currentThread().interrupt();
        }
      }
      //如果线程因为打翻栅栏操作而被唤醒则抛出异常
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      //如果线程因为换代操作而被唤醒则返回计数器的值
      if (g != generation) {
        return index;
      }
      //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

 

CyclicBarrier的使用源码案例

举个报旅行团旅行的例子,出发时,导游会在机场收了护照和签证,办理集体出境手续,所以,要等大家都到齐才能出发,出发前再把护照和签证发到大家手里。

这就是典型的CyclicBarrier的使用场景,到齐了后再安排任务,源码案例如下:

1.旅行任务

/**
 * 旅行
 * @author mikechen
 */
public class TravelTask implements Runnable{

    private CyclicBarrier cyclicBarrier;
    private String name;
    private int arriveTime;//赶到的时间

    public TravelTask(CyclicBarrier cyclicBarrier,String name,int arriveTime){
        this.cyclicBarrier = cyclicBarrier;
        this.name = name;
        this.arriveTime = arriveTime;
    }

    @Override
    public void run() {
        try {
            //模拟达到需要花的时间
            Thread.sleep(arriveTime * 1000);
            System.out.println(name +"到达集合点");
            cyclicBarrier.await();
            System.out.println(name +"开始旅行啦~~");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

2.导游

/**
 * 导游:负责都到达目的地时,跟游客发放护照和签证
 * @author mikechen
 */
public class TourGuideTask implements Runnable{

    @Override
    public void run() {
        System.out.println("****导游分发护照签证****");
        try {
            //模拟发护照签证需要2秒
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.客户端

public class Client {

    public static void main(String[] args) throws Exception{

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new TourGuideTask());
        Executor executor = Executors.newFixedThreadPool(3);
  
        executor.execute(new TravelTask(cyclicBarrier,"mikechen01",5));
        executor.execute(new TravelTask(cyclicBarrier,"mikechne02",3));
        executor.execute(new TravelTask(cyclicBarrier,"mikechen03",1));
    }

执行结果如下:

CyclicBarrier原理及用法详解(含使用场景案例)-mikechen的互联网架构

CyclicBarrier的实现原理

而 CyclicBarrier 基于 Condition 来实现的。因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。

在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。

//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;

//静态内部类Generation
private static class Generation {
  boolean broken = false;
}

上面贴出了CyclicBarrier所有的成员变量,可以看到CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。

count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。

CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。

barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。

我用一图来描绘下 CyclicBarrier 里面的一些概念:

CyclicBarrier原理及用法详解(含使用场景案例)-mikechen的互联网架构

 

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,所有的线程才会返回各自执行自己的工作。
  • 在控制多个线程同时运行上,CountDownLatch 可以不限线程数量,而 CyclicBarrier 是固定线程数。
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。

以上!

关注「mikechen的互联网架构」公众号,第一时间获知最新架构技术干货!

在公众号菜单栏对话框回复【合集】,即可获取mikechen原创《阿里架构师进阶从0到1全部合集》,助力大家2023高薪进阶。

评论交流
    说说你的看法