当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率。
参考http://www.cnblogs.com/dolphin0520/p/3932921.html
由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误。
API : jdk1.8.0_144
ThreadPoolExecutor类
Java中线程池主要是并发包java.util.concurrent
中 ThreadPoolExecutor
这个类实现的。
构造函数
我们直接调用它的时候,使用的是它的构造函数,它有四个构造函数:
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
ThreadPoolExecutor
继承了AbstractExecutorService
抽象类,并提供了四个构造器,事实上,前面三个构造器都是调用的第四个构造器进行的初始化工作。所以主要研究下第四个构造器的方法。
首先了解下构造器中参数的意思:
corePoolSize
: 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;maximumPoolSize
: 线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;keepAliveTime
:字面意思就是心跳时间,就是这个线程池中的线程数量大于corePoolSize
的时候开始计时,设置空闲线程最多能存活多长时间。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0,它的单位是参数TimeUnit unit
;unit
: 参数keepAliveTime
的时间单位,有7种取值,在TimeUnit类中有7种静态属性:1
2
3
4
5
6
7TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒workQueue
:一个阻塞队列BlockingQueue
,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择,以后再详细学习BlockingQueue
阻塞队列使用:1
2
3
4
5ArrayBlockingQueue; // 基于数组的阻塞队列实现
LinkedBlockingQueue; // 基于链表的阻塞队列
SynchronousQueue; //一种无缓冲的等待队列
DelayQueue; // 队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
PriorityBlockingQueue // 基于优先级的阻塞队列threadFactory
: 线程工厂,主要用来创建线程;handler
: 表示当拒绝处理任务时的策略,有以下四种取值:1
2
3
4ThreadPoolExecutor.AbortPolicy //丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy //也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy //丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy //由调用线程处理该任务
ThreadPoolExecutor方法
首先ThreadPoolExecutor
类自己拥有很多方法,用来获取线程池的相关属性。
1 |
ThreadPoolExecutor
继承了AbstractExecutorService
这个抽象类,
1 | public abstract class AbstractExecutorService implements ExecutorService{ |
AbstractExecutorService
实现了接口 ExecutorService
中所有的方法。
1 | public interface ExecutorService extends Executor { |
ExecutorService
接口继承了 Executor
接口。
1 | public interface Executor { |
可以看出类ThreadPoolExecutor
拥有了多少方法。
平时开发中主要使用方法:
1 | execute() // 线程池启动一个线程 |
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
shutdown()和shutdownNow()是用来关闭线程池的。
线程池的实现
线程池的状态
1 | * The runState provides the main lifecycle control, taking on values: |
根据上面的代码文档,,可以清楚的了解到线程池的各种状态,以及在这种状态中能做的事情,状态之间的转变。
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
1 |
|
ctl作为ThreadPoolExecutor的核心状态控制字段,包含来两个信息:
- 工作线程总数
workerCount
- 线程池状态
RUNNING
、SHUTDOWN
、STOP
、TIDYING
、TERMINATED
。
COUNT_BITS 是32减去3 就是29,下面的线程池状态就是-1 到 3 分别向左移动29位。
如此,int的右侧29位,代表着线程数量,总数可以达到2的29次,29位后的3位代表线程池的状态
这样,线程池增加一个线程,只需吧ctl加1即可,而我们也发现实际这个线程池的最高线程数量是2的29次减1。并不是先前我们现象的2的32次减1。这个作者在注释中也提到了,说如果后续需要增大这个值, 可以吧ctl定义成AtomicLong。
任务的执行excute
属性变量
了解ThreadPoolExecutor
类中其他的一些比较重要成员变量:
1 |
|
- largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
- 线程池线程一般正常工作的时候最大线程数为corePoolSize,当任务数量大于corePoolSize的时候,任务就进入等待的队列中,不继续增加线程;当等待队列也放满的时候,不能再往里面装任务的时候,这个时候就需要重新开辟新的线程,来工作了,并且数量要小于
maximumPoolSize
;如果大于maximumPoolSize,就调用handler方法。
执行任务 execute
使用AbstractExecuorService
中的submit 方法,可以执行新的进程,当然submit,最终执行的是execute方法,在ThreadPoolExecutor
类中实现了excute方法;
重点研究exexute 方法的实现,这个有点难,网上介绍1.6里面的源码中execute方法已经和我这个1.8版本有很大出入了,大致上应该没有偏离:
1 |
|
执行流程就是:
- 判断提交的任务command是否为null,若是null,则抛出空指针异常;
- 第二步 ct1.get();用这个
workerCountOf( ct1.get())
计算线程池已经使用多少线程; - 当使用的线程数小于核心线程数(corePoolSize),进入addWorker 方法中,这里就是开始进程的地方,进入到最重要的地方,为了这一步不要跳得太远,还是接着看execute方法,后面再看addWorker方法;
- 当使用的线程数不小于核心线程数(corePoolSize),新来得任务就要进入等待执行的状态;
if (isRunning(c) && workQueue.offer(command))
检查线程是否在running 状态和任务是否能够成功进入等待排队
;
4.1. 进入队列后,重新检查任务,如果线程池状态不是running状态, ,将回滚任务,拒绝执行任务,这样做主要是因为任务如果还在缓存队列等待的过程中,线程池中断了,就回滚任务,为了安全。
4.2. 如果线程中的线程数为0 了,创建一个空线程。 - 当使用的线程数不小于核心线程数(corePoolSize)的时候,并且添加进入到缓存队列失败后,就会执行
else if (!addWorker(command, false))reject(command);
这段代码,意思就是直接开辟一个新的线程去行这个任务,如果执行失败,拒绝策略进行处理这个任务,当然,如果当前线程池中的线程数目达到maximumPoolSize
,addWorker方法中也会采取任务拒绝策略进行处理。
addWorker 创建线程
下面将是阅读addWorker
的源码,研究线程池怎么添加一个任务的。
1 | /** |
看代码注释知道了第二个参数core
的意义,当它为true
的时候 使用的是线程核心数中的线程,当它为false
的时候,使用的是数量是maximumPoolSize,就是当缓存中的队列也排满的时候。
因此,调用这个 addWorker方法有4种传参的方式:
1 | addWorker(command, true); |
- 第一个:线程数小于corePoolSize时,放一个需要处理的task进worker set。如果worker set长度超过corePoolSize,就返回false。
- 第二个:当队列被放满时,就尝试将这个新来的task直接放入worker set,而此时worker set 的长度限制是maximumPoolSize。如果线程池也满了的话就返回false。
- 第三个:放入一个空的task进set,比较的的长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会判断出后去任务队列里拿任务,这样就相当于世创建了一个新的线程,只是没有马上分配任务。
- 第四个:这个方法就是放一个null的task进set,而且是在小于corePoolSize时。实际使用中是在 prestartCoreThread() 方法。这个方法用来为线程池先启动一个worker等待在那边,如果此时set中的数量已经达到corePoolSize那就返回false,什么也不干。还有是
prestartAllCoreThreads()
方法,准备corePoolSize个worker,初始化线程池中的线程。
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:1
2prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
前面代码的意思就是验证线程池的状态是不是在RUNNING
状态,并且判断,线程数是不是超过了maximumPoolSize
,如果超过了最大线程数量,直接返回false,就回到execute 方法最后个if else()
代码块中,拒绝任务。
Worker 中主要实现
Worker
这个类很简单,只是继承了一个Runnable
接口,然后在run()
方法中去执行我们传入的firstTask
主要是其中的run 方法,它的run方法调用的是runWorker
:
1 | final void runWorker(Worker w) { |
注意当没有可执行的任务的时候,执行getTask()
方法:
1 | private Runnable getTask() { |
这个时候看到了,它原来去缓存队列中去取任务,来执行。
并且下面代码块做的任务,作者已经给出注释了
1 | // Recheck while holding lock. |
很容易理解了这段代码。
怎么样开启线程池,并且添加一个任务就到此结束了。
任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
1 | ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 |
任务缓存队列及排队策略
workQueue,任务缓存队列,用来存放等待执行的任务;
一个阻塞队列BlockingQueue
,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
1 | ArrayBlockingQueue; // 基于数组的阻塞队列实现,此队列创建时必须指定大小; |
线程池关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务;
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
创建线程池并且使用
1 | package com.wuwii.test; |
执行结果:
1 | 正在执行task 0 |
从上面的结果可以看出来,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。
例外创建线程的时候建议使用的时Executors
类提供的方法来创建线程池:
1 | Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE |
配置线程池的大小
一般需要根据任务的类型来配置线程池大小:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1;
如果是IO密集型任务,参考值可以设置为2*NCPU。
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
总结
- 当一个task被安排进来的时候,再确定不是空值后,直接判断在池中已经有工作的线程是否小于corePoolSize,小于则增加一个线程来负责这个task。
- 如果池中已经工作的线程大于等于corePoolSize,就向队列里存task,而不是继续增加线程。
- 当workQueue.offer失败时,也就是说task不能再向队列里放的时候,而此时工作线程大于等于corePoolSize,那么新进的task,就要新开一个线程来接待了。
- 线程池工作机制是这样:
a.如果正在运行的线程数小于corePoolSize
,那就马上创建线程并运行这个任务,而不会进行排队。
b. 如果正在运行的线程数不小于corePoolSize
,那就把这个任务放入队列。
c. 如果队列满了,并且正在运行的线程数小于maximumPoolSize
,那么还是要创建线程并运行这个任务。
d.如果队列满了,并且正在运行的线程数不小于maximumPoolSize
,那么线程池就会调用handler里方法。(采用LinkedBlockingDeque
就不会出现队列满情况)。 - 使用线程池的时候,需要注意先分配好线程池的大小,大约每个线程占用10M内存,就是空间换时间,如果控制的不好,会存在内存溢出的问题,导致机器宕机。