分析 AQS(队列同步器)
AbstractQueuedSynchronizer
(AQS),是用来构建所或者其他同步组件的基础框架,它使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的队列工作。
源码版本 Jdk 1.8
怎么实现队列同步器
同步器主要的使用方式是继承,子类实现它的部分方法来管理同步状态变量就可以了。
简单的说,同步器,使用一个状态 state:int
表示它的状态变化,如果有其他的锁需要使用 AQS ,需要操作这个状态变量,AQS 直接提供了三个方法供修改状态变量:
1 | // 获取当前同步资源状态 |
同步器是一个 CLH 队列(FIFO),队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。
1 | static final class Node { |
属性名称 | 描述 |
---|---|
int waitStatus | 表示节点的状态。其中包含的状态有:CANCELLED ,值为1,表示当前的线程被取消;SIGNAL ,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;CONDITION ,值为-2,表示当前节点在等待condition,也就是在condition队列中;PROPAGATE ,值为-3,表示当前场景下后续的acquireShared能够得以执行;值为0,表示当前节点在sync队列中,等待着获取锁。 |
Node prev | 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。 |
Node next | 后继节点。 |
Node nextWaiter | 存储condition队列中的后继节点。 |
Thread thread | 入队列时的当前线程。 |
当前占有资源的节点就是头节点。
AQS 定义两种资源共享方式:
Exclusive
:独占模式,又称排他模式,只能有一个线程占用资源,如 ReentrantLock;Share
:共享模式,多个线程可以一起执行,同时占用资源。
同步器对外部使用者提供五个方法,让锁使用资源的方法,主需要实现其中的部分方法,实现对共享资源的获取和释放就可以了。
isHeldExclusively()
:该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)
:独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int)
:独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
独占模式
acquire
1 | // 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态。 |
总结 acquire
流程:
- 使用
tryAcquire()
尝试直接去获取资源,如果成功则直接返回; - 如果上一步没有成功,
tryAcquire()
尝试直接去获取资源,如果成功则直接返回; acquireQueued()
让线程进入等待队列中自旋,当轮到自己去获取资源的时候,采取尝试获取资源,如果被中断过,则返回true
,如果返回false
则直接返回;- 如果上一步返回 true,表示线程被中断过,但是在等待过程中是不响应的,获取到资源的时候,才去将本线程进行中断。
release
1 | public final boolean release(long arg) { |
这里为什么从开始尾节点遍历,参考文章
因为在CLH队列中的结点随时有可能被中断,被中断的结点的waitStatus设置为CANCEL,而且它会被踢出CLH队列
,如何个踢出法,就是它的前趋结点的next并不会指向它,而是指向下一个非CANCEL的结点,而它自己的next指针指向它自己(将自己踢出,并让 GC 回收)。一旦这种情况发生,如何从头向尾方向寻找继任结点会出现问题,因为一个CANCEL结点的next为自己,那么就找不到正确的继任接点
。
总结下 release:
需要独占模式中自定义的同步器子类去实现,用来释放资源,释放相应的资源,将 state 减少相应的数量即可,如果完全释放了资源,唤醒等待队列中有效的线程来获取资源。
- 处理当前节点:非CANCELLED状态重置为0;
- 寻找下个节点:如果是CANCELLED状态,说明节点中途溜了,从队列尾开始寻找排在最前还在等着的节点
- 唤醒:利用LockSupport.unpark唤醒下个节点里的线程。
总结
- 获取资源的时候,队列同步器中每个节点都是一个线程在进行自旋,如果该节点的前驱节点是头节点,它就可以去获取资源,退出自旋的时候,将本线程的节点设置成头节点;
- 释放资源的时候,将本线程的等待状态改成 0 (等待状态),然后让下一个小于 0 的有效节点的节点状态改成 0(等待状态),然后资源状态。
- 只有当前节点的前一个节点为
SIGNAL
时,才能当前节点才能被挂起。对线程的挂起及唤醒操作是通过使用 LockSupport 类的park/unpark
实现的。
doAcquireNanos
该方法提供了具备有超时功能的获取状态的调用,如果在指定的nanosTimeout
内没有获取到状态,那么返回false,反之返回true。可以将该方法看做acquireInterruptibly的升级版,也就是在判断是否被中断的基础上增加了超时控制。
针对超时控制这部分的实现,主要需要计算出睡眠的delta,也就是间隔值。间隔可以表示为 nanosTimeout 等于原有nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)
。如果nanosTimeout大于0,那么还需要使当前线程睡眠,反之则返回false。
1 | // 独占式超时获取资源 |
共享模式
acquireShared
1 | // 共享模式获取资源,获取成功则返回,获取失败进入同步等待队列中,整个过程忽略线程中断。 |
tryAcquireShared()
尝试获取资源,成功则直接返回;- 失败则通过
doAcquireShared()
进入等待队列park()
,直到被unpark()
/interrupt()
并成功获取到资源才返回。整个等待过程也是忽略中断的。
releaseShared
1 |
|
Mutex(独占锁)
具体是参考《Java 并发编程艺术》一书第五章:
1 | public class Mutex implements Lock { |
- 定义一个静态内部类继承类同步器并实现了独占模式的操作方法;
- 获取资源 tryAcquire 中,CAS 获取资源,获取成功返回 true;
- 释放资源 tryRelealse,将资源设置为0;
测试一下(每过一秒打印一个结果):
1 | public class MutexTest { |
打印结果:
1 | 0 |
TwinsLock(共享锁)
具体是参考《Java 并发编程艺术》一书第五章:
1 | public class TwinsLock implements Lock { |
TwinsLock 包含了一个自定义的同步器 sync,该同步器以共享方式获取同步状态。当 消耗资源tryAcquireShared(int reduceCount)
大于 或者等于 0 的时候,表示当前线程获取锁成功。
验证上面锁的正确性,就是要验证是否同一时刻有两个线程同时进行打印任务。
1 | public class TwinsLockTest { |
打印结果:
1 |
|
总结
在这个 AQS 同步器中我们时刻需要更改注意两个方面的问题:
一是要去维护同步队列,更改同步器的资源状态变量,通过
Unsafe
提供原子操作 CAS;二是底层还要去根据同步器状态变量去实现线程等待,线程唤醒的,它是通过
LockSupport
的park/unpark
操作。
当然使用者不需要注意这些问题,代码已经把这些方法都已经封装好了,只要实现资源变量的变化的几个方法就可以了。如果是要使用独占模式,只需要实现
1
2protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)如果只是要使用共享模式,需要实现
1
2protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
补充:CLH 队列锁
就是源码分析中那个同步队列,节点单位就是内部类 Node,获取锁的时候跟队列的头有关,释放锁主要删除头节点和从尾节点唤醒。它虽然保留了自旋操作,但是真实情况下,是阻塞了线程(LockSupport)。
补充:CAS
这部分参考文章 认识非阻塞的同步机制CAS
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论V值是否等于A值,都将返回V的原值。
当多个线程尝试使用CAS同时更新一个变量,最终只有一个线程会成功,其他线程都会失败。但和使用锁不同,失败的线程不会被阻塞,而是被告之本次更新操作失败了,可以再试一次。此时,线程可以根据实际情况,继续重试或者跳过操作,大大减少因为阻塞而损失的性能。所以,CAS是一种乐观的操作,它希望每次都能成功地执行更新操作。
AQS 中的 CAS 由 Unsafe
提供。
补充:自旋锁
这部分参考文章 CAS和自旋锁(spin lock)
由于在多处理器系统环境中有些资源因为其有限性,有时需要互斥访问(mutual exclusion),这时会引入锁的机制,只有获取了锁的进程才能获取资源访问。即是每次只能有且只有一个进程能获取锁,才能进入自己的临界区,同一时间不能两个或两个以上进程进入临界区,当退出临界区时释放锁。设计互斥算法时总是会面临一种情况,即没有获得锁的进程怎么办?通常有2种处理方式。一种是没有获得锁的调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,这就是自旋锁,他不用将线城阻塞起来(NON-BLOCKING);另一种是没有获得锁的进程就阻塞(BLOCKING)自己,请求OS调度另一个线程上处理器,这就是互斥锁。
跟互斥锁一样,一个执行单元要想访问被自旋锁保护的共享资源,必须先得到锁,在访问完共享资源后,必须释放锁。如果在获取自旋锁时,没有任何执行单元保持该锁,那么将立即得到锁;如果在获取自旋锁时锁已经有保持者,那么获取锁操作将自旋在那里,直到该自旋锁的保持者释放了锁。由此我们可以看出,自旋锁是一种比较低级的保护数据结构或代码片段的原始方式,这种锁可能存在两个问题:
- 递归死锁:试图递归地获得自旋锁必然会引起死锁:递归程序的持有实例在第二个实例循环,以试图获得相同自旋锁时,不会释放此自旋锁。在递归程序中使用自旋锁应遵守下列策略:递归程序决不能在持有自旋锁时调用它自己,也决不能在递归调用时试图获得相同的自旋锁。此外如果一个进程已经将资源锁定,那么,即使其它申请这个资源的进程不停地疯狂“自旋”,也无法获得资源,从而进入死循环。
- 过多占用cpu资源。如果不加限制,由于申请者一直在循环等待,因此自旋锁在锁定的时候,如果不成功,不会睡眠,会持续的尝试,单cpu的时候自旋锁会让其它process动不了. 因此,一般自旋锁实现会有一个参数限定最多持续尝试次数. 超出后, 自旋锁放弃当前time slice. 等下一次机会
由此可见,自旋锁比较适用于锁使用者保持锁时间比较短的情况。正是由于自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是睡眠是非常必要的,自旋锁的效率远高于互斥锁。
补充:LockSupport
具体内容请参考 LockSupport(park/unpark)源码分析
LockSupport.park()
和 LockSupport.unpark(Thread thread)
调用的是 Unsafe
中本地方法:
1 | //park |
park
函数是将当前调用Thread阻塞,而 unpark
函数则是将指定线程唤醒。
ReentrantLock(独占锁)
https://www.jianshu.com/p/fe027772e156
其实 ReentantLock 的实现和上面的例子 Mutex
的差不多,不过它另外实现了可重入和公平锁两个部分。
公平锁
1 | protected final boolean tryAcquire(int acquires) { |
相比非公平锁,它多了一个方法hasQueuedPredecessors
判断队列是否有排在前面的线程在等待锁,没有的话调用compareAndSetState
使用 CAS 的方式修改state,然后设置本线程为独占锁,并且它是可重入锁。
ReentrantReadWriteLock(读写锁)
首先来分析下读写锁的几个重要的特点:
- 读写状态的设计;
- 写锁的获取与释放;
- 读锁的获取与释放;
- 锁降级的实现。
构造方法
1 | public ReentrantReadWriteLock() { |
读写状态设计
在读写锁中最重要的就是Sync类,它继承自AQS,还记得吗,AQS使用一个int型来保存状态,状态在这里就代表锁,它提供了获取和修改状态的方法。可是,这里要实现读锁和写锁,只有一个状态怎么办?Doug Lea是这么做的,它把状态的高16位用作读锁,低16位用作写锁,所以无论是读锁还是写锁最多只能被持有65535次。所以在判断读锁和写锁的时候,需要进行位运算:
- 由于读写锁共享状态,所以状态不为0,只能说明是有锁,可能是读锁,也可能是写锁;
- 读锁是高16为表示的,所以读锁加1,就是状态的高16位加1,低16位不变,所以要加的不是1,而是2^16,减一同样是这样。
- 写锁用低16位表示,要获得写锁的次数,要用状态&2^16-1,结果的高16位全为0,低16位就是写锁被持有的次数。
那它是怎么确定读写的各自的状态的了,是通过位运算符,假设当前同步状态值为 S,写状态等于 S & 0x0000FFFF
(只有 低 16 位),读状态等于 S >>> 16
(无符号补位右移 16 位);这个时候写状态增加 1 时,等于 S + 1
,当读状态增加 1,等于 S + (1 << 16)
,也就是 S + 0x00010000
。
根据状态的划分可以得出一个结论: S 不等于 0 时,就是写状态计算公式 S & 0x0000FFFF == 0
,则读状态 S >>> 16 > 0
这个时候,读锁获取到了。
同步器的设计
1 | //ReentrantReadWriteLock的同步器 |
写锁的获取和释放
首先,写锁是一个可重入的排他锁,如果当前线程获取到了写锁,则增加写状态,如果读锁或者写锁已经被获取了,它则进入等待状态(写锁要确保写锁的操作对读锁可见)。
写锁和对外提供的方法和 ReentrantLock 一样的,这里主要去分析下它是怎么获取和释放资源的。
1 |
|
获取锁:
获取锁的过程:
- 首先获取写锁的数量;
- 判断写锁是否已经被获取了,如果已经获取了,就要做重入操作,将锁的资源数量加一,然后返回;如果是首次获取,就要进行 CAS 操作获取独占锁;
释放锁的过程:
- 计算如果释放完资源的数量;
- 如果剩下的资源数量为 0,则释放写锁;
- 如果剩下的资源数量不为 0,就将计算完的资源数量写入。
读锁的获取和释放
读锁是一个支持重入的共享锁,它能够被多个线程同时获取,当写锁的状态为 0 的时候,读锁总是获取成功的,并且增加读状态。这里比较复杂些。
1 |
|
阅读读锁的时候需要注意firstReader
,和 HoldCounter
这两个变量的变化就可以了:
- 如果读锁没有被持有,那么每一个线程的
HoldCounter
变量中的 count 变量一定是为 0; - 如果当前线程是第一个获取到读锁的线程,设置
firstReader
为当前线程,并且设置firstReadHoldCount
数量; - 那么如果当前线程不是第一个获取读锁的线程,那么获取当前线程的
HoldCounter
,获取count
的值,判断它等不等于 0 ,如果等于 0 的话,表示当前线程没有获取读锁,那么可以从readHolds
的管理中将它移除,
锁降级
锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
总结
主要是分析 AQS 源码的实现,了解到所有的同步类都是实现自定义的同步器 sync
,实现独占方法或者共享方法中的获取资源和释放资源方法供自己使用,同步器只要关注资源变量 state 的变化,对使用者非常友好,层次分明,而不需要关注队列和线程的阻塞的情况。