Semaphore使用与实现原理详解(看这篇就够了)

Semaphore使用与实现原理详解(看这篇就够了)-mikechen

Java并发编程提供了并发工具类,Semaphore就是其中典型的并发代表,今天重点详解Semaphore使用以及内部实现机制。

什么是Semaphore

Semaphore(信号量),是JUC包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果。

当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。

 

Semaphore的使用场景

Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。

假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。

  1. ublic class SemaphoreTest {
  2. private static final int COUNT = 40;
  3. private static Executor executor = Executors.newFixedThreadPool(COUNT);
  4. private static Semaphore semaphore = new Semaphore(10);
  5. public static void main(String[] args) {
  6. for (int i=0; i< COUNT; i++) {
  7. executor.execute(new ThreadTest.Task());
  8. }
  9. }
  10.  
  11. static class Task implements Runnable {
  12. @Override
  13. public void run() {
  14. try {
  15. //读取文件操作
  16. semaphore.acquire();
  17. // 存数据过程
  18. semaphore.release();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } finally {
  22. }
  23. }
  24. }
  25. }

 

Semaphore基本使用

Semaphore的使用也是比较简单的,我们创建一个Runnable的子类,如下:

  1. private static class MyRunnable implements Runnable {
  2. // 成员属性 Semaphore对象
  3. private final Semaphore semaphore;
  4. public MyRunnable(Semaphore semaphore) {
  5. this.semaphore = semaphore;
  6. }
  7. public void run() {
  8. String threadName = Thread.currentThread().getName();
  9. // 获取许可
  10. boolean acquire = semaphore.tryAcquire();
  11. // 未获取到许可 结束
  12. if (!acquire) {
  13. System.out.println("线程【" + threadName + "】未获取到许可,结束");
  14. return;
  15. }
  16. // 获取到许可
  17. try {
  18. System.out.println("线程【" + threadName + "】获取到许可");
  19. Thread.sleep(1000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } finally {
  23. // 释放许可
  24. semaphore.release();
  25. System.out.println("线程【" + threadName + "】释放许可");
  26. }
  27. }
  28. }

测试方法如下:

  1. public static void main(String[] args) {
  2. Semaphore semaphore = new Semaphore(2);
  3. for (int i = 0; i <= 10; i ++) {
  4. MyRunnable runnable = new MyRunnable(semaphore);
  5. Thread thread = new Thread(runnable, "Thread-" + i);
  6. thread.start();
  7. }
  8. }

执行结果如下

Semaphore使用与实现原理详解(看这篇就够了)-mikechen

Semaphore实现原理

Semaphore的类图如下图所示:

Semaphore使用与实现原理详解(看这篇就够了)-mikechen

Sync是Semaphore的一个内部类,该类继承AQS,这个类又有公平和非公平的两个子类,这个内置的同步器实现Semaphore的功能。

Semaphore构造方法

在Semaphore中提供了两个构造方法,如下:

  1. // 指定许可数量
  2. public Semaphore(int permits) {
  3. // sync属性赋值 默认未非公平实现
  4. sync = new NonfairSync(permits);
  5. }
  6. // 指定许可数量和是否公平实现
  7. public Semaphore(int permits, boolean fair) {
  8. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  9. }

内置同步器的构造方法如下:

  1. FairSync(int permits) {
  2. super(permits);
  3. }
  4. NonfairSync(int permits) {
  5. super(permits);
  6. }
  7. Sync(int permits) {
  8. // 用state属性记录许可量
  9. setState(permits);
  10. }

这些构造方法的逻辑是比较简单的哈,相信大家还记得在AQS中有一个state属性,当创建Semaphore时会将传递过来的许可量设置到同步器的state值,并将创建的同步器对象赋值给Semaphore中的sync属性。

获取许可

在Semaphore中提供了如下的获取许可的方法

  • void acquire() throws InterruptedException 获取一个许可,会阻塞等待其他线程释放许可
  • void acquire(int permits) throws InterruptedException 获取指定的许可数 ,会阻塞等待其他线程释放
  • void acquireUninterruptibly() 获取一个许可 会阻塞等待其他线程释放许可 可被中断
  • void acquireUninterruptibly(int permits) 获取指定的许可数 会阻塞等待其他线程释放许可 可被中断
  • boolean tryAcquire() 尝试获取许可 不会进行阻塞等待
  • boolean tryAcquire(int permits) 尝试获取指定的许可数 不会阻塞等待
  • boolean tryAcquire(long timeout, TimeUnit unit) 尝试获取许可 可指定等待时间
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit) 尝试获取指定的许可数 可指定等待时间

由于篇幅有限,我们这里不全部介绍了,我们只介绍下acquire()和tryAcquire()两个方法,其他的方法实现大家自行查看吧,相差也不是很大。

 tryAcquire方法

这个方法的返回值表示是否获取许可成功,不会阻塞等待其他线程释放许可,没有许可了会直接返回false,其源码如下:

  1. public boolean tryAcquire() {
  2. // 调用Semaphore.Sync中的nonfairTryAcquireShared方法
  3. return sync.nonfairTryAcquireShared(1) >= 0;
  4. }
  5. final int nonfairTryAcquireShared(int acquires) {
  6. // 自旋
  7. for (;;) {
  8. // 获取剩余的许可量
  9. int available = getState();
  10. // 扣减需要的信号量后的值
  11. int remaining = available - acquires;
  12. // 信号量不足 或者CAS替换state失败 返回扣减后的信号量值
  13. if (remaining < 0 ||
  14. compareAndSetState(available, remaining))
  15. return remaining;
  16. }
  17. }

