更多JUC源码解读系列文章请持续关注JUC源码解读文章目录JDK8!
何为共享锁?共享锁就是多个线程可以共享一把锁,如ReentrantReadWriteLock的ReadLock是共享锁,Semaphore是共享锁,CountDownLatch是共享锁,且这三个都是基于AQS实现的。
在AQS中共享锁和独占锁一样,也实现了一套通用的模板,子类只需要实现如何获取锁(tryAcquireShared),如何释放锁(tryReleaseShared)的逻辑。
共享模式获取锁的过程和独占锁非常类似,都是先获取锁,失败之后进入同步队列操作。tryAcquireShared在AQS中没有给出具体实现,在子类ReentrantReadWriteLock.ReadLock、Semaphore和CountDownLatch中相应实现了获取共享锁的逻辑。
tryAcquireShared返回值小于0,表示获取共享锁失败,从而进入同步队列操作doAcquireShared。
public final void acquireShared(int arg) { //tryAcquireShared 返回-1获取锁失败,返回值大于1或者0获取锁成功 if (tryAcquireShared(arg) < 0) //获取锁失败,进入队列操作 doAcquireShared(arg); }doAcquireShared的逻辑和独占锁acquireQueued非常类似,入队列、自旋、阻塞等。基本流程如下:
新建共享节点node,并入队列(addWaiter(Node.SHARED))。自旋判断新节点node前驱是否是head,是则获取锁(tryAcquireShared),获取锁成功并传播唤醒后继(setHeadAndPropagate)。新节点node前驱不是head,则判断是否应该阻塞shouldParkAfterFailedAcquire。判断应该阻塞,则阻塞当前线程parkAndCheckInterrupt。被其他线程唤醒,或中断导致唤醒,则自旋重复2、3、4、5。 private void doAcquireShared(int arg) { //创建一个读节点,并入队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //如果前继节点是head,则尝试获取锁 int r = tryAcquireShared(arg); if (r >= 0) { //获取锁成功,设置新head和共享传播(唤醒下一个共享节点) setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } /** * p不是头结点 or 获取锁失败,判断是否应该被阻塞 * 前继节点的ws = SIGNAL 时应该被阻塞 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }自旋判断新节点node前驱是head,且尝试获取共享锁成功,则需要传播唤醒共享后继节点。基本流程如下:
node出队列,并成为新的head。判断是否满足传播条件。如若node还有后继且后继是共享节点,则执行唤醒操作doReleaseShared。(共享锁的传播模式比较复杂,如需深入了解,请阅读拙作《AQS源码解读(七)——从PROPAGATE和setHeadAndPropagate()分析共享锁的传播性》)
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //设置node为新head setHead(node); //一连串判断满足后,唤醒共享后继 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //唤醒后继共享节点 Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }共享锁判断判断是否应该阻塞和阻塞线程的代码和独占锁的完全一样,不过多赘述。shouldParkAfterFailedAcquire主要做了如下几步:
判断node前驱状态是否为SIGNAL,是则直接返回true。node前驱状态不是SIGNAL,有可能是ws>0,说明前驱取消了,自旋跳过取消的节点,并寻找链接一个正常的前驱。node前驱状态不是SIGNAL,有可能是0(初始化状态)或PROPAGATE(传播状态),修改node前驱状态为SIGNAL。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. * node拿锁失败,前继节点的状态是SIGNAL,node节点可以放心的阻塞, * 因为下次会被唤醒 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * pred节点被取消了,跳过pred */ do { //pred = pred.prev; //node.pred = pred; node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //跳过取消节点,给node找一个正常的前驱,然后再循环一次 pred.next = node; } else { /* 0 -3 * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }shouldParkAfterFailedAcquire判断返回true,则调用parkAndCheckInterrupt阻塞当前线程。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }acquireSharedInterruptibly在代码结构上与doAcquireShared类似,不同之处在于doAcquireShared不响应中断,acquireSharedInterruptibly响应中断。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) //被打断 抛出异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //获取锁失败 doAcquireSharedInterruptibly(arg); }doAcquireSharedInterruptibly逻辑上也和doAcquireShared类似,但是doAcquireSharedInterruptibly在线程阻塞期间,被中断导致唤醒,将抛出异常InterruptedException,响应中断,此时failed=true,finally块中会执行取消节点的操作,取消过程和独占锁一样。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新建节点入队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //node节点的前驱节点是head,获取锁 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //在阻塞期间因为中断而唤醒,抛异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }和独占锁的可超时获取锁类似,共享模式超时获取锁,不仅可以响应中断,还可以将线程阻塞一段时间,自动唤醒。tryAcquireSharedNanos可传入一个纳秒单位的时间nanosTimeout,可超时的逻辑在doAcquireSharedNanos中。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }doAcquireSharedNanos和doAcquireShared逻辑类似,但是不仅可以响应中断,同时还可以让线程阻塞一段时间自动唤醒,如果超时了还没获取锁则返回false。
doAcquireSharedNanos还有一个非常不同之处,就是即使shouldParkAfterFailedAcquire判断应该阻塞了,也有可能不阻塞,还会再自旋一段时间,这个自旋的时长有一个阈值spinForTimeoutThreshold = 1000L,1000纳秒,自旋了1000纳秒后还没有获取锁,且此时也判断应该阻塞了,就让线程休眠一段时间。
线程唤醒,有可能是自动唤醒,有可能是被其他释放锁的线程唤醒,还有可能是线程中断导致唤醒,如果是线程中断唤醒,则需要响应中断,抛出异常InterruptedException;如果没有中断,则继续循环刚才的流程(判断前驱是否是head,判断是否超时,判断是否需要阻塞等)。
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; //判断是否应该阻塞 //在阈值spinForTimeoutThreshold范围自旋一定时长, if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //阻塞一定时间,时间到可自动唤醒 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) //线程如若发生中断,响应中断抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }共享模式释放锁的逻辑很简单,tryReleaseShared释放锁成功后,调用doReleaseShared唤醒后继节点。tryReleaseShared需要子类自行实现。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //读锁释放才唤醒后继节点 doReleaseShared(); return true; } return false; }共享锁释放和在AQS同步队列中的共享节点获取锁,传播唤醒时都调用了doReleaseShared,其唤醒后继是一个自旋的过程(for(;;)),其基本流程如下:
获取head,判断队列是否为空。不为空,则继续判断head的状态是否是SIGNAL,即是否可以唤醒后继。head的状态为SIGNAL,则将状态置为0。head状态置为0成功,则唤醒后节点线程unparkSuccessor,失败则自旋重试。若head状态为0,则将其置为PROPAGATE,使得head具有传播唤醒的特性。如若自旋的过程中head变了,则继续自旋唤醒后继,head没有变,跳出自旋。 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //SIGNAL --> 0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继节点的线程 unparkSuccessor(h); } else if (ws == 0 && //0 --> PROPAGATE !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } /** * we must loop in case a new node is added * while we are doing this */ if (h == head) // loop if head changed //head没有变则break break; } }注意:共享锁的传播性,较为复杂,单独撰写一篇文章详细讲解,如需了解,请移步《AQS源码解读(六)——从PROPAGATE和setHeadAndPropagate()分析共享锁的传播性》。
因为共享锁的源码和独占锁的源码有重复部分,这里没有过多赘述,如需了解,请移步《AQS源码解读(二)——从acquireQueued探索独占锁实现原理,如何阻塞?如何唤醒?》
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!