平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程。但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务。因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面。而线程池则解决了这个问题,线程池的作用就是将线程进行复用。
类图
newFixedThreadPool 固定数量的线程池,线程池中的线程数量是固定的,不会改变。
newSingleThreadExecutor 单一线程池,线程池中只有一个线程。执行完成一个再执行另外一个,其实有点把多线程变成了单线程
newCachedThreadPool 缓存线程池,线程池中的线程数量不固定,会根据需求的大小进行改变。它的线程回自动释放默认时间是60s
newScheduledThreadPool 计划任务调度的线程池,用于执行计划任务,比如每隔5分钟怎么样,
newWorkStealingPool 创建一个 work-stealing 线程池,使用目前机器上可用的处理器作为它的并行级别。 jdk1.8新增的
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue());
- }
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue()));
- }
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue());
- }
- public static ExecutorService newWorkStealingPool() {
- return new ForkJoinPool
- (Runtime.getRuntime().availableProcessors(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }
ThreadPoolExecutor 构造函数中参数的含义。
corePoolSize 线程池中核心线程数的数目
maximumPoolSize 线程池中最多能容纳多少个线程
keepAliveTime 当现在线程数目大于corePoolSize时,超过keepAliveTime时间后,多出corePoolSize的那些线程将被终结。
unit keepAliveTime的单位
workQueue 当任务数量很大,线程池中线程无法满足时,提交的任务会被放到阻塞队列中,线程空闲下来则会不断从阻塞队列中取数据。
FixedThreadPool,它的线程的核心数目和最大容纳数目都是一样的,以至于在工作期间,并不会创建和销毁线程。当任务数量很大,线程池中的线程无法满足时,任务将被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。这就意味着,任务不断地添加,会使内存消耗越来越大。
CachedThreadPool,它的核心线程数量是0,最大容纳数目是Integer.MAX_VALUE,它的阻塞队列是SynchronousQueue,这是一个特别的队列,它的大小是0。由于核心线程数量是0,所以必然要将任务添加到SynchronousQueue中,这个队列只有一个线程在从中添加数据,同时另一个线程在从中获取数据时,才能成功。独自往这个队列中添加数据会返回失败。当返回失败时,则线程池开始扩展线程,这就是为什么CachedThreadPool的线程数目是不固定的。当60s该线程仍未被使用时,线程则被销毁。
相关演示代码
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- public class ExecutorsDome {
- public static void main(String[] args) {
- //ExecutorService executorService = Executors.newCachedThreadPool();//默认是60s 的线程时间
- ExecutorService executorService = Executors.newFixedThreadPool(10);//固定数量的线程池,线程池中的线程数量是固定的,不会改变。
- // ExecutorService executorService = Executors.newSingleThreadExecutor();//线程池中只有一个线程,执行完成一个在执行另外一个,其实有点把多线程变成了单线程
- //ExecutorService executorService =Executors.newScheduledThreadPool(6);
- // ExecutorService executorService=Executors.newWorkStealingPool();
- executorService.execute(new ThreadPools(1));
- executorService.execute(new ThreadPools(2));
- executorService.execute(new ThreadPools(3));
- executorService.execute(new ThreadPools(4));
- executorService.execute(new ThreadPools(5));
- executorService.submit(new ThreadPools(6));
- }
- }
- class ThreadPools implements Runnable {
- private int i=0;
- public ThreadPools(int i){
- this.i=i;
- }
- public void run() {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- System.out.println("ThreadPools:"+i);
- }
- }
==============================
定时的demo
- public class ScheduledExecutorServiceDemo {
- public static void main(String[] args) throws Exception, ExecutionException {
- ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
- ScheduledFuture scheduledFuture = executorService.schedule(new Callable() {
- public String call() throws Exception {
- return "call";
- }
- }, 10, TimeUnit.SECONDS);
- System.out.println(scheduledFuture.get());
- executorService.shutdown();
- }
- }
结果是10s后返回"call"的结果
ThreadPoolExecutor的构造函数里面有一个参数是RejectedExecutionHandler,它是一个接口实现类有:AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,这这个分别代表了4种策略
AbortPolicy:如果不能接受任务了,则抛出异常。默认拒绝策略是它
CallerRunsPolicy:如果不能接受任务了,则让调用的线程去完成。
DiscardOldestPolicy:如果不能接受任务了,则丢弃最老的一个任务,由一个队列来维护。
DiscardPolicy:如果不能接受任务了,则丢弃任务。
当然我们也可以自己实现RejectedExecutionHandler接口来自己定义拒绝策略。
fork/join(jdk1.7新增)框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。
他的思想可以用一张图来了解
演示代码
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.Future;
- import java.util.concurrent.RecursiveTask;
- //1*1+2*2+3*3....100*100
- public class ForkJoinPoolDemo {
- public static void main(String[] args) {
- MyTask mt = new MyTask(1);
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- Future result = forkJoinPool.submit(mt);
- try {
- System.out.println(result.get());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- forkJoinPool.shutdown();
- A.a();
- }
- }
- //ForkJoin算法
- class MyTask extends RecursiveTask {
- int i;
- public MyTask(int i){
- this.i = i;
- }
- @Override
- protected Integer compute() {
- if (i >= 100) {
- return i * i;
- }
- MyTask newTask2 = new MyTask(i + 1);
- newTask2.fork();
- return i * i + newTask2.join();
- }
- }
- //普通算法
- class A {
- public static void a() {
- int j=0;
- for (int i = 1; i <= 100; i++) {
- j=j+(i*i);
- }
- System.out.println(j);
- }
- }
以上两种算法的结果是一样的都是 338350。更深入的了解请网上超资料。
联系客服