这个方法的逻辑比较简单,判断许可量是否充足,充足的话CAS修改state的值。判断分配所需数量后的值是否大于等于0。

 acquire方法

这个方法没有返回值,当许可不足时会阻塞线程等待其他线程释放许可,其源码如下:

  1. public void acquire() throws InterruptedException {
  2. // 调用AQS中的方法
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. // AQS中的方法
  6. public final void acquireSharedInterruptibly(int arg)
  7. throws InterruptedException {
  8. if (Thread.interrupted())
  9. throw new InterruptedException();
  10. // tryAcquireShared是个模板方法,需要子类去实现
  11. if (tryAcquireShared(arg) < 0)
  12. doAcquireSharedInterruptibly(arg);
  13. }

在acquire的方法中,会调用到AQS中的acquireSharedInterruptibly方法,在这个方法中用到了模板方法模式,tryAcquireShared方法是一个模板方法,需要子类去实现。接下来我们分别看看在Semaphore中的公平和非公平模式都是如何实现的。

公平模式下的tryAcquireShared方法实现如下:

  1. protected int tryAcquireShared(int acquires) {
  2. // 自旋
  3. for (;;) {
  4. // 判断是否已经存在阻塞的线程
  5. if (hasQueuedPredecessors())
  6. return -1;
  7. int available = getState();
  8. int remaining = available - acquires;
  9. if (remaining < 0 ||
  10. compareAndSetState(available, remaining))
  11. return remaining;
  12. }
  13. }

非公平模式下的tryAcquireShared方法实现如下:

  1. protected int tryAcquireShared(int acquires) {
  2. return nonfairTryAcquireShared(acquires);
  3. }
  4. final int nonfairTryAcquireShared(int acquires) {
  5. // 自旋
  6. for (;;) {
  7. // 获取剩余的许可量
  8. int available = getState();
  9. // 扣减需要的信号量后的值
  10. int remaining = available - acquires;
  11. // 信号量不足 或者CAS替换state失败 返回扣减后的信号量值
  12. if (remaining < 0 ||
  13. compareAndSetState(available, remaining))
  14. return remaining;
  15. }
  16. }

公平和非公平模式的实现的区别是在公平模式的实现中会先判断是否已经存在阻塞的线程了,存在的话不会再去竞争获取许可了。

AQS.doAcquireSharedInterruptibly方法的逻辑如下:

  1. private void doAcquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. // 添加到AQS的阻塞队列中
  4. final Node node = addWaiter(Node.SHARED);
  5. boolean failed = true;
  6. try {
  7. for (;;) {
  8. // 获取节点的前置节点
  9. final Node p = node.predecessor();
  10. // 前置节点是头节点
  11. if (p == head) {
  12. // 尝试获取许可
  13. int r = tryAcquireShared(arg);
  14. if (r >= 0) {
  15. setHeadAndPropagate(node, r);
  16. p.next = null; // help GC
  17. failed = false;
  18. return;
  19. }
  20. }
  21. // 清楚无用的节点并挂起当前线程
  22. if (shouldParkAfterFailedAcquire(p, node) &&
  23. parkAndCheckInterrupt())
  24. throw new InterruptedException();
  25. }
  26. } finally {
  27. if (failed)
  28. // 获取失败移除节点
  29. cancelAcquire(node);
  30. }
  31. }

释放许可

释放许可的方法为release方法,其源码如下:

  1. public void release() {
  2. sync.releaseShared(1);
  3. }
  4. // AQS中的方法
  5. public final boolean releaseShared(int arg) {
  6. // 模板方法 需要子类实现
  7. if (tryReleaseShared(arg)) {
  8. doReleaseShared();
  9. // 成功
  10. return true;
  11. }
  12. // 失败
  13. return false;
  14. }

Semaphore.Sync实现的tryReleaseShared方法逻辑如下:

  1. protected final boolean tryReleaseShared(int releases) {
  2. // 自旋
  3. for (;;) {
  4. // 获取当前的许可量
  5. int current = getState();
  6. // 加上需要释放的量
  7. int next = current + releases;
  8. if (next < current) // overflow
  9. throw new Error("Maximum permit count exceeded");
  10. // CAS修改state
  11. if (compareAndSetState(current, next))
  12. return true;
  13. }
  14. }

AQS.doreleaseShared的逻辑如下:

  1. private void doReleaseShared() {
  2. // 自旋
  3. for (;;) {
  4. Node h = head;
  5. if (h != null && h != tail) {
  6. int ws = h.waitStatus;
  7. if (ws == Node.SIGNAL) {
  8. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  9. continue; // loop to recheck cases
  10. // 唤醒下个节点 LockSupport.unpark
  11. unparkSuccessor(h);
  12. }
  13. else if (ws == 0 &&
  14. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  15. continue; // loop on failed CAS
  16. }
  17. if (h == head) // loop if head changed
  18. break;
  19. }
  20. }

 

Semaphore总结

Semaphore主要用于控制当前活动线程数目,就如同停车场系统一般,而Semaphore则相当于看守的人,用于控制总共允许停车的停车位的个数,而对于每辆车来说就如同一个线程,线程需要通过acquire()方法获取许可,而release()释放许可。

如果许可数达到最大活动数,那么调用acquire()之后,便进入等待队列,等待已获得许可的线程释放许可,从而使得多线程能够合理的运行。

评论交流
    说说你的看法
欢迎您,新朋友,感谢参与互动!