这周我们啃完 AQS。
Condition Condition 类可以看做 Java 重新实现了一遍 object.wait()
和 object.notify()
,使用起来两者的思路是一致的,都是在执行 wait
/await
或者 notify
/signal
方法前需要获取锁,调用 wait
/await
会将锁打开,并将线程包装存起来,调用 notify
/signal
会将线程唤起,一直等到它获取到锁,然后继续运行。
看 Condition 类的源码能够更加理解这套使用思路的原因。
下面是 Condition 的代码,首先是 await()
方法(只分析了无参方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); } private boolean findNodeFromTail (Node node) { Node t = tail; for (;;) { if (t == node) return true ; if (t == null ) return false ; t = t.prev; } } private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } final boolean transferAfterCancelledWait (Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0 )) { enq(node); return true ; } while (!isOnSyncQueue(node)) Thread.yield(); return false ; } final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); } private boolean findNodeFromTail (Node node) { Node t = tail; for (;;) { if (t == node) return true ; if (t == null ) return false ; t = t.prev; } } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } static void selfInterrupt () { Thread.currentThread().interrupt(); }
然后贴一下 signal()
方法的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); } protected final boolean isHeldExclusively () { return getExclusiveOwnerThread() == Thread.currentThread(); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
过程还是很繁琐的,尤其是 await()
方法,看得我要吐了。
跟上一篇文章一样,本篇也整理了流程图,下图是最简单情况下的 condition 节点变化图(不考虑中断、锁的争抢、且锁是公平锁):
顺便附带一下测试上图的代码吧,一共起了四个线程,第一个 await()
,第二个 signal()
,后面两个打酱油,模拟 AQS 阻塞队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public static void main (String[] args) { ReentrantLock lock = new ReentrantLock(true ); Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); try { condition.await(); } catch (InterruptedException e) { System.out.println("condition唤醒" ); } finally { lock.unlock(); } }).start(); new Thread(() -> { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } }).start(); for (int i = 0 ; i < 2 ; i++) { new Thread(() -> { lock.lock(); try { } finally { lock.unlock(); } }).start(); } }
CountDownLatch CountDownLatch 的意思是 带有倒数功能的栅栏
,有两个核心方法:await()
和 countDown()
,前者是等待,后者是倒数。
举一个使用的例子:首先创建一个栅栏,然后一堆线程执行了 await()
方法,被堵在了栅栏门口,被迫等待。这个栅栏开始时有一个数字 N,每执行一次 countDown()
方法(任何线程任何时候都可以),栅栏的数字 N 就会减一,一直到 N 被减为 0,栅栏口打开,所有的线程继续执行。
画了一张图,描述了一下 CountDownLatch 的使用场景:
CountDownLatch 的源码相对简单很多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException("count < 0" ); this .sync = new Sync(count); } private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } } protected final void setState (int newState) { state = newState; } public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }
CountDownLatch 的原理是,依旧维护一个 sync 队列(AQS 队列),在 countDown 到 0 之前,所有 await()
的线程都包装成节点,按顺序阻塞在 sync 队列中。当 CountDownLatch 内部的计数器 count 减到 0 时,唤醒头结点(一个空节点),让空节点唤醒后继节点,后继节点醒来再唤醒它的后继,链式地叫醒所有。
画一张简图,描述一下内部队列在阻塞时的数据结构(就不画流程图了):
CyclicBarrier CyclicBarrier 字面上的意思是周期性的栅栏,好像跟 CountDownLatch 相像,但其实还是不太一样的。
CyclicBarrier 的作用是凑齐一波线程就放行一波,否则线程们都等待。比如初始化的时候,设置 CyclicBarrier 为每 10 个线程一组,那么前 9 个线程出现时,都会挡在 CyclicBarrier 的栅栏前,被迫等待,直到第 10 个线程出现,10 个线程一起放行。之后重复这个行为,循环往复。
CountDownLatch 是外部改变栅栏的计数,countDown 一次,栅栏计数减 1,直到减到 0 栅栏打开。而对于 CyclicBarrier,被挡在栅栏外的线程数量是决定因素,凑够线程数,栅栏打开。
CyclicBarrier 的源码就相对简单很多了,就是对 Condition 的简单使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();private final int parties;private int count;private final Runnable barrierCommand;private Generation generation = new Generation();private static class Generation { boolean broken = false ; } public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException(); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error(toe); } } private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation(); } public void reset () { final ReentrantLock lock = this .lock; lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } } private void breakBarrier () { generation.broken = true ; count = parties; trip.signalAll(); }
不细说了,简单到我上我也行(膨胀了膨胀了)。
Semaphore Semaphore 直译是信号,这是个不太容易理解的名字,实际上它的作用更像是一个资源池。例如创建一个容量为 10 的 Semaphore,如果有超过 10 个线程过来,那么就只能阻塞,一直等到有之前的线程释放资源,让容量空出来一个,才可以再运行。
再回来看 Semaphore 的意思:信号。它指的是,给线程一个信号,告知线程资源数量是否足够,你是否可以运行,如果已经满员了,那么你只能在我内部的阻塞队列里排队等候。
画一张简图描述一下这个类的作用:
Semaphore 类主要使用两个方法:acquire()
和 release()
,前者是申请资源,后者是归还资源。但 release()
方法有点奇怪,即使并没有申请过资源也是可以调用的,而且它会无节制地使资源数 +1,甚至超过最初设置的资源数量。例如下面这四行代码:
1 2 3 4 Semaphore semaphore = new Semaphore(10 ); semaphore.release(); semaphore.release(); semaphore.release();
初始化一个 Semaphore,容量设为 10,但是在 release()
了 3 次之后,容量变成了 13,可以同时有 13 个线程申请到锁然后运行。
这个类的代码跟 Condition 类有点类似,总体上讲还是比较容易的,直接看代码就好。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } } static final class NonfairSync extends Sync { } static final class FairSync extends Sync { } public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } public Semaphore (int permits) { sync = new NonfairSync(permits); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
代码学习得很顺利,基本熟知了 JUC 框架的各种实现,但是不太理解为什么要这么设计,沉淀一段时间再回来想吧。
下周学习线程池。