AQS源码解读(九)——Semaphore信号量原理详解

    科技2025-02-17  11

    更多JUC源码解读系列文章请持续关注JUC源码解读文章目录JDK8!


    文章目录

    前言代码示例Semaphore基本结构获取资源许可tryAcquireSharedSemaphore.NonfairSync#tryAcquireSharedSemaphore.FairSync#tryAcquireShared 释放资源许可Semaphore.Sync#tryReleaseShared 资源许可清零Semaphore#drainPermits总结

    前言

    Semaphore翻译是信号量的意思,可以控制并发访问资源的数量。

    Semaphore semaphore = new Semaphore(10); //初始化一个信号桶,也可以叫令牌桶,里面有10个资源许可 semaphore.acquire();//获取一个资源许可 semaphore.release();//释放一个资源许可

    10个资源许可被取完后,再来线程获取就会入队列阻塞。

    Semaphore是在AQS基础上实现的共享锁,获取资源和释放资源都是调用的AQS中共享锁模板方法,故只需要看tryAcquireShared和tryReleaseShared在Semaphore中的实现。

    代码示例

    首先初始化Semaphore,给资源许可5个,模拟多个线程获取资源。

    public class Test1072 { private static Semaphore semaphore = new Semaphore(5); public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(new MyThread(i)); thread.start(); } } static class MyThread implements Runnable{ private int i; public MyThread(int i) { this.i = i; } @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ", semaphore.acquire()获取资源--" + i); } catch (InterruptedException e) { e.printStackTrace(); } finally { // System.out.println(Thread.currentThread().getName() + ",semaphore.release()释放资源--" + i); // semaphore.release(); } } } } //控制台输出 //线程只获取资源不释放 Thread-0, semaphore.acquire()获取资源--0 Thread-1, semaphore.acquire()获取资源--1 Thread-2, semaphore.acquire()获取资源--2 Thread-6, semaphore.acquire()获取资源--6 Thread-3, semaphore.acquire()获取资源--3 //线程获取资源并释放资源 Thread-0, semaphore.acquire()获取资源--0 Thread-2, semaphore.acquire()获取资源--2 Thread-2,semaphore.release()释放资源--2 Thread-1, semaphore.acquire()获取资源--1 Thread-1,semaphore.release()释放资源--1 Thread-0,semaphore.release()释放资源--0 Thread-3, semaphore.acquire()获取资源--3 Thread-3,semaphore.release()释放资源--3 Thread-5, semaphore.acquire()获取资源--5 Thread-5,semaphore.release()释放资源--5 Thread-4, semaphore.acquire()获取资源--4 Thread-4,semaphore.release()释放资源--4 Thread-6, semaphore.acquire()获取资源--6 Thread-6,semaphore.release()释放资源--6 Thread-7, semaphore.acquire()获取资源--7 Thread-7,semaphore.release()释放资源--7 Thread-9, semaphore.acquire()获取资源--9 Thread-9,semaphore.release()释放资源--9 Thread-8, semaphore.acquire()获取资源--8 Thread-8,semaphore.release()释放资源--8

    在没有释放资源前,只有五个线程可以获取资源,其他线程都进入队列阻塞。当线程获取资源,执行完业务代码释放资源,则每个线程都可以获取资源,无需阻塞。

    Semaphore基本结构

    Semaphore中也有一个内部类Sync,Sync继承自AbstractQueuedSynchronizer,所以核心代码都在Sync中。从构造函数看出,Sync还有两个子类NonfairSync和FairSync,说明Semaphore实现的共享锁区分公平性。

    public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; /** * Synchronization implementation for semaphore. Uses AQS state * to represent permits. Subclassed into fair and nonfair * versions. */ abstract static class Sync extends AbstractQueuedSynchronizer { ... ... } public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } }

    获取资源许可

    Semaphore提供了可中断获取资源许可(Semaphore#acquire()),不可中断获取资源许可(Semaphore#acquireUninterruptibly()),尝试获取资源许可(Semaphore#tryAcquire()),超时可中断获取资源许可(Semaphore#tryAcquire(long, java.util.concurrent.TimeUnit))。底层代码调用的都是AQS中共享模式获取锁的模板方法,故只需要看看tryAcquireShared在Semaphore中的实现。

    //1.可中断获取资源许可 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //2.可中断获取permits个资源许可 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //3.不可中断获取资源许可 public void acquireUninterruptibly() { sync.acquireShared(1); } //4.不可中断获取permits个资源许可 public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } //5.尝试获取资源许可,获取成功返回true public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } //6.尝试获取permits个资源许可,获取成功返回true public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } //7.超时可中断获取资源许可,获取失败进入队列阻塞一段时间,超时还未获取返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //8.超时可中断获取permits个资源许可,获取失败进入队列阻塞一段时间,超时还未获取返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }

    tryAcquireShared

    Semaphore中获取资源许可有公平锁和非公平锁之分。

    Semaphore.NonfairSync#tryAcquireShared

    NonfairSync中的tryAcquireShared调用了Semaphore.Sync中已经实现好的nonfairTryAcquireShared。

    static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }

    获取资源许可,就是对state做减法,remaining =当前许可量—获取的许可量,remaining >= 0即可以CAS修改state资源许可数,获取成功返回remaining资源许可剩余量。CAS修改state失败会不断自旋。

    //java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取可用资源许可量 int available = getState(); //计算可用资源许可剩余量 int remaining = available - acquires; //remaining <0 说明资源许可耗尽 //remaining >= 0 可继续获取资源许可cas if (remaining < 0 || compareAndSetState(available, remaining)) //返回剩余许可量 return remaining; } }

    Semaphore.FairSync#tryAcquireShared

    FairSync中tryAcquireShared与NonfairSync唯一的区别就是FairSync中首先会判断当前同步队列中是否有线程也在等待获取资源许可,有则获取失败返回-1,没有则继续获取许可。CAS修改state失败会不断自旋。

    static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { //判断同步队列中是否有线程在等待获取资源许可,有则获取失败返回-1 if (hasQueuedPredecessors()) return -1; //同步队列中没有线程在等待,则可以继续获取许可,同非公平下的获取许可 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }

    释放资源许可

    释放资源许可,底层调用的是AQS中共享锁释放的模板方法releaseShared,故只需要看tryReleaseShared在Semaphore中的实现。

    //释放一个资源许可 public void release() { sync.releaseShared(1); } //释放permits个资源许可 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }

    Semaphore.Sync#tryReleaseShared

    释放资源许可,是对state做加法,当前许可量+释放的许可量。CAS修改state失败会不断自旋。释放成功返回true,直接唤醒head后继节点(不像ReentrantReadWriteLock.ReadLock的释放锁,需要完全释放锁才唤醒head后继节点)。

    protected final boolean tryReleaseShared(int releases) { for (;;) { //获取当前资源许可量 int current = getState(); //当前许可量+释放的许可量 int next = current + releases; if (next < current) // overflow //溢出了 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }

    资源许可清零Semaphore#drainPermits

    CAS自旋清空信号量。

    public int drainPermits() { return sync.drainPermits(); } //Semaphore.Sync#drainPermits final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }

    总结

    Semaphore中state是资源许可数量的意思,若state初始化为1,则Semaphore退化为排他锁。获取资源许可acquire是对state做减法。释放资源许可release是对state做加法。tryAcquireShared返回0代表获取资源许可成功,但是后面没有资源许可了。tryReleaseShared释放许可成功,直接唤醒同步队列head后继节点。Semaphore中state的CAS操作都进行了自旋。

    PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

    Processed: 0.012, SQL: 8