JAVA从入门到精通 线程池执行原理分析
小职 2018-02-27 来源 :网络 阅读 694 评论 0

摘要:本文将会围绕JAVA从入门到精通之线程池的生命周期,分析线程池执行任务的过程。

本文将会围绕JAVA从入门到精通之线程池的生命周期,分析线程池执行任务的过程。


线程池状态

首先认识两个贯穿线程池代码的参数:

runState:线程池运行状态

workerCount:工作线程的数量

线程池用一个32位的int来同时保存runState和workerCount,其中高3位是runState,其余29位是workerCount。代码中会反复使用runStateOf和workerCountOf来获取runState和workerCount。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

// ctl操作private static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }

RUNNING:可接收新任务,可执行等待队列里的任务

SHUTDOWN:不可接收新任务,可执行等待队列里的任务

STOP:不可接收新任务,不可执行等待队列里的任务,并且尝试终止所有在运行任务

TIDYING:所有任务已经终止,执行terminated()

TERMINATED:terminated()执行完成

线程池状态默认从RUNNING开始流转,到状态TERMINATED结束,中间不需要经过每一种状态,但不能让状态回退。下面是状态变化可能的路径和变化条件:

JAVA从入门到精通 线程池执行原理分析

图1 线程池状态变化路径


Worker的创建

线程池是由Worker类负责执行任务,Worker继承了AbstractQueuedSynchronizer,引出了Java并发框架的核心AQS。

AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。

AQS不会在这里展开讨论,只需要知道Worker包装了Thread,由它去执行任务。

调用execute将会根据线程池的情况创建Worker,可以归纳出下图四种情况:

JAVA从入门到精通 线程池执行原理分析

图2 worker在线程池里的四种可能


public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();


    int c = ctl.get();

    //1

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    //2

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            //3

            reject(command);

        else if (workerCountOf(recheck) == 0)

            //4

            addWorker(null, false);

    }

    //5

    else if (!addWorker(command, false))

        //6

        reject(command);

}

标记1对应第一种情况,要留意addWorker传入了core,core=true为corePoolSize,core=false为maximumPoolSize,新增时需要检查workerCount是否超过允许的最大值。

标记2对应第二种情况,检查线程池是否在运行,并且将任务加入等待队列。标记3再检查一次线程池状态,如果线程池忽然处于非运行状态,那就将等待队列刚加的任务删掉,再交给RejectedExecutionHandler处理。标记4发现没有worker,就先补充一个空任务的worker。

标记5对应第三种情况,等待队列不能再添加任务了,调用addWorker添加一个去处理。

标记6对应第四种情况,addWorker的core传入false,返回调用失败,代表workerCount已经超出maximumPoolSize,那就交给RejectedExecutionHandler处理。

private boolean addWorker(Runnable firstTask, boolean core) {

        //1

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);


            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                   firstTask == null &&

                   ! workQueue.isEmpty()))

                return false;


            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        }

        //2

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.

                    int rs = runStateOf(ctl.get());


                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

标记1的第一段代码,目的很简单,是为workerCount加一。至于为什么代码写了这么长,是因为线程池的状态在不断变化,并发环境下需要保证变量的同步性。外循环判断线程池状态、任务非空和队列非空,内循环使用CAS机制保证workerCount正确地递增。不了解CAS可以看认识非阻塞的同步机制CAS,后续增减workerCount都会使用CAS。

标记2的第二段代码,就比较简单。创建一个新Worker对象,将Worker添加进workers里(Set集合)。成功添加后,启动worker里的线程。在finally里判断线程是否启动成功,不成功直接调用addWorkerFailed。

private void addWorkerFailed(Worker w) {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (w != null)

                workers.remove(w);

            decrementWorkerCount();

            tryTerminate();

        } finally {

            mainLock.unlock();

        }

    }

addWorkerFailed将减少已经递增的workerCount,并且调用tryTerminate结束线程池。


Worker的执行

Worker(Runnable firstTask) {

    setState(-1); // inhibit interrupts until runWorker

    this.firstTask = firstTask;

    this.thread = getThreadFactory().newThread(this);

}

public void run() {

    runWorker(this);

}

Worker在构造函数里采用ThreadFactory创建Thread,在run方法里调用了runWorker,看来是真正执行任务的地方。

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

       //1

        while (task != null || (task = getTask()) != null) {

            w.lock();

           //2

            if ((runStateAtLeast(ctl.get(), STOP) ||

                 (Thread.interrupted() &&

                  runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

               //3

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                 //4

                w.completedTasks++;

                w.unlock();

            }

        }

        completedAbruptly = false;       //5

    } finally {

        //6

        processWorkerExit(w, completedAbruptly);

    }

}

标记1进入循环,从getTask获取要执行的任务,直到返回null。这里达到了线程复用的效果,让线程处理多个任务。

标记2是一个比较复杂的判断,保证了线程池在STOP状态下线程是中断的,非STOP状态下线程没有被中断。如果你不了解Java的中断机制,看如何正确结束Java线程这篇。

标记3调用了run方法,真正执行了任务。执行前后提供了beforeExecute和afterExecute两个方法,由子类实现。

标记4里的completedTasks统计worker执行了多少任务,最后累加进completedTaskCount变量,可以调用相应方法返回一些统计信息。

标记5的变量completedAbruptly表示worker是否异常终止,执行到这里代表执行正常,后续的方法需要这个变量。

