Java 并发之 AQS
Java 并发之 AQS
AQS 简介
AQS 是 AbstractQueuedSynchronizer
的缩写,即 队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
、Semaphore
、FutureTask
等)。
**AQS 提供了对锁和同步器的通用能力支持 **。在 java.util.concurrent.locks
包中的相关锁(常用的有 ReentrantLock
、 ThreadPoolExecutor
)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync
类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。
AQS 的应用
AQS 定义两种资源共享方式:Exclusive
(独占,只有一个线程能执行,如 ReentrantLock
)和 Share
(共享,多个线程可同时执行,如 Semaphore
/ CountDownLatch
)。
独占锁 API
获取、释放独占锁的主要 API 如下:
1 | public final void acquire(int arg) |
acquire
- 获取独占锁。acquireInterruptibly
- 获取可中断的独占锁。tryAcquireNanos
- 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:- 在超时时间内,当前线程成功获取了锁;
- 当前线程在超时时间内被中断;
- 超时时间结束,仍未获得锁返回 false。
release
- 释放独占锁。
共享锁 API
获取、释放共享锁的主要 API 如下:
1 | public final void acquireShared(int arg) |
acquireShared
- 获取共享锁。acquireSharedInterruptibly
- 获取可中断的共享锁。tryAcquireSharedNanos
- 尝试在指定时间内获取可中断的共享锁。release
- 释放共享锁。
AQS 的原理
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH 本是一个单向队列,AQS 中的队列采用了 CLH 的变体,是一个虚拟的 FIFO 双向队列(虚拟的双向队列,是指不存在结点实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
AQS 的核心原理图:
AQS 的数据结构
先看一下 AbstractQueuedSynchronizer
的定义:
1 | public abstract class AbstractQueuedSynchronizer |
阅读 AQS 的源码,可以发现:AQS 继承自 AbstractOwnableSynchronize
,它有以下核心属性:
state
- AQS 使用一个整型的volatile
变量来 维护同步状态。这个整数状态的意义由子类来赋予,如ReentrantLock
中该状态值表示所有者线程已经重复获取该锁的次数;Semaphore
中该状态值表示剩余的许可数量。head
和tail
- AQS **维护了一个Node
类型(AQS 的内部类)的双向队列来完成同步状态的管理 **。这个双向队列是一个双向的 FIFO 队列,通过head
和tail
指针进行访问。当 **有线程获取锁失败后,就被添加到队列末尾 **。
再来看一下 Node
的源码,很显然,Node 是一个双向队列结构:
1 | static final class Node { |
属性说明:
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
thread | 表示处于该节点的线程 |
prev | 前驱指针 |
next | 后继指针 |
waitStatus
是一个整型的 volatile
变量,用来维护 AQS 同步队列中线程节点的状态。waitStatus
有五个状态值:
- 0 - 一个 Node 被初始化的时候的默认值
CANCELLED(1)
- 表示线程获取锁的请求已经取消了SIGNAL(-1)
- 表示线程已经准备好了,就等资源释放了CONDITION(-2)
- 表示节点在等待队列中,节点线程等待唤醒PROPAGATE(-3)
- 当前线程处在 SHARED 情况下,该字段才会使用
独占锁的获取和释放
获取独占锁
AQS 中使用 acquire(int arg)
方法获取独占锁的相关源码如下:
1 | public final void acquire(int arg) { |
其大致流程如下:
- 先通过
tryAcquire
尝试获取同步状态,如果获取同步状态成功,则结束方法,直接返回。 - 若不成功,调用
addWaiter
方法,利用 CAS 操作将当前线程加入等待队列队尾。 - 接着,自旋尝试为等待队列中的线程节点获取独占锁,直到获取成功或线程中断。
释放独占锁
AQS 中使用 acquire(int arg)
方法获取独占锁的相关源码如下:
1 | public final boolean release(int arg) { |
- 先尝试获取解锁线程的同步状态,如果获取同步状态不成功,则结束方法,直接返回。
- 如果获取同步状态成功且队列不为空,AQS 会尝试唤醒下一个节点中的线程。
获取可中断的独占锁
AQS 中使用 acquireInterruptibly(int arg)
方法获取可中断的独占锁。
acquireInterruptibly(int arg)
实现方式 相较于获取独占锁方法( acquire
)非常相似,区别仅在于它会 通过 Thread.interrupted
检测当前线程是否被中断,如果是,则立即抛出中断异常(InterruptedException
)。
限时获取独占锁
AQS 中使用 tryAcquireNanos(int arg)
方法获取超时等待的独占锁。
doAcquireNanos 的实现方式 相较于获取独占锁方法( acquire
)非常相似,区别在于它会根据超时时间和当前时间计算出截止时间。在获取锁的流程中,会不断判断是否超时,如果超时,直接返回 false;如果没超时,则用 LockSupport.parkNanos
来阻塞当前线程。
共享锁的获取和释放
获取共享锁
AQS 中使用 acquireShared(int arg)
方法获取共享锁。
acquireShared
方法和 acquire
方法的逻辑很相似,区别仅在于自旋的条件以及节点出队的操作有所不同。
成功获得共享锁的条件如下:
tryAcquireShared(arg)
返回值大于等于 0 (这意味着共享锁的 permit 还没有用完)。- 当前节点的前驱节点是头结点。
释放共享锁
AQS 中使用 releaseShared(int arg)
方法释放共享锁。
releaseShared
首先会尝试释放同步状态,如果成功,则解锁一个或多个后继线程节点。释放共享锁和释放独占锁流程大体相似,区别在于:
对于独占模式,如果需要 SIGNAL,释放仅相当于调用头节点的 unparkSuccessor
。
获取可中断的共享锁
AQS 中使用 acquireSharedInterruptibly(int arg)
方法获取可中断的共享锁。
acquireSharedInterruptibly
方法与 acquireInterruptibly
几乎一致,不再赘述。
限时获取共享锁
AQS 中使用 tryAcquireSharedNanos(int arg)
方法获取超时等待式的共享锁。
tryAcquireSharedNanos
方法与 tryAcquireNanos
几乎一致,不再赘述。
自定义同步器
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 使用者继承
AbstractQueuedSynchronizer
并重写指定的方法。 - 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:
1 | // 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。 |
什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected
关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。