再探 AQS


七月的第二周,来学习 AQS,上次学习 AQS 是在四月份,这个月我们把它啃掉。


复习一下 AQS 的基础,只简单地粘上来一些代码,具体的原理与设计可以回看四月份写的《从 lock 到 AQS 再到 ReentrantLock》。

AQS 中很重要的是数据结构和状态的设计,这一部分会反复地回来看,直到背过:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// AQS 阻塞队列头
private transient volatile Node head;

// AQS 阻塞队列尾
private transient volatile Node tail;

// AQS 阻塞队列状态
// 0:无节点 1:有节点持锁 >1:有节点持重入锁
private volatile int state;

// AQS 阻塞队持有锁的线程
private transient Thread exclusiveOwnerThread;

/**
* AQS 的组成元素:节点
*/
static final class Node {
// 以下是节点的四种状态:
// 1(取消)、-1(可以唤醒后继节点)、-2(在 condition 队列中)、4(暂时略过)
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 节点状态
volatile int waitStatus;
// 在 AQS 阻塞队列中的前置节点
volatile Node prev;
// 在 AQS 阻塞队列中的后继节点
volatile Node next;
// 本节点本尊:线程
volatile Thread thread;
// 在 condition 条件队列中的后一个节点
Node nextWaiter;
}
}

接下来可以通过 ReentrantLock 来看 AQS 的基本框架了。


ReentrantLock 公平锁的上锁过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/* —————————————————————— 前戏 —————————————————————— */

public static void main(String[] args) {
// 申请一个公平锁
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
// ...
} finally {
lock.unlock();
}
}

/**
* ReentrantLock lock
*/
public void lock() {
sync.lock();
}

/**
* FairSync lock
*/
final void lock() {
acquire(1);
}

/* ———————————————————————————————————————————————— */








/* —————————————————————— 申请锁的入口 —————————————————————— */

/**
* AQS acquire
*/
public final void acquire(int arg) {
// 试着申请一下锁 没成功,加入队列
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 本线程阻塞
selfInterrupt();
}

/* ———————————————————————————————————————————————————————— */








/* —————————————————————— tryAcquire —————————————————————— */

/**
* FairSync 尝试获取锁
* true:
* - 1.AQS队列空,自己请求到了锁
* - 2.锁重入
*/
protected final boolean tryAcquire(int acquires) {
// 获取【当前线程】和【AQS队列的状态】
final Thread current = Thread.currentThread();
int c = getState();
// state==0 -> 没有线程持有锁
if (c == 0) {
// 在AQS没有节点在等待的前提下(公平),尝试修改state,试图占有AQS
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
// 设置AQS锁的线程是自己
setExclusiveOwnerThread(current);
return true;
}
}
// state!=0 且 当前线程就是AQS持有锁的线程 -> 锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 锁重入太多次,溢出了
if (nextc < 0) throw new Error("Maximum lock count exceeded");
// 锁重入次数 +1
setState(nextc);
return true;
}
return false;
}

