java.util.concurrent
下提供了一些辅助类来帮助我们在并发编程的设计。
学习了 AQS 后再了解这些工具类,就非常简单了。
jdk 1.8
等待多线程完成的CountDownLatch
在 concurrent
包下面提供了 CountDownLatch
类,它提供了计数器的功能,能够实现让一个线程等待其他线程执行完毕才能进入运行状态。
源码分析
首先看下最关键的地方它的自定义同步器的实现,非常简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 1. 初始化 state 资源变量
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 尝试获取贡献模式下的资源,
// 定义返回值小于 0 的时候获取资源失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 自旋。
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1; // 每次释放资源,硬编码减一个资源
if (compareAndSetState(c, nextc))
return nextc == 0; // 知道为 0 的时候才释放成功,也就是所有线程必须都执行释放操作说明才释放成功。
}
}
}在这里查看构造器的源码得知,
CountDownLatch
内部使用的是 内部类Sync
继承了AQS
,将我们传入进来的count
数值当作 AQS state。感觉这个是不是和可重入锁实现是一样的,只不过开始指定了线程获取的锁的次数。在上面我也发现了几个特点,第一次看这个代码其实还是不好理解,因为它相对前面的 AQS 和 TwinsLock 就是一个反着设计的代码:
- 首先获取资源的时候,线程全部都是先进入等待队列,而且在这一步骤中,不改变 state 资源的数量;
- 释放资源的时候,每次固定减少一个资源,直到资源为 0 的时候才表示释放资源成功,所以加入我们有 5 个资源,但是只有四个线程执行,如果只释放四次(总共执行 countDown 四次),就永远也释放不成功,await 一直在阻塞。
- 经过上面的分析,发现了 state 的资源数量每次进行
countDown
都去减少一个,没有方法去增加数量,所以它是不可逆的,它的计数器是不可以重复使用的。
看下 await 的实现,发现它最终实现的是
doAcquireSharedInterruptibly
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// 仔细看这个代码,和前面的共享模式中的 doAcquireShared 方法基本一摸一样,只不过是当它遇到线程中断信号的时候,立刻抛出中断异常,仔细想想也是的,比如,自己在这里等别人吃饭,不想等了,也懒得管别人做什么了,剩下的吃饭的事情也没必要继续下去了。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 需要注意的是它重写了尝试获取资源的方法,当资源全部消耗完,才能够让你去获取资源,现在才豁然开朗,await 阻塞的线程就是这么被唤醒的。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
使用场景
CountDownLatch允许一个或多个线程等待其他线程完成操作。
比如经典问题:
有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?
这个问题关键就是要知道四条线程何时执行完。
下面是我的解决思路:
1 | /** |
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
源码分析
属性
1 | private final ReentrantLock lock = new ReentrantLock(); |
主要的方法
构造函数
1 | public CyclicBarrier(int parties) { |
await
1 | public int await() throws InterruptedException, BrokenBarrierException { |
使用场景
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景,然后四条线程又可以分别去干自己的事情了。
现在我将上面的统计磁盘的任务 CountDownLatch
中改下,统计完统计最终后,每个线程要发出退出信号。
下面是我的实现代码:
1 | public class CyclicBarrierDemo { |
执行结果:
1 | pool-1-thread-1 线程统计 C 盘大小 |
控制并发线程数的Semaphore
Semaphore
(信号量)是用来控制同时访问特定资源的线程数量(许可证数),它通过协调各个线程,以保证合理的使用公共资源。
源码分析
构造函数
1 | public Semaphore(int permits) { |
具有公平锁的特性,permits
指定许可数量,就是资源数量 state
。
同步器的实现
1 | abstract static class Sync extends AbstractQueuedSynchronizer { |
提供其他的方法
availablePermits
:获取此信号量中当前可用的许可证数(还能有多少个线程执行);drainPermits
:立刻使用完所有可用的许可证;reducePermits
:减少相应数量的许可证,是一个protected
方法;isFair
:是否是公平状态;hasQueuedThreads
:等待队列中是否有线程,等待获取许可证;getQueueLength
:等待队列中等待获取许可证的线程数量;getQueuedThreads
:protected
方法,获取等待队列中的线程。
使用场景
Semaphore
可以用于做流量控制,特别是公用资源有限的应用场景,比如我们有五台机器,有十名工人,每个工人需要一台机器才能工作,一名工人工作完了就可以休息了,机器让其他没工作过的工人使用。
下面是我的实现代码:
1 | public class SemaphoreDemo { |
执行一下结果:
1 | 工人 0 开始使用机器工作了 |
虽然上面有 10 个工人(线程)一起并发,但是,它同时只有五个工人能够是执行的。
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger 用于两个工作线程间的数据交换。
具体上来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据进入到第二个线程中,第二个线程的数据进入到第一个线程中,这要就完成了一个“交易”的环节。
源码分析
源码很难看懂,主要还是
【死磕Java并发】—–J.U.C之并发工具类:Exchanger
使用场景
Exchanger 可以用于遗传算法。遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据。
下面做一个卖书买书的例子:
1 | public class ExchangerDemo { |
执行结果:
1 | 饭饭要卖一本浮生六记。 |
总结
Exchanger
主要完成的是两个工作线程之间的数据交换,如果有一个线程没有执行 exchange()
方法,则会一直等待。还可以设置最大等待时间exchange(V v, TimeUnit unit)
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如
getNumberWaiting
方法可以获得CyclicBarrier
阻塞的线程数量。isBroken()
方法用来了解阻塞的线程是否被中断。
参考文章
- 《Java 并发编程的艺术》