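It's the second week of July, and we're back to AQS. The last time I studied it was in April; this month, let's finish chewing through it.
A quick refresher on the AQS basics. I'm only pasting in some code here; for the underlying principles and design, see my April post 《从 lock 到 AQS 再到 ReentrantLock》 ("From lock to AQS to ReentrantLock").
The most important parts of AQS are its data structure and its state design. I'll keep coming back to this part until I know it by heart: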

    public abstract class AbstractQueuedSynchronizer
            extends AbstractOwnableSynchronizer
            implements java.io.Serializable {

        private transient volatile Node head;   // head of the wait queue (lazily initialized)
        private transient volatile Node tail;   // tail of the wait queue
        private volatile int state;             // synchronization state

        // Inherited from AbstractOwnableSynchronizer; shown here for convenience
        private transient Thread exclusiveOwnerThread;

        static final class Node {
            static final int CANCELLED =  1;  // the thread has cancelled (timeout or interrupt)
            static final int SIGNAL    = -1;  // the successor needs unparking
            static final int CONDITION = -2;  // the thread is waiting on a condition
            static final int PROPAGATE = -3;  // the next acquireShared should propagate unconditionally

            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
        }
    }

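To make the role of `state` concrete, here is a minimal sketch of a non-reentrant mutex built on AQS, in the spirit of the example in the AQS Javadoc. The class name `SimpleMutex` and its helper methods are my own, purely for illustration; the point is only that a subclass decides what `state` means (here 0 = free, 1 = held), while AQS takes care of the queue.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical illustration class: a non-reentrant mutex where state 0 = free, 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Only the thread that wins the CAS from 0 to 1 becomes the owner.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write; publishes the release
            return true;
        }

        boolean locked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }        // queues and parks on contention
    public boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    public void unlock()      { sync.release(1); }
    public boolean isLocked() { return sync.locked(); }
}
```

Unlike ReentrantLock, this sketch keeps no hold count, so a second `tryLock()` from the owning thread simply fails.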
Next, we can look at the basic framework of AQS through ReentrantLock.
The locking path of a fair ReentrantLock:

    // Entry point: a fair lock
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        try {
            // critical section
        } finally {
            lock.unlock();
        }
    }

    // ReentrantLock
    public void lock() {
        sync.lock();
    }

    // ReentrantLock.FairSync
    final void lock() {
        acquire(1);
    }

    // AbstractQueuedSynchronizer
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // ReentrantLock.FairSync
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: a fair lock yields to threads already queued
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Reentrant acquire: only the owner gets here, so no CAS is needed
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // AbstractQueuedSynchronizer
    public final boolean hasQueuedPredecessors() {
        Node h = head, t = tail, s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Fast path: try a single CAS onto the tail
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    // AbstractQueuedSynchronizer.Node
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // AbstractQueuedSynchronizer
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    // The other offset fields are declared alongside tailOffset (omitted here)
    private static final long tailOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // Queue is empty: install a dummy head first
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t; // returns the predecessor
                }
            }
        }
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // Only the node right behind head may try to acquire
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true; // predecessor has promised to wake us: safe to park
        if (ws > 0) {
            // Predecessor was cancelled: skip over cancelled nodes
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Ask the predecessor for a signal, then retry once before parking
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

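The enqueue logic in `addWaiter()` and `enq()` is easier to see once it is stripped of the Unsafe plumbing. Below is a simplified sketch of the same idea: a lazily created dummy head, then a CAS on the tail. `ClhQueueSketch`, its `Node` type, and the use of `AtomicReference` in place of `Unsafe` are my own assumptions for illustration, not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS wait queue (no parking, no waitStatus).
class ClhQueueSketch {
    static final class Node {
        final String name;   // stands in for the waiting Thread; null for the dummy head
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Mirrors enq(): loop until the node is linked in, and return its predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: whoever wins this CAS installs the dummy head.
                Node dummy = new Node(null);
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // prev is set before the CAS
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                  // next is set only after the CAS
                    return t;
                }
            }
        }
    }

    Node getHead() { return head.get(); }
    Node getTail() { return tail.get(); }
}
```

Note the ordering: `prev` is written before the tail CAS and `next` after it, so `next` can briefly be null for a node that is already in the queue. That is why code like `unparkSuccessor()` falls back to walking backwards from the tail through `prev`.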
The unlocking path of a fair ReentrantLock:

    // Entry point: a fair lock
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        try {
            // critical section
        } finally {
            lock.unlock();
        }
    }

    // ReentrantLock
    public void unlock() {
        sync.release(1);
    }

    // AbstractQueuedSynchronizer
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // A non-zero waitStatus on head means a successor may need waking
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // ReentrantLock.Sync
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            // Fully released only when the hold count drops to zero
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // AbstractQueuedSynchronizer
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            // next is missing or cancelled: walk backwards from the tail
            // to find the non-cancelled node closest to the head
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

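Two behaviours of `tryRelease()` are easy to confirm from the outside using only the public ReentrantLock API: the lock is really freed only when the hold count reaches zero, and a thread that does not own the lock gets an IllegalMonitorStateException. The sketch below is a small check I put together; the class name `ReleaseDemo` and its method names are just for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class exercising the release path through the public API.
class ReleaseDemo {
    // Records the hold count after lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount();
        lock.lock();                       // reentrant acquire: state goes 1 -> 2
        counts[1] = lock.getHoldCount();
        lock.unlock();                     // tryRelease returns false, lock still held
        counts[2] = lock.getHoldCount();
        lock.unlock();                     // state reaches 0, owner is cleared
        counts[3] = lock.getHoldCount();
        return counts;
    }

    // Unlocking a lock this thread does not hold should throw.
    static boolean unlockWithoutOwnershipThrows() {
        ReentrantLock lock = new ReentrantLock(true);
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }
}
```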
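Doug Lea's code is admittedly hard going. I ran my own test with two threads, stepped through it in the debugger many times, and put together the diagram below:
This covers only the simplest case and leaves out lock barging, cancellation, and similar situations (though I think it is good enough for an overall understanding). After many test runs, I find the acquireQueued() method in AQS fascinating: the design is very clever, and the way it uses the loop is very Doug Lea.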
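For anyone who wants to reproduce the two-thread experiment, here is a rough sketch of one way to set it up. It is not the exact test I debugged; the class name `TwoThreadQueueDemo` and the polling loop are assumptions made for illustration. The main thread holds a fair lock, a second thread calls `lock()` and ends up queued inside `acquireQueued()`, and the main thread then releases.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: one holder, one queued waiter, observed via the public API.
class TwoThreadQueueDemo {
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] seen = new boolean[3];

        Thread waiter = new Thread(() -> {
            lock.lock();                   // queues behind the main thread
            try {
                seen[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            waiter.start();
            // Poll until the waiter has been linked into the wait queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            seen[0] = lock.getQueueLength() == 1;
        } finally {
            lock.unlock();                 // release() wakes the queued waiter
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen[2] = !lock.isLocked() && !lock.hasQueuedThreads();
        return seen;
    }
}
```

Setting a breakpoint inside `acquireQueued()` while the main thread still holds the lock is a convenient place to watch the waiter loop, fail `tryAcquire()`, and park.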
Next week we'll look at Condition and related classes, and try to finish off the AQS framework.