Java多线程之ReentrantLock(二)源码解读

    科技2025-07-21  8

    其实,在我们Java中也有一个使用了Java语言开发的一把锁,名字叫ReentrantLock。它位于我们的java.uilt.concurrent包下的一个线程安全的工具类。

    接上一篇讲,上一篇我们介绍了ReentrantLock的基本用法,并且也通过ReentrantLock底层实现的原理自己手写了一把ReentrantLock。那么本期我们就开始深入的来看一看ReentrantLock核心源码到底是什么样的。 首先我们来看ReentrantLock的类结构,

    不知道大家有沒有注意到,我昨天我昨天说ReentrantLock是基于我们的AbstractQueueSynchronize这个类实现的一把锁,但是为什么我在这里看不到呢?好像关系图当中并没有显示出来ReentrantLock有继承AQS啊。其实ReentrantLock有两个内部类一个是Segement,这个东西就是我们jdk1.7当中ConcurrentHashMap所用到的锁,而Sync就是我们ReentrantLock实现加锁解锁逻辑的关键所在。

    一.ReentrantLock类内部结构简介

    首先,我们的ReentrantLock内部的一个抽象类叫Sync,而实现Sync的有两个子类,第一个是NonfairSync,第二个是FairSync。我们先看一下Sync当中的代码。

    abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }

    由于在这里我只是简介,所以不会对它的方法做过多详细的介绍。我们看一下Sync内部的样子,首先第一个抽象方法lock,这个不用我多说,就是我们加锁的方法。而tryRelease就是释放锁逻辑。而剩下的一部分逻辑由他的两个子类实现。所以它这里用的设计模式是:模板方法

    二.AQS简单介绍

    在开始讲ReentrantLock之前呢,我们需要先对AQS里的方法有一些了解。那上一篇博客我有提到CLH队列,那么我们就先从CLH队列开始说起。

    1.CLH队列

    1)Node节点属性

    static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter;

    这么一大堆东西肯定第一次看这玩意的人都懵了,这是个啥????

    对于我们的ReentrantLock来讲,我们只需要关注prev、next、thread、waitStatus还有EXCLUSIVE,其他的我们可以先不用理解。 prev和next这个不用我多解释,构成双向链表的必要条件。那么他的thread其实就是指向的被加入到队列当中的线程,而waitStatus则是需要对该节点性质进行一个判断。当我们初始化的时候waitStatus就是0,然后在根据我们节点的性质去给他设置不同的值,也就是 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; 这四个东西。我们今天知道CANCELLED和SIGNAL就够了,剩下的我们还不需要去了解它。 CANCELLED:代表出现异常,可能是中断引起的。需要废弃结束 SIGNAL:表示节点线程可以被唤醒 还有一个就是我刚刚说的 初始化的时候 waitStatus是0。上面这些大家看不懂没关系,我们待会会给大家解释清楚。那么这个EXCLUSIVE其实就是我们节点的性质,他是一个互斥的还是一个共享的。一会看ReentrantLock源码的时候,我们就会看到他在加锁的过程当中会传一个EXCLUSIVE过来表示这个是一个互斥的。 我们继续往下看

    private transient volatile Node head; private transient volatile Node tail; private volatile int state;

    head和tail我这里也不过多解释,主要是state。这个东西很重要,之所以ReentrantLock能够保证我们的可重入性质,也是因为state存在的原因。回到刚刚我说的可重入性代码那里,当我们第一个线程进来通过lock方法拿到锁了之后,state就会+1,再拿一次也会+1。当我们释放锁的时候state就会-1。注意,在我们进行+1操作的时候,实际上使用的是CAS操作来进行+1的,这和我上一篇博客所做的操作是一样的,因为在拿锁的时候可能会产生并发问题,所以我们需要用CAS操作来保证线程安全。 在了解了AQS之后,我们就可以开始今天的ReentrantLock源码解读了

    三.ReentrantLock源码解读

    在刚刚,我有提到过我们的ReentrantLock有一个NonFairSync和FairSync的类,这两个类实现的逻辑大同小异,所以我将会对公平锁的实现逻辑进行一个解读,非公平锁的大家有兴趣可以自己研究一下。

    1.FairSync源码解析

    1).lock方法

    final void lock() { acquire(1); }

    首先是我们第一个方法,这个方法我觉得应该都知道,其实就是拿锁的方法,它这个方法里面会调用acquire并传一个1过去,这个1其实就是当我一个线程进来拿锁,我要对state增加多少。比如现在有一个线程T1,调用了lock方法。那么我这时候的state就是1。而如果我传一个2进去,那么这时候我的state就是2。主要是因为AQS集成了很多的工具类,比如Hystrix当中用到的Semaphore,又比如我们线程池当中的Worker,他们都是实现了AQS当中的抽象方法。所以AQS中有很多东西不会写死,但是AQS的代码可读性非常差!

    2.acquire方法

    public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

    这个方法了通过一个if判断如果判断成立,则会调用selfInterrupt方法,这个方法其实就是让当前线程直接中断操作 对多线程不了解的小伙伴可以去了解一下interrupt这里不做过多的说明。那么我们先来看他第一个判断条件

    tryAcquire
    protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

    看到这里,我相信你已经猜到了,没错!这里就是拿锁的逻辑,我上一篇博客在手写ReentrantLock的是时候是不是跟他的拿锁逻辑差不多。

    首先先获取当前线程和一个计数器(其实正确说法叫信号量,不过这里为了便于理解我暂且称之它为计数器) 第一个if逻辑其实就是拿锁,先通过CAS操作对state+1 如果成功了就设置当前执行的线程给变量exclusiveOwnerThread而这个变量的作用就是用来判断是不是重入操作。当tryAcquire返回为true的时候就不会进入acquire方法的if逻辑里面。而第二个方法就是判断我们需要拿锁的线程是不是重入,如果是就将我们的state加一个1 第一个判断条件讲完了我们看第二个条件

    acquireQueued
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    这个方法会接收两个参数,Node就是代表我们的节点,而这个Node参数的接收其实是通过addWaiter这个方法获取到的,所以我们先来看一下addWaiter

    private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

    首先会通过Node的构造方法构造一个节点,因为我的所有排队的线程都是放到CLH队列里面,所以第一个参数肯定是我需要排队的线程。而这个mode就是标注我这个节点的性质是一个互斥的。接下来就会进入到入队操作。但是在入队之前会先判断一下通过tail这个方法判断,我当前这个CLH队列是否是一个初始队列,如果是一个初始的队列那我肯定此时的tail肯定是为null的。说明此时这个线程是第一个进来的线程,所以要对我们队列进行一个初始化,而调用初始化的时候我们顺便在把当前节点给他放入到CLH队列当中,所以会调用enq方法。

    private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

    我们可以看到因为在队列初始化的时候,我们是很可能出现并发问题的,所以在入队操作的时候都会涉及到CAS操作,而我们CAS操作如果返回的是false那么接下来的操作都没法去做,所以需要通过自旋来保证下面的操作可以被执行。 这时候又出现一个新的问题了,既然在初始化的过程当中会出现并发问题需要用到自旋,但是为什么在入队操作的时候反而只用了CAS入队而没有用自旋呢? 答案就在acquireQueued方法里面,我们回到这个方法里来看一下

    final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

    是不是发现这里用到了自旋的操作,也就是说,如果addWaiter里CAS入队失败了之后,那么返回的node依然是一个原始Node。 注意,接下来是重点,因为接下来的操作非常的绕,所以我们重点讲一下下面的操作。我们贴上for循环里面的代码

    for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }

    我们一句话一句话开始看,首先第一个。获取前驱节点

    final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }

    如果前驱节点为null 说明我们在addWaiter当中的操作肯定出现了问题,所以这时候需要抛出一个空指针异常。如果存在前驱节点,则返回前驱节点。 接下来就是if判断。 首先第一个判断,p == head。这个可能有点绕,我们需要再贴上刚刚enq方法里的代码来帮助我们分析。

    private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

    我们再根据这段代码画个图。 那么再进过上面的流程的时候,我们的节点会变成这样一个东西 此时的tail 已经是node 而此时的head依旧是new Node()那个空的head。所以通过

    这个方法获取到的p是我们初始化时候赋值的tail,而tail又是在初始化的时候通过head赋值过来的,所以虽然我们node设置为了tail但是同样也给node的prev设置了初始化时候的tail。所以这时候我们的prev == head 既 p == head那么这个if判断的第一个条件是会成立的,也就会执行拿锁操作。这时候则会设置头结点并返回一个false给acquire方法当中的if判断,那么此时就不会调用interrupted()方法来中断线程。因为我要尽量避免阻塞操作,加入我是队列里第一个,那么我此时直接阻塞是一件毫无意义的事情。因为如果我立马阻塞住自己,但是此时锁刚被释放,那么还需要一个释放锁的操作通过LockSupport.unpark()方法来对我进行解锁,其实是没有必要的,所以当我们是第一个节点的时候,会再次尝试去拿一下锁。

    我们接着往下看 这里又是一个if判断没关系,我们先看第一个条件

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

    他这个方法会去判断前驱节点的waitStatus,通过前驱节点的waitStatus来对自己进行一个操作。为什么要用前驱节点呢?主要是 释放锁的时候会通过当前拿到锁正在运行的,也就是我们的头结点的waitStatus进行判断,如果是SIGNAL状态,就表示可以唤醒,则唤醒下一个节点执行相应逻辑。而如果waitStatus大于0 就说明出现了异常或者之类的操作导致要被取消,而CANCELLED就是1 所以大于0 则会执行逻辑。 假设说我们现在反悔的是true,那么他就会执行LockSupport,park方法阻塞住自己并且清除掉我的中断信号。

    为什么再需要一次清除自我中断的标记呢? 因为我要先去判断一下,如果有中断下面的逻辑我也不执行了,直接跳出来,那么这样我们就很好避免强制杀死锁产生的一系列的问题并且ReentrantLock内部有一个方法lockInterruptibly()他是依赖于interrupter进行阻塞和释放锁的

    如果这是因为一些操作出现问题抛出异常,比如中断操作,那么肯定就会执行finally代码快中的cancelAcquire。 这个方法会将自身标记为CANCELLED状态并且清除掉所有无效节点。

    以上就是我们加锁过程中的主要的源码解析了,整体下来还是比较绕,建议大家可以先看一遍源代码,跟着源代码的思路画一遍流程图,再整合一下会有帮助与你缕清你的思路。

    2).unlock方法

    由于刚刚加锁的方法我讲的非常详细,所以unlock方法有一些重复的我就不会再讲,我们直接看他的release方法

    看到没,将当前的head节点取出来传给unparkSuccessor方法, 而我们的unparkSuccessor方法则会对该节点进行一个判断,如果waitStatus小于0则使用cas更新等待状态,并执行unpark方法唤醒我们的head节点的后继节点。 文章到这里就结束了,喜欢的话记得点赞收藏转发哦~!

    Processed: 0.039, SQL: 8