Java语言 线程池:ThreadPoolExecutor 的深入理解
小标 2018-11-05 来源 : 阅读 737 评论 0

摘要:本文主要向大家介绍了Java语言 线程池:ThreadPoolExecutor 的深入理解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了Java语言 线程池:ThreadPoolExecutor 的深入理解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

线程池介绍


在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:


如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。


那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?


这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。


什么时候使用线程池?


单个任务处理时间比较短


需要处理的任务数量很大


使用线程池的好处


引用自ifeve.com/java-thread…的说明:


降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。


提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。


提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。


Java中的线程池是用ThreadPoolExecutor类来实现的. 本文就结合JDK 1.8对该类的源码来分析一下这个类内部对于线程的创建, 管理以及后台任务的调度等方面的执行原理。


先看一下线程池的类图:



Executor框架接口


Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。


J.U.C中有三个Executor接口:


Executor:一个运行新任务的简单接口;


ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;


ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。


Executor接口


public interface Executor {

 void execute(Runnable command);

}

   


Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法。例如,使用Thread来创建并启动线程的代码如下:


Thread t = new Thread();

t.start();

   


使用Executor来启动线程执行任务的代码如下:


Thread t = new Thread();

executor.execute(t);

   


对于不同的Executor实现,execute()方法可能是创建一个新线程并立即启动,也有可能是使用已有的工作线程来运行传入的任务,也可能是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入阻塞队列中或者拒绝接收传入的线程。


ExecutorService接口


ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。


ScheduledExecutorService接口


ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。


ThreadPoolExecutor分析


ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。


几个重要的字段


   

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY= (1 << COUNT_BITS) - 1;

 

// runState is stored in the high-order bits

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN=  0 << COUNT_BITS;

private static final int STOP =  1 << COUNT_BITS;

private static final int TIDYING =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;

   


ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。


下面再介绍下线程池的运行状态. 线程池一共有五种状态, 分别是:


RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;


SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);


STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;


TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。


TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。


进入TERMINATED的条件如下:


线程池不是RUNNING状态;


线程池状态不是TIDYING状态或TERMINATED状态;


如果线程池状态是SHUTDOWN并且workerQueue为空;


workerCount为0;


设置TIDYING状态成功。


下图为线程池的状态转换过程:


ctl相关方法


这里还有几个对ctl进行计算的方法:


   

private static int runStateOf(int c)  { return c & ~CAPACITY; }

private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

   


runStateOf:获取运行状态;


workerCountOf:获取活动线程数;


ctlOf:获取运行状态和活动线程数的值。


ThreadPoolExecutor构造方法


public ThreadPoolExecutor(int corePoolSize,

  int maximumPoolSize,

  long keepAliveTime,

  TimeUnit unit,

  BlockingQueue<runnable> workQueue,

  ThreadFactory threadFactory,

  RejectedExecutionHandler handler) {

 if (corePoolSize < 0 ||

  maximumPoolSize <= 0 ||

  maximumPoolSize < corePoolSize ||

  keepAliveTime < 0)

  throw new IllegalArgumentException();

 if (workQueue == null || threadFactory == null || handler == null)

  throw new NullPointerException();

 this.corePoolSize = corePoolSize;

 this.maximumPoolSize = maximumPoolSize;

 this.workQueue = workQueue;

 this.keepAliveTime = unit.toNanos(keepAliveTime);

 this.threadFactory = threadFactory;

 this.handler = handler;

}</runnable>

   


构造方法中的字段含义如下:


corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:


如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;


如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;


如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;


如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;


所以,任务提交时,判断的顺序为 corePoolSize --> workQueue --> maximumPoolSize。


maximumPoolSize:最大线程数量;


workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;


workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:


直接切换:这种方式常用的队列是SynchronousQueue,但现在还没有研究过该队列,这里暂时还没法介绍;


使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。


使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。


如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。


如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。


如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。


keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;


threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。


handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:


AbortPolicy:直接抛出异常,这是默认策略;


CallerRunsPolicy:用调用者所在的线程来执行任务;


DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;


DiscardPolicy:直接丢弃任务;


execute方法


execute()方法用来提交任务,代码如下:


public void execute(Runnable command) {

 if (command == null)

  throw new NullPointerException();

 /*

  * clt记录着runState和workerCount

  */

 int c = ctl.get();

 /*

  * workerCountOf方法取出低29位的值,表示当前活动的线程数;

  * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;

  * 并把任务添加到该线程中。

  */

 if (workerCountOf(c) < corePoolSize) {

  /*

* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;

* 如果为true,根据corePoolSize来判断;

* 如果为false,则根据maximumPoolSize来判断

*/

  if (addWorker(command, true))

return;

  /*

* 如果添加失败,则重新获取ctl值

*/

  c = ctl.get();

 }

 /*

  * 如果当前线程池是运行状态并且任务添加到队列成功

  */

 if (isRunning(c) && workQueue.offer(command)) {

  // 重新获取ctl值

  int recheck = ctl.get();

  // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,

  // 这时需要移除该command

  // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回

  if (! isRunning(recheck) && remove(command))

reject(command);

  /*

* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法

* 这里传入的参数表示:

* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;

* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;

* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。

*/

  else if (workerCountOf(recheck) == 0)

addWorker(null, false);

 }

 /*

  * 如果执行到这里,有两种情况:

  * 1. 线程池已经不是RUNNING状态;

  * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。

  * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;

  * 如果失败则拒绝该任务

  */

 else if (!addWorker(command, false))

  reject(command);

}

   


简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:


如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;


如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;


如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;


如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。


这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。


execute方法执行流程如下:


addWorker方法


addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:


<div class="line number41 index        

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!


本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程