/**
* AQS 判断队列中是否有节点在等待
* true:
* - 有头结点持锁,且队列中有节点在等,且队列最前面的节点不是自己
*/
public final boolean hasQueuedPredecessors() {
// 获取 AQS 的 head 和 tail,并自定义一个节点
Node h = head, t = tail, s;
// 头尾不同,且队列中有节点,而且队列中的最前面的那个节点不是自己
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

/* ———————————————————————————————————————————————————— */








/* —————————————————————— addWaiter —————————————————————— */

/**
* AQS 加入阻塞队列
*/
private Node addWaiter(Node mode) {
// 将当前线程封装成 AQS 的 Node,此时的 mode 是独占模式(Node.EXCLUSIVE)
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果 tail 不为空,尝试把自己设成新的 tail
if (pred != null) {
node.prev = pred;
// CAS 将自己设为新的 tail,如果成功了,将旧的tail的后继节点设为自己
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果tail为null,或者CAS将自己设为新tail失败了
enq(node);
return node;
}

/**
* AQS Node 构造方法其一
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

/**
* AQS 更新尾结点(CAS)
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
* AQS 常量 tailOffset
* 由下面的AQS静态代码块可以看出,tailOffset是 tail 属性在 AQS 对象中的内存偏移量
* 如果这个偏移量发生了改变,就说明 tail 发生了改变
*/
private static final long tailOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
// tailOffset看这里,这是查资料发现,是取内存偏移量
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}

/**
* AQS 加入AQS队列(尾插)
*/
private Node enq(final Node node) {
// 采用CAS加入队列,不加进去不罢休
for (;;) {
Node t = tail;
// 进入此方法有两种可能:tail为null(即AQS队列空) 或 曾经试过一次CAS设新tail
// tail为空 -> AQS队列为空 -> 试着抢head
if (t == null) {
if (compareAndSetHead(new Node()))
// AQS中只有一个节点时,头就是尾
tail = head;
}
// tail不为空 -> AQS队列不为空 -> 试着把自己设为新tail
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/* —————————————————————————————————————————————————————— */









/* ———————————————————— acquireQueued ———————————————————— */

/**
* AQS 判断是否需要阻塞线程
* 所有没抢到线程的,全都汇聚于此
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前驱节点(如果没有,空指针异常)
final Node p = node.predecessor();
// 如果前驱节点是head,试着获取一次AQS锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 在此返回false,外层方法就会直接返回,不会阻塞当前线程
return interrupted;
}
// 前面这个方法:判断是否应该挂起线程 后面这个方法:挂起线程(唤醒也会在这里起来)
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/**
* AQS 判断是否应当挂起线程(在没有抢到head的情况下)
* 第一个参数是前驱节点,第二个参数是自己
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的state
int ws = pred.waitStatus;
// 如果前驱节点的state是 -1,说明本节点之后会被唤醒,可以等了,直接返回true
if (ws == Node.SIGNAL)
return true;
// 如果前驱节点的state>0,说明前驱节点放弃了,使劲往前找,直到找到一个没有放弃的前驱结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这里很有趣
// 我们从来没有为state赋值,因此前驱节点的state是0,我们为它设置为-1,让它好了后叫我们
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不应该挂起线程,出去,在外层方法继续CAS获取头节点
return false;
}

/**
* AQS 挂起线程
*/
private final boolean parkAndCheckInterrupt() {
// 挂起线程
LockSupport.park(this);
// 返回线程是否阻塞
return Thread.interrupted();
}

/* —————————————————————————————————————————————————————— */

ReentrantLock 公平锁的解锁过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/* —————————————————————— 前戏 —————————————————————— */

public static void main(String[] args) {
// 申请一个公平锁
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
// ...
} finally {
lock.unlock();
}
}

/**
* ReentrantLock unlock
*/
public void unlock() {
sync.release(1);
}

/* ———————————————————————————————————————————————— */








/* —————————————————————— release —————————————————————— */

/**
* AQS release 解锁
*/
public final boolean release(int arg) {
// 尝试解锁
if (tryRelease(arg)) {
// 解锁成功
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤起后继节点
unparkSuccessor(h);
return true;
}
return false;
}

/* ———————————————————————————————————————————————— */









/* —————————————————————— tryRelease —————————————————————— */

/**
* sync tryRelease 尝试解锁
*/
protected final boolean tryRelease(int releases) {
// AQS state -1 即解锁后的 AQS state
int c = getState() - releases;
// 非当前线程不能解锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 解锁后 state 是 0 则解锁成功,否则重入锁-1
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

/* ———————————————————————————————————————————————— */








/* —————————————————— unparkSuccessor —————————————————— */

/**
* AQS 解锁成功后(非重入),唤起后继节点
*/
private void unparkSuccessor(Node node) {
// node的waitStatus,>0 取消等待,0没有后继,<0在ReentrantLock中代表需要唤醒后继
int ws = node.waitStatus;
if (ws < 0)
// 需要唤醒后继节点时,将本节点状态设为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后继节点如果不存在,或者存在但取消等待了
if (s == null || s.waitStatus > 0) {
s = null;
// 如果后继节点取消等待,那么从尾往前找,一路找到排在最前面正在等待的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 此时的节点,不是null,就是排在最前的非取消等待的节点
if (s != null)
LockSupport.unpark(s.thread);
}

/* ———————————————————————————————————————————————— */

道格李老爷子的代码,看起来确实有点吃力,我自己测试起了两个线程,debug 多遍,整理出下面这张图:

AQS_ReentrantLock

这里只考虑了最简单的情况,没考虑抢锁、取消锁等情况(不过我觉得总体理解上还可以)。测试了多遍之后,觉得 AQS 类中的 acquireQueued() 方法很有趣,设计上非常精巧,循环运用得非常道格李。

下周我们来看 Condition 等类,争取下周把 AQS 框架看完。