AQS简介
AbstractQueuedSynchronizer类简称为(AQS),它是实现同步器的基本组件,内部使用int类型来表示同步状态,并提供CAS方法来操作这个同步状态。如常用的ReentrantLock/Semaphore/CountDownLatch等等就是基于AQS实现的,用法是通过继承AQS实现其模版方法,然后将子类作为同步组件的内部类。
上图是该类的属性,AQS是一个FIFO的双向队列,内部通过head和tail记录头部和尾部节点,队列元素为Node。当一个线程试图获得锁时,如果该锁已被占用就会新创一个Node节点把他插入到尾部。头部节点是成功获得锁的节点,当头部节点被释放的时候,会唤醒后面的节点。
在AQS中最重要的就是对状态值state的操作,操作state的方式分为两种。
独占(Exclusive),例如ReentrantLock就是独占锁的一种。
共享(share),例如Semaphore/CountDownLatch。
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源, 就会标记是这个线程获取到了,其他线程再尝试操作 state 获取资源时会发现当前该资源 不是自己持有的,就会在获取失败后被阻塞。 比如独占锁 ReentrantLock 的实现, 当一个 线程获取了 Reer rantLock 的锁后,在 AQS 内 部会首先使用 CAS 操作把 state 状态值从 0 变为 1 ,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持 有者,则会把状态值从 l 变为 2,也就是设置可重入次数,而当另外一个线程获取锁时发 现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过 CAS 方 式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源 还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。 比如 Semaphore 信 号量, 当一个线程通过 acquire() 方法获取信号量时,会首先看当前信号量个数是否满足需 要, 不满足则把当前线程放入阻塞队列,如果满足则通过 自旋 CAS 获取信号量。
代写。。源码看晕了
源码分析
acquire(int arg)独占锁方法
1 | public final void acquire(int arg) { |
当一个线程尝试获得独占锁时,会先尝试获得锁,如果失败就将当前线程封装成Exclusive类型的Node插入AQS堵塞队列尾部,并将自己堵塞挂起。
tryAcquire(int arg)
1 | protected boolean tryAcquire(int arg) { |
tryAcquire()需要具体子类去实现的,根据所需场景设置state。
addWaiter(Node mode)
1 | private Node addWaiter(Node mode) { |
addWaiter()方法将当前线程封装成新节点插入队列尾部,并放回。
enq(final Node node)
1 | private Node enq(final Node node) { |
enq()方法确保一定能将node插入队列尾部
acquireQueued(final Node node, int arg)
1 | final boolean acquireQueued(final Node node, int arg) { |
在看shouldParkAfterFailedAcquire()方法前,先要了解withStatus变量。withStatus用来记录线程的等待状态可以为。
- CANCELLED=1:表示线程因为中断或者等待超时,需要从等待队列中取消等待;
- SIGNAL=-1:当前线程thread1占有锁,队列中的head(仅仅代表头结点,里面没有存放线程引用)的后继结点node1处于等待状态,如果已占有锁的线程thread1释放锁或被CANCEL之后就会通知这个结点node1去获取锁执行。
- 表示结点在等待队列中(这里指的是等待在某个lock的condition上,关于Condition的原理下面会写到),当持有锁的线程调用了Condition的signal()方法之后,结点会从该condition的等待队列转移到该lock的同步队列上,去竞争lock。(注意:这里的同步队列就是我们说的AQS维护的FIFO队列,等待队列则是每个condition关联的队列)
- 表示下一次共享状态获取将会传递给后继结点获取这个共享同步状态。
引用:[https://www.cnblogs.com/fsmly/p/11274572.html]shouldParkAfterFailedAcquire(Node pred, Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//获得前驱节点的状态
if (ws == Node.SIGNAL)//判断状态是否为SIGNAL如果是就可以休眠了
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//如果前端节点状态为CANCELLED,即代表放弃等待了,这时候node需要找一个在等待状态的节点,并把它做为自己的前驱节点
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//尝试设置前驱节点值为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}shouldParkAfterFailedAcquire()方法就是设置自己的前驱节点为SIGNAL状态,这样自己才能去堵塞休眠。。
parkAndCheckInterrupt()
1 | private final boolean parkAndCheckInterrupt() { |
整体流程图如下
流程图片来源[https://www.cnblogs.com/zyrblog/p/9866140.html]
release(int arg)独占锁释放方法
1 | public final boolean release(int arg) { |
tryRelease(int arg)
1 | protected boolean tryRelease(int arg) { |
这里一样需要具体子类去实现
unparkSuccessor(Node node)
1 | private void unparkSuccessor(Node node) { |
acquireShared(int arg)共享锁方法
1 | public final void acquireShared(int arg) { |
tryAcquireShared(int arg)
1 | protected int tryAcquireShared(int arg) { |
这里的tryAcquireShared(int arg)方法一样需要具体子类去实现,根据所需场景自定义状态设置,和结果返回。
doAcquireShared(int arg)
1 | private void doAcquireShared(int arg) { |
setHeadAndPropagate(Node node, int propagate)
1 | private void setHeadAndPropagate(Node node, int propagate) { |
releaseShared(int arg)共享锁释放
1 | public final boolean releaseShared(int arg) { |
tryReleaseShared(int arg)
1 | protected boolean tryReleaseShared(int arg) { |
一样由具体子类重写
doReleaseShared()
1 | private void doReleaseShared() { |