标记6调用processWorkerExit结束,后面会分析。

接着来看worker从等待队列获取任务的getTask方法:

private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?


    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);


        //1

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }


        int wc = workerCountOf(c);

        //2

        // Are workers subject to culling?

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }

       //3

        try {

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}

标记1检查线程池的状态,这里就体现出SHUTDOWN和STOP的区别。如果线程池是SHUTDOWN状态,还会先处理完等待队列的任务;如果是STOP状态,就不再处理等待队列里的任务了。

标记2先看allowCoreThreadTimeOut这个变量,false时worker空闲,也不会结束;true时,如果worker空闲超过keepAliveTime,就会结束。接着是一个很复杂的判断,好难转成文字描述,自己看吧。注意一下wc>maximumPoolSize,出现这种可能是在运行中调用setMaximumPoolSize,还有wc>1,在等待队列非空时,至少保留一个worker。

标记3是从等待队列取任务的逻辑,根据timed分为等待keepAliveTime或者阻塞直到有任务。

最后来看结束worker需要执行的操作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {

   //1

    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

        decrementWorkerCount();


  //2

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        completedTaskCount += w.completedTasks;

        workers.remove(w);

    } finally {

        mainLock.unlock();

    }


   //3

    tryTerminate();


    int c = ctl.get();

    //4

    if (runStateLessThan(c, STOP)) {

        if (!completedAbruptly) {

            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            if (min == 0 && ! workQueue.isEmpty())

                min = 1;

            if (workerCountOf(c) >= min)

                return; // replacement not needed

        }

        addWorker(null, false);

    }

}

正常情况下,在getTask里就会将workerCount减一。标记1处用变量completedAbruptly判断worker是否异常退出,如果是,需要补充对workerCount的减一。

标记2将worker处理任务的数量累加到总数,并且在集合workers中去除。

标记3尝试终止线程池,后续会研究。

标记4处理线程池还是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker。如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。

总结一下worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。


线程池的关闭

线程池的关闭不是一关了事,worker在池里处于不同状态,必须安排好worker的”后事”,才能真正释放线程池。ThreadPoolExecutor提供两种方法关闭线程池:

shutdown:不能再提交任务,已经提交的任务可继续运行;

shutdownNow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。

public void shutdown() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();   //1 安全策略机制

        advanceRunState(SHUTDOWN);   //2

        interruptIdleWorkers();   //3

        onShutdown(); //4 空方法,子类实现

    } finally {

        mainLock.unlock();

    }

    tryTerminate();   //5

}

shutdown将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。

public List<Runnable> shutdownNow() {

    List<Runnable> tasks;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();

        advanceRunState(STOP);

        interruptWorkers();

        tasks = drainQueue();  //1

    } finally {

        mainLock.unlock();

    }

    tryTerminate();

    return tasks;

}

shutdownNow和shutdown类似,将线程池切换为STOP状态,中断目标是所有worker。drainQueue会将等待队列里未执行的任务返回。

interruptIdleWorkers和interruptWorkers实现原理都是遍历workers集合,中断条件符合的worker。

上面的代码多次出现调用tryTerminate,这是一个尝试将线程池切换到TERMINATED状态的方法。

final void tryTerminate() {

    for (;;) {

        int c = ctl.get();

        //1

        if (isRunning(c) ||

            runStateAtLeast(c, TIDYING) ||

            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

            return;

        //2

        if (workerCountOf(c) != 0) { // Eligible to terminate

            interruptIdleWorkers(ONLY_ONE);

            return;

        }

       //3

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

                try {

                    terminated();

                } finally {

                    ctl.set(ctlOf(TERMINATED, 0));

                    termination.signalAll();

                }

                return;

            }

        } finally {

            mainLock.unlock();

        }

        // else retry on failed CAS

    }

}

标记1检查线程池状态,下面几种情况,后续操作都没有必要,直接return。

RUNNING(还在运行,不能停)

TIDYING或TERMINATED(已经没有在运行的worker)

SHUTDOWN并且等待队列非空(执行完才能停)

标记2在worker非空的情况下又调用了interruptIdleWorkers,你可能疑惑在shutdown时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?你需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。

标记3是最终的状态切换。线程池会先进入TIDYING状态,再进入TERMINATED状态,中间提供了terminated这个空方法供子类实现。

调用关闭线程池方法后,需要等待线程池切换到TERMINATED状态。awaitTermination检查限定时间内线程池是否进入TERMINATED状态,代码如下:

public boolean awaitTermination(long timeout, TimeUnit unit)

    throws InterruptedException {

    long nanos = unit.toNanos(timeout);

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (;;) {

            if (runStateAtLeast(ctl.get(), TERMINATED))

                return true;

            if (nanos <= 0)

                return false;

            nanos = termination.awaitNanos(nanos);

        }

    } finally {

        mainLock.unlock();

    }

}


后言

以上过了一遍线程池主要的逻辑,总体来看线程池的设计是很清晰的。如有错误或不足,欢迎指出,也欢迎留言交流。今次介绍了线程池运行的生命周期,下篇会研究更细粒度地控制任务的生命周期,也就是submit和Future。


至此,关于Java线程池执行原理分析讲解完毕,欢迎大家继续关注!更多关于Java线程池执行原理分析的内容请关注职坐标Java频道!

本文由 @小职 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程