抱歉,您的瀏覽器無法訪問本站
本頁面需要瀏覽器支持(啟用)JavaScript
了解詳情 >

AQS简介

AbstractQueuedSynchronizer类简称为(AQS),它是实现同步器的基本组件,内部使用int类型来表示同步状态,并提供CAS方法来操作这个同步状态。如常用的ReentrantLock/Semaphore/CountDownLatch等等就是基于AQS实现的,用法是通过继承AQS实现其模版方法,然后将子类作为同步组件的内部类。

upload successful

上图是该类的属性,AQS是一个FIFO的双向队列,内部通过head和tail记录头部和尾部节点,队列元素为Node。当一个线程试图获得锁时,如果该锁已被占用就会新创一个Node节点把他插入到尾部。头部节点是成功获得锁的节点,当头部节点被释放的时候,会唤醒后面的节点。

upload successful

在AQS中最重要的就是对状态值state的操作,操作state的方式分为两种。

  1. 独占(Exclusive),例如ReentrantLock就是独占锁的一种。

  2. 共享(share),例如Semaphore/CountDownLatch。

    使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源, 就会标记是这个线程获取到了,其他线程再尝试操作 state 获取资源时会发现当前该资源 不是自己持有的,就会在获取失败后被阻塞。 比如独占锁 ReentrantLock 的实现, 当一个 线程获取了 Reer rantLock 的锁后,在 AQS 内 部会首先使用 CAS 操作把 state 状态值从 0 变为 1 ,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持 有者,则会把状态值从 l 变为 2,也就是设置可重入次数,而当另外一个线程获取锁时发 现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。

对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过 CAS 方 式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源 还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。 比如 Semaphore 信 号量, 当一个线程通过 acquire() 方法获取信号量时,会首先看当前信号量个数是否满足需 要, 不满足则把当前线程放入阻塞队列,如果满足则通过 自旋 CAS 获取信号量。

代写。。源码看晕了

源码分析

acquire(int arg)独占锁方法

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

当一个线程尝试获得独占锁时,会先尝试获得锁,如果失败就将当前线程封装成Exclusive类型的Node插入AQS堵塞队列尾部,并将自己堵塞挂起。

tryAcquire(int arg)
1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

tryAcquire()需要具体子类去实现的,根据所需场景设置state。

addWaiter(Node mode)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
//以当前节点和给定模式创建一个新节点
//模式有:EXCLUSIVE(独占),SHARE(共享)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//获得尾部节点
//判断尾部节点是否为空
if (pred != null) {
node.prev = pred;
//设置为尾部节点为新建节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果上面失败了就进行究极自旋保证一定插入队列尾部
enq(node);
return node;
}

addWaiter()方法将当前线程封装成新节点插入队列尾部,并放回。

enq(final Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
//通过CAS自旋直到node成功插入队列尾部
for (;;) {
Node t = tail;//获得尾部节点
//队列为空,设置首部尾部指向新建节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//正常的流程
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

enq()方法确保一定能将node插入队列尾部

acquireQueued(final Node node, int arg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标志是否拿到资源
try {
boolean interrupted = false;//标志等待时是否被中断过
for (;;) {
final Node p = node.predecessor();获得前驱节点
//如果前驱节点为头部节点,则可以尝试获得一下锁啦
if (p == head && tryAcquire(arg)) {
setHead(node);//设置当前节点为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire判断自己是否可以休息
//parkAndCheckInterrupt让自己休眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

在看shouldParkAfterFailedAcquire()方法前,先要了解withStatus变量。withStatus用来记录线程的等待状态可以为。

  1. CANCELLED=1:表示线程因为中断或者等待超时,需要从等待队列中取消等待;
  2. SIGNAL=-1:当前线程thread1占有锁,队列中的head(仅仅代表头结点,里面没有存放线程引用)的后继结点node1处于等待状态,如果已占有锁的线程thread1释放锁或被CANCEL之后就会通知这个结点node1去获取锁执行。
  3. 表示结点在等待队列中(这里指的是等待在某个lock的condition上,关于Condition的原理下面会写到),当持有锁的线程调用了Condition的signal()方法之后,结点会从该condition的等待队列转移到该lock的同步队列上,去竞争lock。(注意:这里的同步队列就是我们说的AQS维护的FIFO队列,等待队列则是每个condition关联的队列)
  4. 表示下一次共享状态获取将会传递给后继结点获取这个共享同步状态。
    引用:[https://www.cnblogs.com/fsmly/p/11274572.html]
    shouldParkAfterFailedAcquire(Node pred, Node node)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//获得前驱节点的状态
    if (ws == Node.SIGNAL)//判断状态是否为SIGNAL如果是就可以休眠了
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    */
    return true;
    if (ws > 0) {
    //如果前端节点状态为CANCELLED,即代表放弃等待了,这时候node需要找一个在等待状态的节点,并把它做为自己的前驱节点
    /*
    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    */
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    //尝试设置前驱节点值为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

    shouldParkAfterFailedAcquire()方法就是设置自己的前驱节点为SIGNAL状态,这样自己才能去堵塞休眠。。

parkAndCheckInterrupt()
1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//将自己挂起
return Thread.interrupted();检测自己是否被中断过
}

整体流程图如下
流程图片来源[https://www.cnblogs.com/zyrblog/p/9866140.html]
upload successful

release(int arg)独占锁释放方法

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
//尝试释放锁,这里一般都会成功因为独占,一般都是拿到锁的才去释放。。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列的下一个节点
return true;
}
return false;
}
tryRelease(int arg)
1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

这里一样需要具体子类去实现

unparkSuccessor(Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//尝试将当前节点设为0

Node s = node.next;获得要唤醒的下一个节点
if (s == null || s.waitStatus > 0) {//如果为空或取消
s = null;
//遍历找出有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒
LockSupport.unpark(s.thread);
}

upload successful

acquireShared(int arg)共享锁方法

1
2
3
4
5
public final void acquireShared(int arg) {
//尝试获得锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared(int arg)
1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

这里的tryAcquireShared(int arg)方法一样需要具体子类去实现,根据所需场景自定义状态设置,和结果返回。

doAcquireShared(int arg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//这里一样通过究极自旋操作保证把新建节点插入队列尾部
boolean failed = true;
try {
boolean interrupted = false;//中断标志
for (;;) {
final Node p = node.predecessor();//获得前驱节点
//如果head为node的前驱节点,那很有可能就是被head释放锁唤醒的
if (p == head) {
int r = tryAcquireShared(arg);//尝试获得锁
if (r >= 0) {
setHeadAndPropagate(node, r);//设置头部和尝试唤醒其他线程
p.next = null; // help GC
if (interrupted)//如果被中断过现在中断自己
selfInterrupt();
failed = false;
return;
}
}
//跟独占一样,寻找安全点休眠并尝试获得锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate(Node node, int propagate)
1
2
3
4
5
6
7
8
9
10
11
12
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //获得头部
setHead(node);//设置头部
//
//唤醒后续堵塞节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

releaseShared(int arg)共享锁释放

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(int arg)
1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

一样由具体子类重写

doReleaseShared()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}