从 lock 到 AQS 再到 ReentrantLock


四月的第一周(实际上学习了两周有余),来学习另一种并发锁 Lock。

自 JDK 5 起,Java 类库中新提供了 java.util.concurrent 包(通常简称为 JUC 包),本周要学习的 Lock 接口,尤其是 ReentrantLock 类均出自该包当中。


Java 中有两种对并发资源加锁的方式,除了上次写过的 synchronized 之外,还有本次要学习的 Lock。synchronized 是 JVM 通过底层实现的,是物理攻击,而 Lock 是通过 JDK 纯粹在软件层面上实现的,是魔法伤害,这周来学习魔法伤害。


Lock 类本身是一个接口,对锁进行了规范,Lock 接口的定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

Lock 接口一共规范给定了 6 个方法。


其中最为常用的,是 lock() 方法和 unlock() 方法,这两个方法必须成对出现,否则就有可能出现异常,使用逻辑如下:

1
2
3
4
5
6
7
// 假如已经创建了一个lock对象
lock.lock();
try {
// ...
} finally {
lock.unlock();
}

这里使用 lock 上锁,与使用 synchronized 上锁的效果是相同的,但在使用上从大括号代码块变为 try 代码块,并且一定要使用 finally 语句为 lock 对象解锁。

阿里巴巴的 Java 代码规约中指出:

锁【lock.lock】必须紧跟try代码块,且unlock要放到finally第一行。

……

说明一:如果在lock方法与try代码块之间的方法调用抛出异常,那么无法解锁,造成其它线程无法成功获取锁。
说明二:如果lock方法在try代码块之内,可能由于其它方法抛出异常,导致在finally代码块中,unlock对未加锁的对象解锁,它会调用AQS的tryRelease方法(取决于具体实现类),抛出IllegalMonitorStateException异常。
说明三:在Lock对象的lock方法实现中可能抛出unchecked异常,产生的后果与说明二相同。

已经说得非常清晰了。


Lock 接口规定了四种上锁,除了上文说到的最传统的 lock() 方法之外,还有以下三种:

  • lockInterruptibly() 会处理线程中断的上锁
  • tryLock() 尝试上锁并立即返回,上锁成功则 true,上锁失败则 false
  • tryLock(long time, TimeUnit unit) 尝试一段时间上锁后返回,上锁成功则 true,上锁失败则 false

除以上上锁方法之外,最后还有一个方法 newCondition(),该方法用于协调线程,这个后面再提。


写到这里,必须来补充学习两点其他的内容:线程中断、AQS,这两点是接下来要学习的基础。

线程中断

去年写过一篇文章学习线程,但基本只局限在线程的创建上,对线程的使用逻辑仍是知之甚少。这次学习 Lock 锁需要简单学习下线程的状态,以及线程中断的逻辑。

通常意义上线程有六种状态,但依我来看线程实际上只有两种状态:可运行状态、不可运行状态。

  • 可运行状态:线程可以运行,但是并不一定正在运行,细分的话可以分为正在运行等待运行两种状态。
  • 不可运行状态:线程不能运行,可能是主动的(主动等待),也可能是被动的(要用的资源被锁住了)。细分的话能分为三种状态:无限期等待状态、限期等待状态、阻塞状态,前两种是线程自己发起的,第三种是线程被迫的。

