从 lock 到 AQS 再到 ReentrantLock
四月的第一周(实际上学习了两周有余),来学习另一种并发锁 Lock。
自 JDK 5 起,Java 类库中新提供了 java.util.concurrent 包(通常简称为 JUC 包),本周要学习的 Lock 接口,尤其是 ReentrantLock 类均出自该包当中。
Java 中有两种对并发资源加锁的方式,除了上次写过的 synchronized 之外,还有本次要学习的 Lock。synchronized 是 JVM 通过底层实现的,是物理攻击,而 Lock 是通过 JDK 纯粹在软件层面上实现的,是魔法伤害,这周来学习魔法伤害。
Lock 类本身是一个接口,对锁进行了规范,Lock 接口的定义如下:
1 | public interface Lock { |
Lock 接口一共规范给定了 6 个方法。
其中最为常用的,是 lock()
方法和 unlock()
方法,这两个方法必须成对出现,否则就有可能出现异常,使用逻辑如下:
1 | // 假如已经创建了一个lock对象 |
这里使用 lock 上锁,与使用 synchronized 上锁的效果是相同的,但在使用上从大括号代码块变为 try 代码块,并且一定要使用 finally 语句为 lock 对象解锁。
阿里巴巴的 Java 代码规约中指出:
锁【lock.lock】必须紧跟try代码块,且unlock要放到finally第一行。
……
说明一:如果在lock方法与try代码块之间的方法调用抛出异常,那么无法解锁,造成其它线程无法成功获取锁。
说明二:如果lock方法在try代码块之内,可能由于其它方法抛出异常,导致在finally代码块中,unlock对未加锁的对象解锁,它会调用AQS的tryRelease方法(取决于具体实现类),抛出IllegalMonitorStateException异常。
说明三:在Lock对象的lock方法实现中可能抛出unchecked异常,产生的后果与说明二相同。
已经说得非常清晰了。
Lock 接口规定了四种上锁,除了上文说到的最传统的 lock()
方法之外,还有以下三种:
lockInterruptibly()
会处理线程中断的上锁tryLock()
尝试上锁并立即返回,上锁成功则 true,上锁失败则 falsetryLock(long time, TimeUnit unit)
尝试一段时间上锁后返回,上锁成功则 true,上锁失败则 false
除以上上锁方法之外,最后还有一个方法 newCondition()
,该方法用于协调线程,这个后面再提。
写到这里,必须来补充学习两点其他的内容:线程中断、AQS,这两点是接下来要学习的基础。
线程中断
去年写过一篇文章学习线程,但基本只局限在线程的创建上,对线程的使用逻辑仍是知之甚少。这次学习 Lock 锁需要简单学习下线程的状态,以及线程中断的逻辑。
通常意义上线程有六种状态,但依我来看线程实际上只有两种状态:可运行状态、不可运行状态。
- 可运行状态:线程可以运行,但是并不一定正在运行,细分的话可以分为
正在运行
和等待运行
两种状态。 - 不可运行状态:线程不能运行,可能是主动的(主动等待),也可能是被动的(要用的资源被锁住了)。细分的话能分为三种状态:无限期等待状态、限期等待状态、阻塞状态,前两种是线程自己发起的,第三种是线程被迫的。
(下图主要参考《Java 线程运行怎么有第六种状态?》、《Java 6 Thread States and Life Cycle》)
对各个状态分别进行解释:
New
新增:线程刚刚创建(例如Thread t = new Thread()
),还没有执行代码Runnable
可运行:线程可以运行(例如thread.start()
),但并不代表一定在运行,是否正在运行要看虚拟机和 CPU 的线程调度情况。CPU 将时间划分为 10-20 ms 的一个个时间片,在每一个时间片中执行一条线程,到时间就切换(切换地太快导致似乎在并行执行多条线程),这被称为 CPU 在调度线程。在
Runnable
状态下,每一条线程都有可能会被执行,但是执行和切换的速度都很快,非要分出来是在执行还是在等待并没有太大的意义。Ready
等待运行:等待 CPU 调度Running
正在运行:CPU 正在执行
Waiting
无限期等待:线程主动等待,并且不设置等待结束的时间,直到被其他线程“唤醒”(例如thread.join()
)。Timed Waiting
限期等待:线程主动等待,但是设置一个等待的时长,到时间就自动唤醒(例如thread.sleep(sleepTime)
),在等待的这段时间也可以被其他线程“唤醒”。Blocked
阻塞等待:线程被动等待,因为抢锁失败了,被迫等着(例如使用 synchronized 同时让多条线程获取资源,总有线程会被迫等待)。
有关线程状态还可以剖析地更深一些:
- Java 的 Thread 类看似是一个寻常的 Java 对象,实际上可以视为对底层系统操作线程的封装,因此使用 Thread 类时不能完全按照面向对象的常规思维来思考,而是要以底层硬件的实现逻辑来思考。
- 上文我将线程分为了可运行状态和不可运行状态,细分析的话,这实际上是指 CPU 有没有为线程分配时间片。在另外的地方(线程和进程的区别)学习到,线程是操作系统能够调度的最小单位,“能调度的最小单位“这种说法,就是指 CPU 划分出一个个时间片,每一个时间片”调度“一个线程。可运行状态指的是 CPU 能够调度线程,而不可运行状态指的是 CPU 不能调度线程,比如某一个线程中执行
Thread.sleep(sleepTime)
方法,那么这个线程进入Timed Waiting
状态,在这种状态下 CPU 不再调度该线程,直到该线程休眠时间结束,回到Runnable
状态,CPU 才可以调度该线程,这个行为被称作线程的“挂起”。 - 线程通过
sleep(time)
和wait(time)
方法都可以进入Timed Waiting
状态,CPU 都不再会调度该线程,但是 sleep 的一方不会释放锁,wait 的一方会释放锁。其他线程如果需要正在 sleep 的线程的资源,将一直阻塞到那个线程醒来再释放资源。 - 只有使用 synchronized 才能导致线程进入
Blocked
状态,线程从Waiting
状态无法直接进入Runnable
状态,只能先进入Blocked
状态去获取锁。(顺便一提,进入Waiting
状态的 wait()、notify()、notifyAll() 方法,只能在 synchronized 代码块中使用)
我们终于要写到中断了。
线程中断,这里的“中断”是一个颇有迷惑性的词语,它并不是指线程就此停止,而是指线程收到了一个“中断信号”,线程应该根据这个信号来自行了断一些事情(但是收到中断信号也可以不处理)。
比如,线程 1 向线程 2 发送了一条中断信息,线程 2 的中断状态发生了改变,线程 2 根据中断状态来进行逻辑处理。所以我认为,中断是线程间通信的一种方式,通信的内容是“建议另一条线程停止行为”,但是线程并不一定采取意见,即使采取意见也绝不是终止线程,而是停止某个一直重复运行的行为,继续执行后续的代码。
我目前所见,中断有两种使用场景:
线程根据中断状态,停止某个循环(例如下面这段伪代码)
1
2
3
4while(还没中断){
循环执行
}
中断了,进行后续操作如果线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程,但是不能中断 I/O 阻塞和 synchronized 锁阻塞。
1
2
3
4
5
6try {
// 当前线程休眠1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
// 线程中断,不让继续休眠了,处理后续的业务逻辑
}这里的用法是,当线程处于不可运行状态时(暂停 CPU 调度),以异常的形式,强制让线程处理中断,以恢复回到可运行状态(CPU 可调度)。虽然这是在处理异常,但实际上并不是指程序有什么错误,而是代表一种强制手段:必须要对中断进行处理。再换句话说,这是一种恢复线程状态,停止发呆的一种机制。
线程中断有三个相关方法:
API | 介绍 |
---|---|
public void interrupt() | 中断线程 |
public boolean isInterrupted() | 查看线程是否中断 |
public static boolean interrupted() | 静态方法,查看当前线程是否中断的同时,清除中断状态 即如果线程中断,执行之后将不再处于中断状态 |
中断的源码,以及阻塞状态下的线程抛出中断异常的原理,这里暂不考究了。在此只掌握到两点即可:
- 线程中断不代表线程活动终止
- 线程中断的基本原理,是给线程的中断标志位赋 true
AQS
AQS 可以算是 JUC 包的核心,一大片并发类,包括本周要学习的 ReentrantLock 锁,都是以 AQS 为内核,不了解 AQS 则无法继续学习。
AQS 的全称是 AbstractQueuedSynchronizer(抽象队列同步器,中文一般简称“队列同步器”),它的作用正如其名,是一个队列,需要同步的线程们在队列里排队,每次让一个线程占用资源,剩下的线程在队列同步器里待命。这样的设计实现了这种效果:当多个线程争抢资源时,保证只会有一条线程在运行,其他线程都在等待队列里等候安排。
打开 AQS 接口看源码,会看到多如牛毛的方法,初识 AQS 如果从这些方法着手,就可以准备去世了,因此我们从 AQS 的成员变量着手,对 AQS 进行猜测性学习。
以下代码部分,基本全部参考自《一行一行源码分析清楚 AbstractQueuedSynchronizer》,这篇博文写的真的非常好,学习 AQS 必看(主要也是因为 AQS 的代码太神奇了,自己看完全看不懂……Doug Lea 这老爷子我服了)。
AQS 重要的成员变量有四个,分别是:
1 | // 头结点,你直接把它当做【当前持有锁的线程】可能是最好理解的 |
AQS 接口中定义了一个内部类:Node,这个类是 AQS 队列的基本构成元素,即并发线程们在 AQS 队列里等候时,都是装在这个 Node 对象里排序的。Node 类源码如下:
1 | static final class Node { |
Node 类的代码容易看得人一头雾水,初学时应当将其视为一个普通的链表节点,它必须需要
- Node prev:指向前个节点
- Node next:指向后个节点
- Thread Thread:本节点需要存储的内容
除此之外该节点还有一个状态位:
- int waitStatus:节点状态,在之后的代码中很重要
Node 类定义的其他内容不用太过纠结,看之后的代码会懂。
根据学习这个类,以及参考学习其他 AQS 相关的博文,可以大概知道 AQS 队列的基本结构和设计逻辑是这样的:
看图应该就能明白 AQS 的数据结构,需要注意的是,head 并不在 AQS 的阻塞队列当中。
以下部分是 AQS 的源码分析,同样基本参考自《一行一行源码分析清楚 AbstractQueuedSynchronizer》这篇文章,这部分的内容很难,可以不看,不会影响到 Lock 接口的学习。
之前的代码中说过,使用 Lock 接口上锁的基本步骤是:
1 | lock.lock(); --> AQS#acquire() |
实际上,lock() 和 unlock() 方法的原理,是使用 AQS 的 acquire() 和 release() 方法实现的,因此我们来粗略地学习这两个方法,并大致了解 AQS 的原理。(以下代码说明均为简略版,查看详细代码说明请参见上述博文)
上锁(新线程加入队列)
1
2
3
4
5
6
7
8
9
10
11
12
13public final void acquire(int arg) {
// 如果是首次进入队列的节点,那么尝试获取锁
// 如果不是首次进入队列,那么调整一下队列(之前的节点可能超时退出了)
/* 先尝试获取一次锁 */ ┌-----------------------------------------------------┐
/* 如果能拿到,就不用排队了 */ | /* 把线程包装成node,并加入阻塞队列 */ |
┌----------------------┐ | ┌---------------------------┐ |
if ( | !tryAcquire(arg) | && | acquireQueued( | addWaiter(Node.EXCLUSIVE) |, arg)) |
└----------------------┘ | └---------------------------┘ |
└-----------------------------------------------------┘
// 线程已经进入队列了
// 阻塞线程,耐心等吧
selfInterrupt();
}
解锁(老线程执行完毕,传唤下一个线程)
1
2
3
4
5
6
7
8
9
10
11public final boolean release(int arg) {
// 尝试释放线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
AQS 的具体实现代码,我自认为是又长又难的,因此不把全部代码整理出来了,只在此记录一些点吧:
- AQS 中有大量的方法,是为了处理并发的,例如队列还是空的,同时有两个线程进来申请锁,如何来让一个线程拿到锁,另一个线程去队列里排队等候。AQS 解决并发问题的原理是 CAS(CAS 的原理去看上篇介绍 synchronized 的博文),AQS 去调用 JDK5 刚刚出现的 sun.misc.Unsafe 类里面的方法,这个类对 CPU 的 CAS 指令进行了封装。
- 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。当占用锁的线程结束,调用 unlock() 方法,此时 AQS 会去队列里唤醒排在最前面的节点线程。
- AQS 接口确定了队列同步的主要逻辑,也就是上锁时线程先尝试获取锁,失败则加入队列;解锁时队列先尝试解除锁,如果解锁成功则唤醒后继节点。但是
尝试获取锁
和尝试解除锁
这两个操作,都是交由子类去实现的。这就使得 AQS 框架确立了基础的并发队列机制,但锁的形式可以有各种不同。实际上每个锁(每个 AQS 接口的实现类)就是在重写 AQS 的tryAcquire()
和tryRelease()
方法,其他的都依赖于 AQS 接口代码。 - AQS 有两个很重要的变量,分别是队列的状态 state,以及队列节点的状态 waitStatus。
- state:0 代表锁没有被占用,1 代表有线程正在占用锁,1 往上代表有线程正在重入占用锁
- waitStatus:0 代表初始化,大于 0 代表该节点取消了等待,-1 代表后继节点需要被唤醒
先写到这里吧。
接下来首先学习最常用的锁:ReentrantLock。
ReentrantLock 的字面意义是可重入锁,代表线程可以多次执行 lock() 方法占有锁,不会导致死锁问题。
ReentrantLock 允许公平锁,只要在构造方法中传入 true(new ReentrantLock(true)
)即可。公平锁的意思是,当多个线程获取锁时,按照先来后到的顺序,先申请锁的线程一定先得到锁,后申请锁的线程一定后得到锁。如果是非公平锁,那么各个线程获取到锁的顺序是“随机”的。对于 ReentrantLock 的非公平锁而言,后到的线程可以先试着获取一次锁,获取到了就直接返回,获取不到就跟公平锁一样在后面排队。ReentrantLock 实现公平锁和非公平锁的方式,是在内部维护两种 AQS 队列。
1 | // 非公平锁(Sync是一个AQS队列) |
经过刚才对 AQS 的学习,我们知道学习锁实际上只需要看 tryAcquire()
和 tryRelease()
方法,其他都交由 AQS 接口就可以了。
上锁
tryAcquire()
公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此时此刻没有线程持有锁
if (c == 0) {
// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
// 看看有没有别人在队列中等了半天了
if (!hasQueuedPredecessors() &&
// 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
// 因为刚刚还没人的,我判断过了
compareAndSetState(0, acquires)) {
// 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
setExclusiveOwnerThread(current);
return true;
}
}
// 会进入这个else if分支,说明是重入了,需要操作:state=state+1
// 这里不存在并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
// 回到上面一个外层调用方法(AQS的acquire()方法)继续看:
// if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
return false;
}非公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26protected final boolean tryAcquire(int acquires) {
// 调用了nonfairTryAcquire()方法,往下看
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 与公平锁相比,只有这里有区别
// 非公平锁不会先判断AQS队列中是否有等候的节点,而是直接试着获取一次锁
// 如果这次尝试获取不到,则和公平锁一样尾插队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁和非公平锁只有两点区别:
非公平锁实际上会先 CAS 获取一次锁,如果失败则调用 AQS 的 acquire() 方法(这段上面没提)
1
2
3
4
5
6
7
8
9
10
11
12// 非公平锁的lock()方法(会先CAS获取一次锁,获取不到再走AQS接口)
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 公平锁的lock()方法
final void lock() {
acquire(1);
}在首次试着获取锁失败的情况下,非公平锁会在 tryAcquire() 方法中再试着获取一次锁,但是公平锁会严格地按照先来后到的顺序获取
可以总结出来,非公平锁比公平锁多尝试获取了两次锁,如果成功就不用进入队列了。这样可以提高并发的线程吞吐量,但是有可能导致先等待的线程一直获取不到锁。
解锁
tryRelease()
公平锁和非公平锁,共用一套解锁方法,也就是
Lock#unlock() -> AQS#release() -> Lock#tryRelease() -> AQS#unparkSuccessor()
,其中 tryRelease() 方法是交由实现类 ReentrantLock 去重写的(不明白的话回到上面看一看 AQS 的解锁逻辑)。ReentrantLock 重写的 tryRelease() 方法的代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全释放锁
boolean free = false;
// 处理重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}ReentrantLock 作为可重入锁,每次上锁就使 AQS 队列的状态(初始化是 0)增加 1,解锁使状态减少 1,如果 AQS 队列的状态变为 0 了,就代表没有线程持有锁。
学习得越深入,越感觉锁是一个无底洞,至少还需要再学习 ReentrantReadWriteLock、Semaphore、CountDownLatch、CyclicBarrier、Condition 等内容,本篇承载不了那么多,暂时先写到这里。后续的内容容我先消化一段时间,然后再回来补上。