AbstractQueuedSynchronizer 原理分析 - 独占模式
AQS 结构
先来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了
1 | // 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的 |
AbstractQueuedSynchronizer 的等待队列示意如下所示,注意了,之后分析过程中所说的 queue,也就是阻塞队列
不包含 head !!!
不包含 head !!!
不包含 head !!!
等待队列中每个线程被包装成一个 Node 实例,数据结构是链表
1 | static final class Node { |
Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。
上面的是基础知识,后面会多次用到,心里要时刻记着它们,心里想着这个结构图就可以了。下面,我们开始说 ReentrantLock 的公平锁。再次强调,我说的阻塞队列不包含 head 节点。
首先,我们先看下 ReentrantLock 的使用方式。
1 | // 我用个web开发中的service概念吧 |
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
1 | abstract static class Sync extends AbstractQueuedSynchronizer { |
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。
1 | public ReentrantLock(boolean fair) { |
线程强锁
1 | static final class FairSync extends Sync { |
说到这里,也就明白了,多看几遍 final boolean acquireQueued(final Node node, int arg) 这个方法吧。自己推演下各个分支怎么走,哪种情况下会发生什么,走到哪里。
解锁操作
最后,就是还需要介绍下唤醒的动作了。我们知道,正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this); 挂起停止,等待被唤醒。
1 | // 唤醒的代码还是比较简单的,你如果上面加锁的都看懂了,下面都不需要看就知道怎么回事了 |
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
1 | private final boolean parkAndCheckInterrupt() { |
总结
总结一下吧。
1 | 在并发环境下,加锁和解锁需要以下三个部件的协调: |
示例图解析
下面属于回顾环节,用简单的示例来说一遍,如果上面的有些东西没看懂,这里还有一次帮助你理解的机会。
首先,第一个线程调用 reentrantLock.lock(),翻到最前面可以发现,tryAcquire(1) 直接就返回 true 了,结束。只是设置了 state=1,连 head 都没有初始化,更谈不上什么阻塞队列了。要是线程 1 调用 unlock() 了,才有线程 2 来,那世界就太太太平了,完全没有交集嘛,那我还要 AQS 干嘛。
如果线程 1 没有调用 unlock() 之前,线程 2 调用了 lock(), 想想会发生什么?
线程 2 会初始化 head【new Node()】,同时线程 2 也会插入到阻塞队列并挂起 (注意看这里是一个 for 循环,而且设置 head 和 tail 的部分是不 return 的,只有入队成功才会跳出循环)
1 | private Node enq(final Node node) { |
首先,是线程 2 初始化 head 节点,此时 head==tail, waitStatus==0
然后线程 2 入队:
同时我们也要看此时节点的 waitStatus,我们知道 head 节点是线程 2 初始化的,此时的 waitStatus 没有设置, java 默认会设置为 0,但是到 shouldParkAfterFailedAcquire 这个方法的时候,线程 2 会把前驱节点,也就是 head 的waitStatus设置为 -1。
那线程 2 节点此时的 waitStatus 是多少呢,由于没有设置,所以是 0;
如果线程 3 此时再进来,直接插到线程 2 的后面就可以了,此时线程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的时候把前驱节点线程 2 的 waitStatus 设置为 -1。
这里可以简单说下 waitStatus 中 SIGNAL(-1) 状态的意思,Doug Lea 注释的是:代表后继节点需要被唤醒。也就是说这个 waitStatus 其实代表的不是自己的状态,而是后继节点的状态,我们知道,每个 node 在入队的时候,都会把前驱节点的状态改为 SIGNAL,然后阻塞,等待被前驱唤醒。这里涉及的是两个问题:有线程取消了排队、唤醒操作。其实本质是一样的,读者也可以顺着 “waitStatus代表后继节点的状态” 这种思路去看一遍源码。
疑问
¶保留head为刚开始的new Node()不好吗?为什么要重新设置一下head呢?
1 | final Node p = node.predecessor(); |
假设不调用 setHead(node),我们假设此时 A 持有这个锁,head 是 new Node() 那个空节点。
A 持有的锁用完了,释放锁,唤醒后继节点 B。后继节点 B 从 parkAndCheckInterrupt() 这个方法返回,注意这里的 for 循环。然后调用 final Node p = node.predecessor(); 这个方法,那么这个时候,p == head 就不成立了,也就进不到 tryAcquire(arg) 这里去获取锁。
1 | for (;;) { |
另一处。既然当前线程获取到了锁,它就不应该再是阻塞队列的一员。如果没有 setHead 这一步,下面这个方法就不准确了:
1 | public final Collection<Thread> getQueuedThreads() { |
¶这个finally里的cancelAcquire 似乎永远都不会被执行吧?为什么都是要从tail往前找第一个状态是非CANCEL的节点呢,如果从head往后找第一个状态是非CANCEL的话效率会不会高一点?
1 | boolean failed = true; |
第一个问题:这里的 failed 确实永远都不会为 true,cancelAcquire() 永远不会得到执行。那为什么要这么写呢,如果你看了后面的两篇,可能会有些体会,这部分其实是用于响应中断或超时的,你可以参考 doAcquireNanos(int arg, long nanosTimeout) 或 doAcquireInterruptibly(int arg)。在这个方法中确实是没用的,这更像是模板代码吧。
这段代码的异常可能发生在 tryAcquire(arg) 这里,因为这是依赖于子类来实现的。
第二个问题:应该说的是 unparkSuccessor(Node node) 这个方法。
1 | Node s = node.next; |
首先,第一行代码先检测 head 的后继节点,只有当此时的后继节点不存在或者这个后继节点取消了才开始从后往前找,所以大部分情况下,其实不会发生从后往前遍历整个队列的情况。(后继节点取消很正常,但是某节点在入队的时候,如果发现前驱是取消状态,前驱节点是会被请出队列的)
这个问题的答案在 addWaiter(Node mode)方法中,看下面的代码:
1 | Node pred = tail; |
所以,这里存在并发问题:从前往后寻找不一定能找到刚刚加入队列的后继节点。
¶感觉Java的monitor和AQS的目的和实现思路很相似,为什么还要再实现一遍AQS呢?
monitor 功能太单一了,就是获取独占锁,
AQS 相比 monitor,功能要丰富很多,比如我们可以设置超时时间,可以用线程中断进行退出,可以选择公平/非公平模式等, 采用 AQS 可以实现很多灵活的场景
¶acquireQueued方法里面,第一次调用shouldParkAfterFailedAcquire(p, node)的时候,把前驱节点waitStatus从0改为-1,然后返回false,回到acquireQueued方法,再尝试拿一次锁,然后第二次调用shouldParkAfterFailedAcquire返回true,调用parkAndCheckInterrupt()挂起线程。那么,如果在某线程B还没有挂起之前,前驱节点的线程A发现自己waitStatus为-1直接unpark,然后刚刚的线程B才挂起。那不就没人能唤醒它了吗?它是怎么保证被唤醒的?
你应该已经把流程摸清楚了,我就说一点就可以了,你的疑问其实在 unpark() 方法上。
1、如果一个线程 park 了,那么调用 unpark(thread) 这个线程会被唤醒;
2、如果一个线程先被调用了 unpark,那么下一个 park(thread) 操作不会挂起线程。
¶为什么要先读tail,再读head,我猜是为了增加tail为null的可能性,可是增加这种可能性的目的呢?
1 | public final boolean hasQueuedPredecessors() { |
看下面这段代码,如果是第一个 node 进队列的情况:
1 | private Node enq(final Node node) { |
也就是说,先有 head 然后才有 tail。
回到 hasQueuedPredecessors:
1 | public final boolean hasQueuedPredecessors() { |
有可能的情况就是 t 为 null,h 不为 null 对吧,这个时候返回值取决于 h.next。
如果调换一下 Node t = tail; 和 Node h = head; 那么可能出现 h 为 null,t 不为 null,这个方法会返回 false。
但是其实不对的,很可能这个间隙是有节点 enq 成功的。
¶为什么共享锁的node节点是new了一个节点,独占是null呢?
1 | 其实我也不知道是为什么??? |
¶请问这个里如果同步状态tryAcquire(arg)获取失败,就构造一个同步节点通过addWaiter(Node.EXCLUSIVE)将节点添加到尾部,如果条件成立执行 selfInterrupt()会中断当前线程吗 selfInterrupt()的用途不是太明白,有的书籍上说acquire(int arg)方法对中断不敏感,不会将获取同步状态失败的线程从同步队列中移除
1 | public final void acquire(int arg) { |
acquireQueued 返回值代表的是:是否被中断过。但是,不管是否被中断过,acquireQueued 出来以后,线程的中断状态一定是 false,所以如果发生过中断,要重新设置中断状态。
虽然 acquire(int arg) 确实是不关心中断的,但是它会保持这个状态,如果客户端想要知道是否发生过中断的话,还是可以知道的。因为中断情况下,中断状态虽然中间丢过,但是 selfInterrupt() 会设置回去。
会实际受到中断影响的是另一个方法 acquireInterruptibly(int arg),这个方法会通过抛出异常的方式告诉客户端。
acquireQueued 返回值代表的是:是否被中断过。但是,不管是否被中断过,acquireQueued 出来以后,线程的中断状态一定是 false。
请问为什么说:acquireQueued 出来以后,线程的中断状态一定是 false ?
看一下 Thread.interrupted() 这个方法,这个方法用于检测中断状态,同时会清除中断状态。
¶不是很明白head 节点是当前独占锁的持有者的意思(注释也说到head一般指的是占有锁的线程),请问从何作出这个判断? 看代码感觉整个阻塞队列(包括head节点)都没有当前占有锁线程的信息。
这是隐含的信息,ReentrantLock 具有排他性,lock() 方法要么阻塞,要么顺利拿到锁返回。
当 lock() 返回的时候,我们说当前线程持有了独占锁,而此时的 head 就是当前线程。
(这里说的情况不考虑连 head 都没有初始化的场景)
这样的说法很容易让人混淆,应该是得分两种情况考虑:
1、当前已有别的线程持有锁的时候,head是指向(head.next)下次解锁时即将能持有锁的线程。
2、当持有锁的线程unlock时, head 指向的就是当前持有锁的线程 ,但这个时间非常短,因为head马上又会指向一下个即将能持有锁的线程。
AcquireQueue()里,如果tryacquire()成功,会用setHead(node)将当前获得锁的Node设为Head。
¶为什么如果不满足条件就唤醒下一位
1 | private void cancelAcquire(Node node) { |
转化一下,如果要进入到 else,需要满足:
1 | (ws = pred.waitStatus) != Node.SIGNAL |
HongJie 2018-04-24 08:00:26
转化一下,如果要进入到 else,需要满足:
(ws = pred.waitStatus) != Node.SIGNAL
&&
ws > 0 || (ws <=0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))
再将其转化为下面两种情况:
(ws = pred.waitStatus != Node.SIGNAL) && ws > 0
前驱节点处于 CANCELLED 状态,显然是需要唤醒后继节点的,这条很简单
(ws = pred.waitStatus != Node.SIGNAL) && (ws <= 0) && (!compareAndSetWaitStatus(pred, ws, Node.SIGNAL))
这种情况下,在将前驱设置为 SIGNAL 的时候失败了,我想到的一种情况是,在 CAS 之前前驱设置为了 CANCELLED
结合 acquireQueued 方法来看,假设里面的 tryAcquire 抛出异常的场景。
¶pred.thread == null
head 是 new Node() 的“空节点”,要是不做唤醒后继节点的话,那。。。你懂的
¶VM的monitor最终是使用了(如果升级为重量锁)mutex,操作系统级的支持;能否这样理解,基于AQS的ReentrantLock不需要操作系统的锁支持了,所以比较轻?而且也不会升级为重量级锁,本身只是个等待队列而已。
我里理解的ReetrantLock 公平锁 和 synchronized 的原理差不多,也有重量级锁,也有升级的步骤。
ReetrantLock 首先会采用自旋获取锁(偏向锁),之后的线程进来发现获取不到锁,就加入队列中,如果前驱节点是 head,就会尝试获取一次错,如果没有获取成功,就线程挂起(锁膨胀)。涉及到上下文切换,系统调用。
¶1队列非空,2队列第一个节点(非head)非当前线程 满足两个条件返回true, 那么h != t && (s = h.next) == null 这样一种场景怎么理解
1 | public final boolean hasQueuedPredecessors() { |
你的问题需要到 enq 找答案:
1 | private Node enq(final Node node) { |
节点入队时,先成为 tail 然后才设置前驱的 next 属性。
那么对应于你的问题,h != t && h.next == null 对应的就是某个节点现在已经是 tail 了,但是 head.next 还是 null。
虽然文中没有分析到这个分支,不过还是有些细心的读者对这个很感兴趣的,你也可以往前看下另一个读者的问题,也是针对这个方法的。
这里肯定是不会为 null 的,你可以把你认为它可能会为 null 的分析过程描述得详细一些,这样我们比较容易在一个频道上。
¶这个不是个死循环嘛?这么循环怎么能成立的呀?麻烦大神给解释一下
1 | do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);<br>pred = pred.prev |
这里没有死循环呀,一直在沿着队列往前走,找到一个 waitStatus<=0 的节点
¶阻塞队列的头结点是什么时候初始化的呢?
- 当前队列为空
- 2.当有线程阻塞的时候
¶为什么不在锁的构造器里就先建一个阻塞队列的头结点呢?
如果没有线程竞争锁的话就是浪费一个节点的空间,Doug Lea大师的注释如下。可见大师的代码一点一滴都体现水平。
- CLH queues need a dummy header node to get started. But
- we don’t create them on construction, because it would be wasted
- effort if there is never contention. Instead, the node
- is constructed and head and tail pointers are set upon first
- contention. ``` 问:阻塞队列的头结点什么时候会被GC呢? 答:当队列里第一个Node节点得到锁后,该节点会被设置成新的头结点。那么原来“老”的头结点由于没有任何引用指向它,就会被GC回收。
¶队列中的线程节点node被唤醒后,node直接变为head,指向原head的指针pred没有被置null,原head节点没法回收吧
1 | final boolean acquireQueued(final Node node, int arg) { |
在 setHead(node) 方法里面:
1 | private void setHead(Node node) { |
¶head节点的属性是不是state,这个state表示当前锁已经被占用几次了。阻塞队列里的node的属性是waitStatus,代表着阻塞队列里节点的状态。这是两个枚举值完全不相关的属性吗?
head节点和阻塞队列其它的node都是waitStatus,state是AQS的属性,不相关的
¶刚初始化 fifo 的时候,h=new Node(); t = null; 这样子 h != t && (s = h.next) == null 不是为 true 吗, h.next == null; 因为是刚初始化的节点,这样子不是没有 Predecessors 嘛。那这个明明没有Predecessors 却返回 true ;不是错了吗 是我理解错误吗?
只要 head!=tail 就说明有新的节点进来到队列的尾部了,如果 h.next == null, 说明正在初始化节点中,如果不是初始化中的话,只要 Head 的下一个节点不是 刚进来的 thread 的 的Node,如果是的话就说明没有正在等待的节点,
对了先补充下这个方法的意思,查询是否有任何线程等待获取比当前线程更长的时间。
true如果有排队线前面的当前线程,并 false如果当前线程在队列或队列的头部是空的
¶对于 cancelAcquire() 的 unparkSuccessor(node); 会不会唤醒两次呢。 如果 pred == head;然后又刚好 pred 线程也要出同步块了 也调用了 unparkSuccessor(node); 那这个时候 node.next 是不是 会两次 unpark() 呢,因为 unpark 会抵消 park() ,所以 在第一次 unpark 将线程唤醒了,第二次 unpark 将许可证置为可用的.那么下次 condition 的 waiter 会不会 被抵消掉呢?
你的问题挺有趣的,我没有仔细去推演每一步,不过我觉得其实这也不是什么大问题。仔细想想,即使真的是两次 unpark 了(假设真的如此),大不了就是后面会出现一次 park 直接返回。
对于 park 方法,我们本来就是要假设它有可能会无故返回的,被中断或者系统的假唤醒,所以这些代码往往都在循环体中。
¶获取锁、释放锁,阻塞队列的Node数量不会减少吗?好像没看到在哪里减少阻塞队列里面Node的数量?
因为我们通常并不关心阻塞队列中到底有多少 Node
获取到锁的节点会变成head,在那里会把原来的head移出队列
在acquireQueued里面获取到同步状态,下面两条语句让Head出列。
setHead(node);
p.next = null;
¶在acquireQueued里会做自旋操作,如果前边节点是head就尝试获取锁,如果前面节点不是head,做后面的挂起或者不挂起. 这样的话,只有一个节点(就是head后面的那个节点)才会尝试获取锁,这样何来竞争锁的说法呢?
1 | 因为非公平锁的实现里面,每个新来的线程,是不管队列是否为空,都会先去做一次抢锁的。 |
¶在unparkSuccessor()方法中,如果s == null || s.waitStatus > 0,会从阻塞队列的尾部开始一直向前找,找到最前面一个waitStatus == -1的节点,将其唤醒。我的问题是,唤醒之后,该线程从acquireQueued(Node node, int arg)方法的for循环中醒过来,继续循环,只有当自己的前驱节点为head节点即node.predecessor() == head时,才会去tryAcquire()抢锁,可是在从后往前找的过程中,并没有看到把当前找寻的目标节点和head节点链接上的步骤啊,这样不就不能判断node.predecessor() == head为true吗?也就不能抢锁了…
问题在 acquireQueued 这个方法中。
我不清楚你具体的疑惑在哪里,但是你可以看看嵌套在死循环中的 shouldParkAfterFailedAcquire。
我觉得 AQS 里面有个比较重要的点是,它总是先保证节点能安全进入到队列中,至于你说的这种边界问题,都在“被动”发现 waitStatus 不对的时候去“纠正”它。