今儿同事问了一个关于多线程的问题。一般的线程,在调用其start()
方法之后,会自动开始执行相应的逻辑,如果之后再次调用start()
,会怎么样?是否可以???每个线程其实是有多个状态的,刚刚创建之后,其处于NEW
状态,在执行完相应的run()
逻辑之后,其处理TERMINATED
状态。如果多次调用这个start()
方法,是否可以让逻辑执行多次呢?答案肯定是不行的(一个线程执行完自己的任务之后就被销毁了),所以多次执行start()
必然不靠谱,应该会报错。同事又问到:那jdk
线程池又是如何重用的呢?
jdk
内部的ThreadPoolExecutor
的实现
线程池内部相当于在跑一个while
循环,在不断的从阻塞队列里拿task
,之后调用task.run()
,此时相应Runnable
的run
方法就会被执行,执行完以后继续去拿,如此反复,整个线程其实一直没有执行完成,所以,其状态也不会变成TERMINATED
,就此实现了线程的重用。
java.util.concurrent.ThreadPoolExecutor
/** * Main run loop */public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); }}
这个ThreadPoolExecutor
内部的Worker
即之前创建出来的线程,在这里不断的获取task
,其runTask
方法如下:
/** * Runs a single task between before/after methods. */private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { /* * Ensure that unless pool is stopping, this thread * does not have its interrupt set. This requires a * double-check of state in case the interrupt was * cleared concurrently with a shutdownNow -- if so, * the interrupt is re-enabled. */ if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); /* * Track execution state to ensure that afterExecute * is called only if task completed or threw * exception. Otherwise, the caught runtime exception * will have been thrown by afterExecute itself, in * which case we don't want to call it again. */ boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); }}
注意task.run()
这句,即总结的,调用Runnable
的run
方法。
用一个小例子来分析下整个线程池的执行任务的过程:
public class ThreadPoolTest { public static void main(String[] args) { ExecutorService exec = Executors.newFixedThreadPool(1); //设置固定线程池大小为1 List<Runnable> list = new ArrayList<Runnable>(); for(int i=0;i<5;i++) list.add(new MyRunnable()); for(int i=0;i<5;i++) exec.execute(list.get(i)); }}class MyRunnable implements Runnable { @Override public void run() { System.out.println("..."); //可以在此加断点DEUBG }}
此时,exec
会不断的向线程池中添加任务。
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current <tt>RejectedExecutionHandler</tt>. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted * for execution * @throws NullPointerException if command is null */public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) { reject(command); // is shutdown or saturated } }}
看到这个Runnable
什么时候执行,是新创建线程执行,还是复用之前的线程,甚至是否需要拒绝请求。当前的线程池newFixedThreadPool(1)
是个固定大小的池,此时poolSize == corePoolSize
,所以此时的创建新线程的逻辑会返回false
。
/** * Creates and starts a new thread running firstTask as its first * task, only if fewer than corePoolSize threads are running * and the pool is not shut down. * @param firstTask the task the new thread should run first (or * null if none) * @return true if successful */private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true;}
所以,当前的command
就被放入阻塞队列中workQueue.offer(command)
,runTask
中的task
则是通过以下方法提到的:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } }}
此处是明显的生产者-消费者模式,其中生产者是Client
,在外部不断的提交待执行的任务,消费者则是线程池内的线程,这两者之间通过阻塞队列workQueue
建立起了连接,一个生产,一个执行待执行的任务的run
方法将其消费。这时,线程中的线程是连续执行任务,还是会结束,在新任务出现时创建新的Worker
线程,是由所使用的线程池类型决定的,例如:
ExecutorService exec = Executors.new Executors.newFixedThreadPool(2); //设置固定线程池大小为2
线程池的固定线程数为2,其执行过程中是调用阻塞队列的take
方法,此时如果队列中没有任务,是会一直阻塞。而如果使用CachedTharedPool
,则会执行阻塞队列的poll
方法,根据定义的超时时间进行等待。Worker
在执行时,是执行阻塞队列的take
方法还是poll
方法,取决于timed
是否为true
,如下:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;............... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
在使用CachedThreaPool
时,由于共corePoolSize
为0,所以每次执行时timed
为true
,此时执行阻塞队列的poll
方法,keepAliveTime=60000000000
后续返回时,此时由于队列中没有任务,所以timedOut=true
,所以,在后续执行时
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue;}
此处会return null。
联系客服