(下图主要参考《Java 线程运行怎么有第六种状态?》《Java 6 Thread States and Life Cycle》

线程状态

对各个状态分别进行解释:

  • New 新增:线程刚刚创建(例如 Thread t = new Thread()),还没有执行代码

  • Runnable 可运行:线程可以运行(例如 thread.start()),但并不代表一定在运行,是否正在运行要看虚拟机和 CPU 的线程调度情况。

    CPU 将时间划分为 10-20 ms 的一个个时间片,在每一个时间片中执行一条线程,到时间就切换(切换地太快导致似乎在并行执行多条线程),这被称为 CPU 在调度线程。在 Runnable 状态下,每一条线程都有可能会被执行,但是执行和切换的速度都很快,非要分出来是在执行还是在等待并没有太大的意义。

    • Ready 等待运行:等待 CPU 调度
    • Running 正在运行:CPU 正在执行
  • Waiting 无限期等待:线程主动等待,并且不设置等待结束的时间,直到被其他线程“唤醒”(例如 thread.join())。

  • Timed Waiting 限期等待:线程主动等待,但是设置一个等待的时长,到时间就自动唤醒(例如 thread.sleep(sleepTime)),在等待的这段时间也可以被其他线程“唤醒”。

  • Blocked 阻塞等待:线程被动等待,因为抢锁失败了,被迫等着(例如使用 synchronized 同时让多条线程获取资源,总有线程会被迫等待)。


有关线程状态还可以剖析地更深一些:

  • Java 的 Thread 类看似是一个寻常的 Java 对象,实际上可以视为对底层系统操作线程的封装,因此使用 Thread 类时不能完全按照面向对象的常规思维来思考,而是要以底层硬件的实现逻辑来思考。
  • 上文我将线程分为了可运行状态和不可运行状态,细分析的话,这实际上是指 CPU 有没有为线程分配时间片。在另外的地方(线程和进程的区别)学习到,线程是操作系统能够调度的最小单位,“能调度的最小单位“这种说法,就是指 CPU 划分出一个个时间片,每一个时间片”调度“一个线程。可运行状态指的是 CPU 能够调度线程,而不可运行状态指的是 CPU 不能调度线程,比如某一个线程中执行 Thread.sleep(sleepTime) 方法,那么这个线程进入 Timed Waiting 状态,在这种状态下 CPU 不再调度该线程,直到该线程休眠时间结束,回到 Runnable 状态,CPU 才可以调度该线程,这个行为被称作线程的“挂起”。
  • 线程通过 sleep(time)wait(time) 方法都可以进入 Timed Waiting 状态,CPU 都不再会调度该线程,但是 sleep 的一方不会释放锁,wait 的一方会释放锁。其他线程如果需要正在 sleep 的线程的资源,将一直阻塞到那个线程醒来再释放资源。
  • 只有使用 synchronized 才能导致线程进入 Blocked 状态,线程从 Waiting 状态无法直接进入 Runnable 状态,只能先进入 Blocked 状态去获取锁。(顺便一提,进入 Waiting 状态的 wait()、notify()、notifyAll() 方法,只能在 synchronized 代码块中使用)

我们终于要写到中断了。

线程中断,这里的“中断”是一个颇有迷惑性的词语,它并不是指线程就此停止,而是指线程收到了一个“中断信号”,线程应该根据这个信号来自行了断一些事情(但是收到中断信号也可以不处理)。

比如,线程 1 向线程 2 发送了一条中断信息,线程 2 的中断状态发生了改变,线程 2 根据中断状态来进行逻辑处理。所以我认为,中断是线程间通信的一种方式,通信的内容是“建议另一条线程停止行为”,但是线程并不一定采取意见,即使采取意见也绝不是终止线程,而是停止某个一直重复运行的行为,继续执行后续的代码。

我目前所见,中断有两种使用场景:

  1. 线程根据中断状态,停止某个循环(例如下面这段伪代码)

    1
    2
    3
    4
    while(还没中断){
    循环执行
    }
    中断了,进行后续操作
  2. 如果线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程,但是不能中断 I/O 阻塞和 synchronized 锁阻塞。

    1
    2
    3
    4
    5
    6
    try {
    // 当前线程休眠1秒
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    // 线程中断,不让继续休眠了,处理后续的业务逻辑
    }

    这里的用法是,当线程处于不可运行状态时(暂停 CPU 调度),以异常的形式,强制让线程处理中断,以恢复回到可运行状态(CPU 可调度)。虽然这是在处理异常,但实际上并不是指程序有什么错误,而是代表一种强制手段:必须要对中断进行处理。再换句话说,这是一种恢复线程状态,停止发呆的一种机制。

线程中断有三个相关方法:

API 介绍
public void interrupt() 中断线程
public boolean isInterrupted() 查看线程是否中断
public static boolean interrupted() 静态方法,查看当前线程是否中断的同时,清除中断状态
即如果线程中断,执行之后将不再处于中断状态

中断的源码,以及阻塞状态下的线程抛出中断异常的原理,这里暂不考究了。在此只掌握到两点即可:

  1. 线程中断不代表线程活动终止
  2. 线程中断的基本原理,是给线程的中断标志位赋 true

AQS

AQS 可以算是 JUC 包的核心,一大片并发类,包括本周要学习的 ReentrantLock 锁,都是以 AQS 为内核,不了解 AQS 则无法继续学习。

AQS 的全称是 AbstractQueuedSynchronizer(抽象队列同步器,中文一般简称“队列同步器”),它的作用正如其名,是一个队列,需要同步的线程们在队列里排队,每次让一个线程占用资源,剩下的线程在队列同步器里待命。这样的设计实现了这种效果:当多个线程争抢资源时,保证只会有一条线程在运行,其他线程都在等待队列里等候安排。

打开 AQS 接口看源码,会看到多如牛毛的方法,初识 AQS 如果从这些方法着手,就可以准备去世了,因此我们从 AQS 的成员变量着手,对 AQS 进行猜测性学习。

以下代码部分,基本全部参考自《一行一行源码分析清楚 AbstractQueuedSynchronizer》,这篇博文写的真的非常好,学习 AQS 必看(主要也是因为 AQS 的代码太神奇了,自己看完全看不懂……Doug Lea 这老爷子我服了)。


AQS 重要的成员变量有四个,分别是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 头结点,你直接把它当做【当前持有锁的线程】可能是最好理解的
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程(该变量继承自父类),举个最重要的使用例子
// 因为锁可以重入,reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread;

AQS 接口中定义了一个内部类:Node,这个类是 AQS 队列的基本构成元素,即并发线程们在 AQS 队列里等候时,都是装在这个 Node 对象里排序的。Node 类源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;

// ======== 下面的几个int常量是给waitStatus用的 ===========
// 代表此线程取消了争抢这个锁
static final int CANCELLED = 1;
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
// 本文不分析condition
static final int CONDITION = -2;
// 同样的不分析,略过吧
static final int PROPAGATE = -3;
// =====================================================

// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;
}

