博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ThreadPoolExecutor源码分析
阅读量:6964 次
发布时间:2019-06-27

本文共 23012 字,大约阅读时间需要 76 分钟。

由于工作需要,需要进行并行查询数据库,返回结果后进行计算,也就是说各个线程要全部运行完毕,才能进行下一步的计算,这时候要用到CountDownLatch。

先上代码,实现的比较简单,但是这不是重点,哈哈哈,主要是分析Executor相关的原理。
线程池相关参数及含义等可参考下面这篇文章,内部也有部分源码分析,但是个人认为不够详尽。

package com.gome.mars.utils;import java.util.concurrent.*;/** * @ClassName * @Description TODO * @Author oo * @Date 2018/12/10 18:00 * @Version 1.0 **/public class SimpleParallelTaskExecutor {    private CountDownLatch countDownLatch;    // 此处实现了固定大小的线程池,可根据需要进行其他实现,每次不再新建线程池实例    private static ExecutorService executor=Executors.newFixedThreadPool(20);    //构造方法参数为并行线程的数量,并且每次new CountDownLatch对象,因为不可重复使用    public SimpleParallelTaskExecutor(Integer nThreads) {        this.countDownLatch = new CountDownLatch(nThreads);    }    //调用此方法向线程池中添加任务,此处对Callable进行了简单包装,为了执行完任务调用countDownLatch.countDown();    public 
Future
addTask(Callable
task) throws Exception { return executor.submit(new WrapperThread
(task,countDownLatch)); } //可设置超时时间,检查任务是否运行完毕 public boolean checkDone(long milliseconds) throws InterruptedException { return countDownLatch.await(milliseconds, TimeUnit.MILLISECONDS); } //Callable包装类,为了执行完任务调用countDownLatch.countDown(); public class WrapperThread
implements Callable
{ private Callable
callable; private CountDownLatch countDownLatch; public WrapperThread(Callable callable, CountDownLatch countDownLatch) { this.callable = callable; this.countDownLatch = countDownLatch; } @Override public V call() throws Exception { //此处直接调用callable.call();和直接调用thread.run()类似,没有起新的线程此处和加入的任务内部是同一个线程。 V call = callable.call(); this.countDownLatch.countDown(); return call; } } public static void main(String[] args) throws Exception { SimpleParallelTaskExecutor simpleParallelTaskExecutor = new SimpleParallelTaskExecutor(2); Future
integerFuture = simpleParallelTaskExecutor.addTask(() -> { //此处模拟执行数据查询等任务 Thread.sleep(2000); return 1; }); Future
integerFuture1 = simpleParallelTaskExecutor.addTask(() -> { Thread.sleep(1000); return 2; }); simpleParallelTaskExecutor.checkDone(3000); Integer integer = integerFuture.get(); Integer integer1 = integerFuture1.get(); System.out.println(integer); System.out.println(integer1); }}复制代码

我们在addTask方法处打断点,进入submit方法,可以看出我们进入的是AbstractExecutorService类的submit方法,此方法接受一个callable对象,返回Future对象,我们可以在Future中获取执行结果。

/**     * @throws RejectedExecutionException {@inheritDoc}     * @throws NullPointerException       {@inheritDoc}     */    public 
Future
submit(Callable
task) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future
submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public
Future
submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture
ftask = newTaskFor(task, result); execute(ftask); return ftask; }复制代码

submit方法有几个重载方法,都是通过NewTaskFor方法,将任务包装成一个RunnableFuture对象,只不过Runnable没有返回结果,结果类型为null。

下面我们看看NewTaskFor都做了些什么事情。

/**     * Returns a {@code RunnableFuture} for the given runnable and default     * value.     *     * @param runnable the runnable task being wrapped     * @param value the default value for the returned future     * @param 
the type of the given value * @return a {@code RunnableFuture} which, when run, will run the * underlying runnable and which, as a {@code Future}, will yield * the given value as its result and provide for cancellation of * the underlying task * @since 1.6 */ protected
RunnableFuture
newTaskFor(Runnable runnable, T value) { return new FutureTask
(runnable, value); } /** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param
the type of the callable's result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable's result as its result and provide for * cancellation of the underlying task * @since 1.6 */ protected
RunnableFuture
newTaskFor(Callable
callable) { return new FutureTask
(callable); }复制代码

可以看出,直接创建了FutureTask对象,并且返回。所以,加入线程池的任务都被包装成FutureTask对象,没有返回值的返回值为空。

下面主要看execute(ftask);方法
ThreadPoolExecutor中的execute方法。 用一个32位数的高3位表示线程池状态,低29位表示正在运行的线程数量

public class ThreadPoolExecutor extends AbstractExecutorService {    //初始状态为Running状态且运行线程数为0,所以是-1和0按位取或    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    //32-3为29    private static final int COUNT_BITS = Integer.SIZE - 3;    //1左移29位再减1,低29位全为1,高位位0;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;    // Packing and unpacking ctl    //低29位取反都为0,高三位都为1,再和c进行按位与,只留下高三位,从而获取线程池状态    private static int runStateOf(int c)     { return c & ~CAPACITY; }    //获取工作线程数量,与上面类似,取得低29位。    private static int workerCountOf(int c)  { return c & CAPACITY; }    private static int ctlOf(int rs, int wc) { return rs | wc; }}复制代码

在程序运行中,反复使用了这几个方法,用来获取工作线程数或线程池状态

/**     * Executes the given task sometime in the future.  The task     * may execute in a new thread or in an existing pooled thread.     *     * If the task cannot be submitted for execution, either because this     * executor has been shutdown or because its capacity has been reached,     * the task is handled by the current {@code RejectedExecutionHandler}.     *     * @param command the task to execute     * @throws RejectedExecutionException at discretion of     *         {@code RejectedExecutionHandler}, if the task     *         cannot be accepted for execution     * @throws NullPointerException if {@code command} is null     */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        //获取c的值,判断工作线程数是否小于设定的核心线程数        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            //如果小于核心线程数,直接新建线程,新建成功则返回            if (addWorker(command, true))                return;            c = ctl.get();        }        //如果上变判断不成立,那么工作线程数量大于等于核心线程数,        //或者新建线程没有成功,那么如果线程池接受新任务,则往队列里添加一个任务        //如果线程池中工作线程数已经达到了核心线程数,并且没有设置空闲销毁时间        //新任务到来就只是向阻塞队列里增加一条新任务,线程池是如何拿到这个任务并执行的呢?        if (isRunning(c) && workQueue.offer(command)) {            //再次检查,防止并发状态下,线程池状态等有变化            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }复制代码

重点方法是addWorker方法下面进一步分析

/**     * Checks if a new worker can be added with respect to current     * pool state and the given bound (either core or maximum). If so,     * the worker count is adjusted accordingly, and, if possible, a     * new worker is created and started, running firstTask as its     * first task. This method returns false if the pool is stopped or     * eligible to shut down. It also returns false if the thread     * factory fails to create a thread when asked.  If the thread     * creation fails, either due to the thread factory returning     * null, or due to an exception (typically OutOfMemoryError in     * Thread.start()), we roll back cleanly.     *     * @param firstTask the task the new thread should run first (or     * null if none). Workers are created with an initial first task     * (in method execute()) to bypass queuing when there are fewer     * than corePoolSize threads (in which case we always start one),     * or when the queue is full (in which case we must bypass queue).     * Initially idle threads are usually created via     * prestartCoreThread or to replace other dying workers.     *     * @param core if true use corePoolSize as bound, else     * maximumPoolSize. (A boolean indicator is used here rather than a     * value to ensure reads of fresh values after checking other pool     * state).     * @return true if successful     */    //第二个参数表示是否是核心线程,比较工作线程数目时,分别和corePoolSize 或者maximumPoolSize进行比较    private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            //此处为何还要有第二个判断?            //我们分析一下当rs==SHUTDOWN 时,什么情况会返回false            //1.线程池shutdown, 此时firstTask为null 并且workQueue为空时            //2.线程池shutdown,firstTask不为null这时 workQueue状态已经没有用了            //SHUTDOWN状态虽然不接受新任务,但是队列里的任务会执行完,            //也就是说当线程池为SHUTDOWN时,为了执行完队列中的任务,            //会不断添加firstTask为null的任务,firstTask为null代表要取队列中的任务            //第一种情况表示队列中的任务已经清空了,无需再循环了,线程池可能将要进入stop状态了            //第二种情况表示新任务到来,线程池已经不再接受了,所以返回false            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            //内层循环主要工作就是cas为增加一个工作线程            for (;;) {                int wc = workerCountOf(c);                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                //增加成功就退出外层循环                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                //线程池状态状态有变化就继续执行外层循环                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            //包含一个Thread对象,传入的是Worker对象本身,后边有详细解释。            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    //正常运行状态或者队列中还有未执行的任务                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    //如果线程添加成功,就启动线程,执行任务。实际是执行runWorker方法。                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }复制代码

Worker实现了Runnable接口,继承AbstractQueuedSynchronizer类,所以本身就是一个线程类,有自己的run方法。

/**     * Class Worker mainly maintains interrupt control state for     * threads running tasks, along with other minor bookkeeping.     * This class opportunistically extends AbstractQueuedSynchronizer     * to simplify acquiring and releasing a lock surrounding each     * task execution.  This protects against interrupts that are     * intended to wake up a worker thread waiting for a task from     * instead interrupting a task being run.  We implement a simple     * non-reentrant mutual exclusion lock rather than use     * ReentrantLock because we do not want worker tasks to be able to     * reacquire the lock when they invoke pool control methods like     * setCorePoolSize.  Additionally, to suppress interrupts until     * the thread actually starts running tasks, we initialize lock     * state to a negative value, and clear it upon start (in     * runWorker).     */    private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            //新建Thread对象,传入自身            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        //当调用Thread.start()时,新线程启动,调用runWorker方法,传入自身        public void run() {            runWorker(this);        }        // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.        protected boolean isHeldExclusively() {            return getState() != 0;        }        protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }复制代码

runWorker方法是真正执行用户传进来的任务的地方,并且可以重写beforeExecute以及afterExecute方法,再任务执行前后加入自定义操作。

/**     * Main worker run loop.  Repeatedly gets tasks from queue and     * executes them, while coping with a number of issues:     *     * 1. We may start out with an initial task, in which case we     * don't need to get the first one. Otherwise, as long as pool is     * running, we get tasks from getTask. If it returns null then the     * worker exits due to changed pool state or configuration     * parameters.  Other exits result from exception throws in     * external code, in which case completedAbruptly holds, which     * usually leads processWorkerExit to replace this thread.     *     * 2. Before running any task, the lock is acquired to prevent     * other pool interrupts while the task is executing, and then we     * ensure that unless pool is stopping, this thread does not have     * its interrupt set.     *     * 3. Each task run is preceded by a call to beforeExecute, which     * might throw an exception, in which case we cause thread to die     * (breaking loop with completedAbruptly true) without processing     * the task.     *     * 4. Assuming beforeExecute completes normally, we run the task,     * gathering any of its thrown exceptions to send to afterExecute.     * We separately handle RuntimeException, Error (both of which the     * specs guarantee that we trap) and arbitrary Throwables.     * Because we cannot rethrow Throwables within Runnable.run, we     * wrap them within Errors on the way out (to the thread's     * UncaughtExceptionHandler).  Any thrown exception also     * conservatively causes thread to die.     *     * 5. After task.run completes, we call afterExecute, which may     * also throw an exception, which will also cause thread to     * die. According to JLS Sec 14.20, this exception is the one that     * will be in effect even if task.run throws.     *     * The net effect of the exception mechanics is that afterExecute     * and the thread's UncaughtExceptionHandler have as accurate     * information as we can provide about any problems encountered by     * user code.     *     * @param w the worker     */    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            //如果task!=null 就执行当前任务(工作线程数小于核心现场数),            //如果为null(队列中有任务)就在队列中获取一个任务,            //此处可以看出,如果队列中有任务,会一直while循环,直到队列为空,            //队列为空时,由于是阻塞队列,线程将阻塞在这里,直到又有任务添加进队列            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    //空方法                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        //真正执行我们的任务                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        //空方法                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            //此方法用于处理当前工作线程退出的相关事宜            processWorkerExit(w, completedAbruptly);        }    }    复制代码

当此工作线程退出以后,相关清理及记录工作,当程序抛出异常,或队列不为空而没有工作线程时或工作线程数少于核心线程数时,会继续addWorker(null, false);替换当前工作线程。

/**     * Performs cleanup and bookkeeping for a dying worker. Called     * only from worker threads. Unless completedAbruptly is set,     * assumes that workerCount has already been adjusted to account     * for exit.  This method removes thread from worker set, and     * possibly terminates the pool or replaces the worker if either     * it exited due to user task exception or if fewer than     * corePoolSize workers are running or queue is non-empty but     * there are no workers.     *     * @param w the worker     * @param completedAbruptly if the worker died due to user exception     */    private void processWorkerExit(Worker w, boolean completedAbruptly) {        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted            decrementWorkerCount();        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            completedTaskCount += w.completedTasks;            workers.remove(w);        } finally {            mainLock.unlock();        }        //一个工作线程退出后会尝试终止线程池,通过判断当前线程池的状态,如果终止成功则不会进入下边的if判断,否则进入if判断        tryTerminate();        int c = ctl.get();        //判断线程池是否是running或shutdown状态,再次判断completedAbruptly,        //这个变量表示是否被打断,正常执行完毕一般为false,如果满足是就继续判断是否继续addWorker        if (runStateLessThan(c, STOP)) {            if (!completedAbruptly) {                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;                if (min == 0 && ! workQueue.isEmpty())                    min = 1;                //1.如果设置了allowCoreThreadTimeOut 参数且队列为非空,则工作线程数为0的时候才addWorker                //如果设置了allowCoreThreadTimeOut 且队列为空,则直接返回,不addWorker                //如果没有设置allowCoreThreadTimeOut ,只要工作线程数小于核心线程数,都addWorker                if (workerCountOf(c) >= min)                    return; // replacement not needed            }            //关键这在这,加入一条firstTask为null的非核心线程任务,            addWorker(null, false);        }    }复制代码

总结

线程池执行过程:

  1. submit(Callable task)方法后,将task包装成一个FutureTask对象;
  2. 执行execute(ftask)方法
    if(工作线程数<核心线程数){
    addWorker(Runnable firstTask, boolean core)
    添加成功 return;
    }
    核心线程数已经达到最大
    if(线程池是running状态){
    向队列中添加一个任务
    workQueue.offer(command)
    }
  3. addWorker(Runnable firstTask, boolean core)
    自旋尝试改变workerCount数量
    compareAndIncrementWorkerCount(c)
    成功
    new Worker(firstTask);
    并启动线程
  4. runWorker(Worker w)
    while(如果当前Worker中task!=null 执行此任务
    否则从队列中task = getTask())
    调用task.run();
    队列为空时getTask()方法中根据参数设定判断何时返回null何时阻塞
  5. 当一个工作线程退出后执行processWorkerExit(w, completedAbruptly);方法中还会尝试终止线程池,如果线程池终止成功,则直接return
    否则判断是否继续addWorker(null, false)替换当前线程

over

有不正确的地方欢迎指正!

转载地址:http://dxwsl.baihongyu.com/

你可能感兴趣的文章
2017-10-07 前端日报
查看>>
全面降低windows系统的安全隐患(一)[Web安全大家谈]
查看>>
mysql 常用函数
查看>>
可变长参数列表误区与陷阱——va_arg不可接受的类型
查看>>
说说自己对nofollow标签的一些看法
查看>>
通过URL参数请求不同的后端服务器
查看>>
MySQL 可以用localhost 连接,但不能用IP连接的问题
查看>>
linux学习(之二)-初识linux的一些常用命令
查看>>
linux基础系统管理---系统管理
查看>>
C++学习笔记——类
查看>>
Linux命令之chkconfig
查看>>
JVMTI开发教程之一个简单的Agent
查看>>
Git学习笔记
查看>>
Developer Express 之 XtraReport报表预览控件PrintControl设置
查看>>
linux修复丢失的分区表
查看>>
iOS 开发遇到的问题
查看>>
单臂路由的实现
查看>>
还有人不认识通讯诈骗,短信验证码带你认识一下
查看>>
Docker(四)镜像创建
查看>>
unigui的UnimDatePicker控件使用经验
查看>>