查看原文
其他

扔掉源码,15张图带你彻底理解Java AQS

ImportNew 2022-10-28

The following article is from 程序员jinjunzhu Author jinjunzhu

Java 中 AQS AbstractQueuedSynchronizer AQS 依赖 FIFO 队列来提供一个框架这个框架用于实现锁以及锁相关的同步器,比如信号量、事件等


在 AQS 中,主要有两部分功能:一部分是操作 state 变量,第二部分是实现排队和阻塞机制。


注意:AQS并没有实现任何同步接口,它只是提供了类似acquireInterruptible的方法,调用这些方法可以实现锁和同步器。


1 管程模型


Java 使用 MESA 管程模型来管理类的成员变量和方法,让这个类的成员变量和方法的操作是线程安全的。下图是 MESA 管程模型,里面除了定义共享变量外,还定义了条件变量和条件变量等待队列:


MESA 管程模型


Java 中 的 MESA 模型有一点改进,就是管程内部只有一个条件变量和一个等待队列。


下图是 AQS 的管程模型:


AQS 的管程模型


AQS 的管程模型依赖 AQS 中的 FIFO 队列实现入口等待队列,而 ConditionObject 则实现了条件队列,这个队列可以创建多个。


本文主要讲解入口等待队列获取锁的几种方式。


2 获取独占锁


2.1 独占,忽略 interrupts


public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}


这里的 tryAcquire 是抽象方法,有 AQS 的子类来实现,因为每个子类实现的锁是不一样的。


入队


上面的代码可以看到,获取锁失败后,会先执行 addWaiter 方法加入队列,然后执行 acquireQueued 方法自旋地获取锁直到成功。


addWaiter 代码逻辑如下图。简单说就是把 node 入队,入队后返回 node 参数给 acquireQueued 方法:


addWaiter 代码逻辑


这里需要注意:如果队列为空,则新建一个 Node 作为队头。


入队后获取锁


acquireQueued自旋获取锁逻辑如下图:


acquireQueued 自旋获取锁逻辑


这里有几个细节:


1. waitStatus


  • CANCELLED(1):当前节点取消获取锁。当等待超时或被中断(响应中断),会触发变更为此状态,进入该状态后节点状态不再变化;
  • SIGNAL(-1):后面节点等待当前节点唤醒;
  • CONDITION(-2):Condition 中使用,当前线程阻塞在 Condition。如果其他线程调用了 Condition 的 signal 方法,这个结点将从等待队列转移到同步队列队尾,等待获取同步锁;
  • PROPAGATE(-3):共享模式,前置节点唤醒后面节点后,唤醒操作无条件传播下去;
  • 0:中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。


2. 获取锁失败后挂起


如果前置节点不是头节点,或者前置节点是头节点但当前节点获取锁失败,这时当前节点需要挂起,分三种情况:


前置节点 waitStatus=-1


前置节点 waitStatus > 0



前置节点 waitStatus < 0,不等于 -1


3. 取消获取锁


如果获取锁抛出异常,则取消获取锁,如果当前节点是 tail 节点,分两种情况如下图:



如果当前节点不是 tail 节点,也分两种情况,如下图:


4. 对中断状态忽略


5. 如果前置节点的状态是 0 或 PROPAGATE,会被当前节点自旋过程中更新成 -1,以便之后通知当前节点。


2.2 独占 + 响应中断


对应方法 acquireInterruptibly(int arg)。


跟忽略中断(acquire 方法)不同的是要响应中断,下面两个地方响应中断:


  • 获取锁之前会检查当前线程是否中断;
  • 获取锁失败入队,在队列中自旋获取锁的过程中也会检查当前线程是否中断。


如果检查到当前线程已经中断,则抛出 InterruptedException,当前线程退出。


2.3 独占 + 响应中断 + 考虑超时


对应方法 tryAcquireNanos(int arg, long nanosTimeout)。


这个方法具备了独占 + 响应中断 + 超时的功能,下面两个地方要判断是否超时:


  • 自旋获取锁的过程中每次获取锁失败都要判断是否超时;
  • 获取锁失败 park 之前要判断超时时间是否大于自旋的阈值时间 (spinForTimeoutThreshold = 1ns)。


另外,park 线程的操作使用 parkNanos 传入阻塞时间。


3 释放独占锁


独占锁释放分两步:释放锁,唤醒后继节点。


释放锁的方法 tryRelease 是抽象的,由子类去实现。


我们看一下唤醒后继节点的逻辑。首先需要满足两个条件:


  • head 节点不等于 null;
  • head 节点 waitStatus 不等于 0。

这里有两种情况(在方法 unparkSuccessor):


  • 情况一:后继节点 waitStatus <= 0,直接唤醒后继节点,如下图:

后继节点 waitStatus <= 0


  • 情况二:后继节点为空或者 waitStatus > 0,从后往前查找最接近当前节点的节点进行唤醒,如下图:
后继节点为空或 waitStatus > 0


4 获取共享锁


之前我们讲了独占锁,这一小节我们谈共享锁。二者有什么不同呢?


