打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
话.线程池实现原理

今儿同事问了一个关于多线程的问题。一般的线程,在调用其start()方法之后,会自动开始执行相应的逻辑,如果之后再次调用start(),会怎么样?是否可以???每个线程其实是有多个状态的,刚刚创建之后,其处于NEW状态,在执行完相应的run()逻辑之后,其处理TERMINATED状态。如果多次调用这个start()方法,是否可以让逻辑执行多次呢?答案肯定是不行的(一个线程执行完自己的任务之后就被销毁了),所以多次执行start()必然不靠谱,应该会报错。同事又问到:那jdk线程池又是如何重用的呢?

jdk内部的ThreadPoolExecutor的实现

线程池内部相当于在跑一个while循环,在不断的从阻塞队列里拿task,之后调用task.run(),此时相应Runnablerun方法就会被执行,执行完以后继续去拿,如此反复,整个线程其实一直没有执行完成,所以,其状态也不会变成TERMINATED,就此实现了线程的重用。

java.util.concurrent.ThreadPoolExecutor

/** * Main run loop */public void run() {    try {        Runnable task = firstTask;        firstTask = null;        while (task != null || (task = getTask()) != null) {            runTask(task);            task = null;        }    } finally {        workerDone(this);    }}

这个ThreadPoolExecutor内部的Worker即之前创建出来的线程,在这里不断的获取task,其runTask方法如下:

/** * Runs a single task between before/after methods. */private void runTask(Runnable task) {    final ReentrantLock runLock = this.runLock;    runLock.lock();    try {        /*         * Ensure that unless pool is stopping, this thread         * does not have its interrupt set. This requires a         * double-check of state in case the interrupt was         * cleared concurrently with a shutdownNow -- if so,         * the interrupt is re-enabled.         */        if (runState < STOP &&            Thread.interrupted() &&            runState >= STOP)            thread.interrupt();        /*         * Track execution state to ensure that afterExecute         * is called only if task completed or threw         * exception. Otherwise, the caught runtime exception         * will have been thrown by afterExecute itself, in         * which case we don't want to call it again.         */        boolean ran = false;        beforeExecute(thread, task);        try {            task.run();            ran = true;            afterExecute(task, null);            ++completedTasks;        } catch (RuntimeException ex) {            if (!ran)                afterExecute(task, ex);            throw ex;        }    } finally {        runLock.unlock();    }}

注意task.run()这句,即总结的,调用Runnablerun方法。

用一个小例子来分析下整个线程池的执行任务的过程:

public class ThreadPoolTest {    public static void main(String[] args) {        ExecutorService exec = Executors.newFixedThreadPool(1); //设置固定线程池大小为1        List<Runnable> list = new ArrayList<Runnable>();        for(int i=0;i<5;i++) list.add(new MyRunnable());        for(int i=0;i<5;i++) exec.execute(list.get(i));    }}class MyRunnable implements Runnable {    @Override    public void run() {        System.out.println("..."); //可以在此加断点DEUBG    }}

此时,exec会不断的向线程池中添加任务。

/** * Executes the given task sometime in the future.  The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current <tt>RejectedExecutionHandler</tt>. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted * for execution * @throws NullPointerException if command is null */public void execute(Runnable command) {    if (command == null) throw new NullPointerException();    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {        if (runState == RUNNING && workQueue.offer(command)) {            if (runState != RUNNING || poolSize == 0)                ensureQueuedTaskHandled(command);        } else if (!addIfUnderMaximumPoolSize(command)) {            reject(command); // is shutdown or saturated        }    }}

看到这个Runnable什么时候执行,是新创建线程执行,还是复用之前的线程,甚至是否需要拒绝请求。当前的线程池newFixedThreadPool(1)是个固定大小的池,此时poolSize == corePoolSize,所以此时的创建新线程的逻辑会返回false

/** * Creates and starts a new thread running firstTask as its first * task, only if fewer than corePoolSize threads are running * and the pool is not shut down. * @param firstTask the task the new thread should run first (or * null if none) * @return true if successful */private boolean addIfUnderCorePoolSize(Runnable firstTask) {    Thread t = null;    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        if (poolSize < corePoolSize && runState == RUNNING)            t = addThread(firstTask);    } finally {        mainLock.unlock();    }    if (t == null)        return false;    t.start();    return true;}

所以,当前的command就被放入阻塞队列中workQueue.offer(command)runTask中的task则是通过以下方法提到的:

Runnable getTask() {    for (;;) {        try {            int state = runState;            if (state > SHUTDOWN)                return null;            Runnable r;            if (state == SHUTDOWN)  // Help drain queue                r = workQueue.poll();            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);            else                r = workQueue.take();            if (r != null)                return r;            if (workerCanExit()) {                if (runState >= SHUTDOWN) // Wake up others                    interruptIdleWorkers();                return null;            }            // Else retry        } catch (InterruptedException ie) {            // On interruption, re-check runState        }    }}

此处是明显的生产者-消费者模式,其中生产者是Client,在外部不断的提交待执行的任务,消费者则是线程池内的线程,这两者之间通过阻塞队列workQueue建立起了连接,一个生产,一个执行待执行的任务的run方法将其消费。这时,线程中的线程是连续执行任务,还是会结束,在新任务出现时创建新的Worker线程,是由所使用的线程池类型决定的,例如:

ExecutorService exec = Executors.new Executors.newFixedThreadPool(2); //设置固定线程池大小为2

线程池的固定线程数为2,其执行过程中是调用阻塞队列的take方法,此时如果队列中没有任务,是会一直阻塞。而如果使用CachedTharedPool,则会执行阻塞队列的poll方法,根据定义的超时时间进行等待。Worker在执行时,是执行阻塞队列的take方法还是poll方法,取决于timed是否为true,如下:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;...............  try {      Runnable r = timed ?          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :          workQueue.take();      if (r != null)          return r;      timedOut = true;  } catch (InterruptedException retry) {      timedOut = false;  }

在使用CachedThreaPool时,由于共corePoolSize为0,所以每次执行时timedtrue,此时执行阻塞队列的poll方法,keepAliveTime=60000000000后续返回时,此时由于队列中没有任务,所以timedOut=true,所以,在后续执行时

if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {    if (compareAndDecrementWorkerCount(c))        return null;    continue;}

此处会return null。

OK,今儿就先到这儿了 :)

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
Java线程池架构原理和源码解析(ThreadPoolExecutor)
Java Executor 框架
多线程技术原理
线程重用——线程池的基本原理
Java并发编程:线程池的使用
深入理解Java之线程池(中)
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服