Node 类的代码容易看得人一头雾水,初学时应当将其视为一个普通的链表节点,它必须需要

  • Node prev:指向前个节点
  • Node next:指向后个节点
  • Thread Thread:本节点需要存储的内容

除此之外该节点还有一个状态位:

  • int waitStatus:节点状态,在之后的代码中很重要

Node 类定义的其他内容不用太过纠结,看之后的代码会懂。

根据学习这个类,以及参考学习其他 AQS 相关的博文,可以大概知道 AQS 队列的基本结构和设计逻辑是这样的:

AQS数据结构

看图应该就能明白 AQS 的数据结构,需要注意的是,head 并不在 AQS 的阻塞队列当中。


以下部分是 AQS 的源码分析,同样基本参考自《一行一行源码分析清楚 AbstractQueuedSynchronizer》这篇文章,这部分的内容很难,可以不看,不会影响到 Lock 接口的学习。

之前的代码中说过,使用 Lock 接口上锁的基本步骤是:

1
2
3
4
5
6
lock.lock();		--> AQS#acquire()
try {
// ...
} finally {
lock.unlock(); --> AQS#release()
}

实际上,lock() 和 unlock() 方法的原理,是使用 AQS 的 acquire() 和 release() 方法实现的,因此我们来粗略地学习这两个方法,并大致了解 AQS 的原理。(以下代码说明均为简略版,查看详细代码说明请参见上述博文)

  1. 上锁(新线程加入队列)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public final void acquire(int arg) {
    // 如果是首次进入队列的节点,那么尝试获取锁
    // 如果不是首次进入队列,那么调整一下队列(之前的节点可能超时退出了)
    /* 先尝试获取一次锁 */ ┌-----------------------------------------------------┐
    /* 如果能拿到,就不用排队了 */ | /* 把线程包装成node,并加入阻塞队列 */ |
    ┌----------------------┐ | ┌---------------------------┐ |
    if ( | !tryAcquire(arg) | && | acquireQueued( | addWaiter(Node.EXCLUSIVE) |, arg)) |
    └----------------------┘ | └---------------------------┘ |
    └-----------------------------------------------------┘
    // 线程已经进入队列了
    // 阻塞线程,耐心等吧
    selfInterrupt();
    }
  1. 解锁(老线程执行完毕,传唤下一个线程)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final boolean release(int arg) {
    // 尝试释放线程
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    // 唤醒后继节点
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

AQS 的具体实现代码,我自认为是又长又难的,因此不把全部代码整理出来了,只在此记录一些点吧:

  • AQS 中有大量的方法,是为了处理并发的,例如队列还是空的,同时有两个线程进来申请锁,如何来让一个线程拿到锁,另一个线程去队列里排队等候。AQS 解决并发问题的原理是 CAS(CAS 的原理去看上篇介绍 synchronized 的博文),AQS 去调用 JDK5 刚刚出现的 sun.misc.Unsafe 类里面的方法,这个类对 CPU 的 CAS 指令进行了封装。
  • 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。当占用锁的线程结束,调用 unlock() 方法,此时 AQS 会去队列里唤醒排在最前面的节点线程。
  • AQS 接口确定了队列同步的主要逻辑,也就是上锁时线程先尝试获取锁,失败则加入队列;解锁时队列先尝试解除锁,如果解锁成功则唤醒后继节点。但是尝试获取锁尝试解除锁这两个操作,都是交由子类去实现的。这就使得 AQS 框架确立了基础的并发队列机制,但锁的形式可以有各种不同。实际上每个锁(每个 AQS 接口的实现类)就是在重写 AQS 的 tryAcquire()tryRelease() 方法,其他的都依赖于 AQS 接口代码。
  • AQS 有两个很重要的变量,分别是队列的状态 state,以及队列节点的状态 waitStatus。
    • state:0 代表锁没有被占用,1 代表有线程正在占用锁,1 往上代表有线程正在重入占用锁
    • waitStatus:0 代表初始化,大于 0 代表该节点取消了等待,-1 代表后继节点需要被唤醒

先写到这里吧。



接下来首先学习最常用的锁:ReentrantLock。

ReentrantLock 的字面意义是可重入锁,代表线程可以多次执行 lock() 方法占有锁,不会导致死锁问题。

ReentrantLock 允许公平锁,只要在构造方法中传入 true(new ReentrantLock(true))即可。公平锁的意思是,当多个线程获取锁时,按照先来后到的顺序,先申请锁的线程一定先得到锁,后申请锁的线程一定后得到锁。如果是非公平锁,那么各个线程获取到锁的顺序是“随机”的。对于 ReentrantLock 的非公平锁而言,后到的线程可以先试着获取一次锁,获取到了就直接返回,获取不到就跟公平锁一样在后面排队。ReentrantLock 实现公平锁和非公平锁的方式,是在内部维护两种 AQS 队列。

1
2
3
4
// 非公平锁(Sync是一个AQS队列)
static final class NonfairSync extends Sync {...}
// 公平锁
static final class FairSync extends Sync {...}

经过刚才对 AQS 的学习,我们知道学习锁实际上只需要看 tryAcquire()tryRelease() 方法,其他都交由 AQS 接口就可以了。

  1. 上锁 tryAcquire()

    • 公平锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      // 尝试直接获取锁,返回值是boolean,代表是否获取到锁
      // 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
      protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      // state == 0 此时此刻没有线程持有锁
      if (c == 0) {
      // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
      // 看看有没有别人在队列中等了半天了
      if (!hasQueuedPredecessors() &&
      // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
      // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
      // 因为刚刚还没人的,我判断过了
      compareAndSetState(0, acquires)) {

      // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      // 会进入这个else if分支,说明是重入了,需要操作:state=state+1
      // 这里不存在并发问题
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      // 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
      // 回到上面一个外层调用方法(AQS的acquire()方法)继续看:
      // if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      // selfInterrupt();
      return false;
      }
    • 非公平锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      protected final boolean tryAcquire(int acquires) {
      // 调用了nonfairTryAcquire()方法,往下看
      return nonfairTryAcquire(acquires);
      }

      final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      // 与公平锁相比,只有这里有区别
      // 非公平锁不会先判断AQS队列中是否有等候的节点,而是直接试着获取一次锁
      // 如果这次尝试获取不到,则和公平锁一样尾插队列
      if (compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }

    公平锁和非公平锁只有两点区别:

    1. 非公平锁实际上会先 CAS 获取一次锁,如果失败则调用 AQS 的 acquire() 方法(这段上面没提)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 非公平锁的lock()方法(会先CAS获取一次锁,获取不到再走AQS接口)
      final void lock() {
      if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
      else
      acquire(1);
      }

      // 公平锁的lock()方法
      final void lock() {
      acquire(1);
      }
    2. 在首次试着获取锁失败的情况下,非公平锁会在 tryAcquire() 方法中再试着获取一次锁,但是公平锁会严格地按照先来后到的顺序获取

    可以总结出来,非公平锁比公平锁多尝试获取了两次锁,如果成功就不用进入队列了。这样可以提高并发的线程吞吐量,但是有可能导致先等待的线程一直获取不到锁。

  2. 解锁 tryRelease()

    公平锁和非公平锁,共用一套解锁方法,也就是 Lock#unlock() -> AQS#release() -> Lock#tryRelease() -> AQS#unparkSuccessor(),其中 tryRelease() 方法是交由实现类 ReentrantLock 去重写的(不明白的话回到上面看一看 AQS 的解锁逻辑)。ReentrantLock 重写的 tryRelease() 方法的代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    // 是否完全释放锁
    boolean free = false;
    // 处理重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
    }

    ReentrantLock 作为可重入锁,每次上锁就使 AQS 队列的状态(初始化是 0)增加 1,解锁使状态减少 1,如果 AQS 队列的状态变为 0 了,就代表没有线程持有锁。



学习得越深入,越感觉锁是一个无底洞,至少还需要再学习 ReentrantReadWriteLock、Semaphore、CountDownLatch、CyclicBarrier、Condition 等内容,本篇承载不了那么多,暂时先写到这里。后续的内容容我先消化一段时间,然后再回来补上。