4.1 共享,忽略 interrupts


对应方法 acquireShared,代码如下:


public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}


4.2 tryAcquireShared


这里获取锁使用的方法是 tryAcquireShared,获取的是共享锁。获取共享锁跟获取独占锁不同的是,会返回一个整数值,说明如下:


  • 返回负数:获取锁失败;
  • 返回0:获取锁成功但是之后再由线程来获取共享锁时就会失败;
  • 返回正数:获取锁成功而且之后再有线程来获取共享锁时也可能会成功。所以需要把唤醒操作传播下去。


tryAcquireShared 获取锁失败后(返回负数),就需要入队后自旋获取,也就是执行方法 doAcquireShared。


4.3 doAcquireShared


怎么判断队列中等待节点是在等待共享锁呢?nextWaiter == SHARED,这个参数值是入队新建节点的时候构造函数传入的。


自旋过程中,如果获取锁成功(返回正数),首先把自己设置成新的 head 节点,然后把通知传播下去。如下图:


自旋过程中获取锁成功


之后会唤醒后面节点并保证唤醒操作可以传播下去。但是需要满足四个条件中的一个:


  • tryAcquireShared 返回值大于 0,有多余的锁,可以继续唤醒后继节点;
  • 旧的 head 节点 waitStatus < 0,应该是其他线程释放共享锁过程中把它的状态更新成了 -3;
  • 新的 head 节点 waitStatus < 0,只要不是 tail 节点,就可能是 -1。这里会造成不必要的唤醒,因为唤醒后获取不到锁只能继续入队等待
  • 当前节点的后继节点是空或者非空但正在等待共享锁。


唤醒后面节点的操作,其实就是释放共享锁,对应方法是 doReleaseShared,见释放共享锁一节。


4.4 共享 + 响应中断


对应方法acquireSharedInterruptibly(int arg)。


跟共享忽略中断(acquireShared 方法)不同的是要响应中断,下面两个地方响应中断:


  • 获取锁之前会检查当前线程是否中断。
  • 获取锁失败入队,在队列中自旋获取锁的过程中也会检查当前线程是否中断。


如果检查到当前线程已经中断,则抛出InterruptedException,当前线程退出。


4.5 共享 + 响应中断 + 考虑超时


对应方法tryAcquireSharedNanos(int arg, long nanosTimeout)。


这个方法具备了共享 + 响应中断 + 超时的功能,下面两个个地方要判断是否超时:


  • 自旋获取锁的过程中每次获取锁失败都要判断是否超时;
  • 获取锁失败 park 之前要判断超时时间是否大于自旋的阈值时间 spinForTimeoutThreshold = 1ns。


另外,park 线程的操作使用 parkNanos 传入阻塞时间。


5 释放共享锁


释放共享锁代码如下:


public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}


首先,尝试释放共享锁。tryReleaseShared 代码由子类来实现。释放成功后执行 AQS 中的 doReleaseShared 方法,是一个自旋操作。


自旋的条件是队列中至少有两个节点,这里分三种情况。


情况一


当前节点 waitStatus 是 -1,如下图:


当前节点 waitStatus 是 -1


情况二


当前节点 waitStatus 是 0(被其他线程更新成了中间状态),如下图:


当前节点 waitStatus 是 0


情况三


当前节点 waitStatus 是 -3,为什么会这样呢?


需要解释一下,head 节点唤醒后继节点之前 waitStatus 已经被更新中间态 0 了,唤醒后继节点动作还没有执行,又被其他线程更成了 -3.


也就是其他线程释放锁执行了上面情况二。这时需要先把 waitStatus 再更成 0(在方法 unparkSuccessor),如下图:


先把 waitStatus 再更成 0


6 抽象方法


上面的讲解可以看出,如果要基于 AQS 来实现并发锁,可以根据需求重写下面四个方法来实现,这四个方法在 AQS 中没有具体实现:


  • tryAcquire(int arg):获取独占锁
  • tryRelease(int arg):释放独占锁
  • tryAcquireShared(int arg):获取共享锁
  • tryReleaseShared(int arg):释放共享锁 


AQS 的子类需要重写上面的方法来修改 state 值,并且定义获取锁或者释放锁时 state值的变化。子类也可以定义自己的 state 变量,但只有更新 AQS 中的 state 变量才会对同步起作用。


还有一个判断当前线程是否持有独占锁的方法 isHeldExclusively,也可以供子类重写后使用。


总结


AQS 使用 FIFO 队列实现了一个锁相关的并发器模板,可以基于这个模板来实现各种锁,包括独占锁、共享锁、信号量等。


AQS 中有一个核心状态是 waitStatus,这个代表节点的状态,决定了当前节点的后续操作,比如是否等待唤醒,是否要唤醒后继节点。


- EOF -

推荐阅读  点击标题可跳转

1、Java 并发之 AQS 详解

2、扒一扒 ReentrantLock 以及 AQS 实现原理

3、JAVA REENTRANTLOCK、SEMAPHORE 的实现与 AQS 框架


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

点赞和在看就是最大的支持❤️

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存