打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
图灵学院:java高并发之Executor
concurrent Executor


一:线程池的用途

平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程。但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务。因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面。而线程池则解决了这个问题,线程池的作用就是将线程进行复用。

二:java并发包提供的线程池

1:线程池的类(Executors类)

类图

newFixedThreadPool 固定数量的线程池,线程池中的线程数量是固定的,不会改变。

newSingleThreadExecutor 单一线程池,线程池中只有一个线程。执行完成一个再执行另外一个,其实有点把多线程变成了单线程

newCachedThreadPool 缓存线程池,线程池中的线程数量不固定,会根据需求的大小进行改变。它的线程回自动释放默认时间是60s

newScheduledThreadPool 计划任务调度的线程池,用于执行计划任务,比如每隔5分钟怎么样,

newWorkStealingPool   创建一个 work-stealing 线程池,使用目前机器上可用的处理器作为它的并行级别。  jdk1.8新增的

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2.         return new ThreadPoolExecutor(nThreads, nThreads,
  3.                                       0L, TimeUnit.MILLISECONDS,
  4.                                       new LinkedBlockingQueue());
  5. }
  6. public static ExecutorService newSingleThreadExecutor() {
  7.         return new FinalizableDelegatedExecutorService
  8.             (new ThreadPoolExecutor(1, 1,
  9.                                     0L, TimeUnit.MILLISECONDS,
  10.                                     new LinkedBlockingQueue()));
  11. }
  12. public static ExecutorService newCachedThreadPool() {
  13.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  14.                                       60L, TimeUnit.SECONDS,
  15.                                       new SynchronousQueue());
  16. }
  17.  public static ExecutorService newWorkStealingPool() {
  18.         return new ForkJoinPool
  19.             (Runtime.getRuntime().availableProcessors(),
  20.              ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  21.              null, true);
  22.     }

    

    

    

ThreadPoolExecutor 构造函数中参数的含义。

corePoolSize  线程池中核心线程数的数目

maximumPoolSize 线程池中最多能容纳多少个线程

keepAliveTime 当现在线程数目大于corePoolSize时,超过keepAliveTime时间后,多出corePoolSize的那些线程将被终结。

unit keepAliveTime的单位

workQueue 当任务数量很大,线程池中线程无法满足时,提交的任务会被放到阻塞队列中,线程空闲下来则会不断从阻塞队列中取数据。

FixedThreadPool,它的线程的核心数目和最大容纳数目都是一样的,以至于在工作期间,并不会创建和销毁线程。当任务数量很大,线程池中的线程无法满足时,任务将被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。这就意味着,任务不断地添加,会使内存消耗越来越大。

CachedThreadPool,它的核心线程数量是0,最大容纳数目是Integer.MAX_VALUE,它的阻塞队列是SynchronousQueue,这是一个特别的队列,它的大小是0。由于核心线程数量是0,所以必然要将任务添加到SynchronousQueue中,这个队列只有一个线程在从中添加数据,同时另一个线程在从中获取数据时,才能成功。独自往这个队列中添加数据会返回失败。当返回失败时,则线程池开始扩展线程,这就是为什么CachedThreadPool的线程数目是不固定的。当60s该线程仍未被使用时,线程则被销毁。

相关演示代码

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ExecutorsDome {
  4.     public static void main(String[] args) {
  5.         //ExecutorService executorService = Executors.newCachedThreadPool();//默认是60s 的线程时间
  6.        ExecutorService executorService = Executors.newFixedThreadPool(10);//固定数量的线程池,线程池中的线程数量是固定的,不会改变。
  7.        // ExecutorService executorService = Executors.newSingleThreadExecutor();//线程池中只有一个线程,执行完成一个在执行另外一个,其实有点把多线程变成了单线程
  8.         //ExecutorService executorService =Executors.newScheduledThreadPool(6);
  9.       // ExecutorService executorService=Executors.newWorkStealingPool();
  10.         executorService.execute(new ThreadPools(1));
  11.         executorService.execute(new ThreadPools(2));
  12.         executorService.execute(new ThreadPools(3));
  13.         executorService.execute(new ThreadPools(4));
  14.         executorService.execute(new ThreadPools(5));
  15.         executorService.submit(new ThreadPools(6));
  16.     }
  17. }
  18. class ThreadPools implements Runnable {
  19.     private int i=0;
  20.     public ThreadPools(int i){
  21.         this.i=i;
  22.     }
  23.     public void run() {
  24.         try {
  25.             Thread.sleep(5000);
  26.         } catch (InterruptedException e) {
  27.             // TODO Auto-generated catch block
  28.             e.printStackTrace();
  29.         }
  30.         System.out.println("ThreadPools:"+i);
  31.     }
  32. }

==============================

定时的demo

  1. public class ScheduledExecutorServiceDemo {
  2.     public static void main(String[] args) throws Exception, ExecutionException {
  3.         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);  
  4.         ScheduledFuture scheduledFuture = executorService.schedule(new Callable() {  
  5.             public String call() throws Exception {  
  6.                 return "call";  
  7.             }  
  8.         }, 10, TimeUnit.SECONDS);  
  9.         System.out.println(scheduledFuture.get());  
  10.         executorService.shutdown(); 
  11.     }
  12. }

结果是10s后返回"call"的结果

2:拒绝(饱和)策略

ThreadPoolExecutor的构造函数里面有一个参数是RejectedExecutionHandler,它是一个接口实现类有:AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,这这个分别代表了4种策略

AbortPolicy:如果不能接受任务了,则抛出异常。默认拒绝策略是它

CallerRunsPolicy:如果不能接受任务了,则让调用的线程去完成。

DiscardOldestPolicy:如果不能接受任务了,则丢弃最老的一个任务,由一个队列来维护。

DiscardPolicy:如果不能接受任务了,则丢弃任务。

当然我们也可以自己实现RejectedExecutionHandler接口来自己定义拒绝策略。

3:ForkJoin

fork/join(jdk1.7新增)框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

他的思想可以用一张图来了解

演示代码

  1. import java.util.concurrent.ExecutionException;
  2. import java.util.concurrent.ForkJoinPool;
  3. import java.util.concurrent.Future;
  4. import java.util.concurrent.RecursiveTask;
  5. //1*1+2*2+3*3....100*100
  6. public class ForkJoinPoolDemo {
  7.     public static void main(String[] args) {
  8.         MyTask mt = new MyTask(1);
  9.         ForkJoinPool forkJoinPool = new ForkJoinPool();
  10.         Future result = forkJoinPool.submit(mt);
  11.         try {
  12.             System.out.println(result.get());
  13.         } catch (InterruptedException e) {
  14.             e.printStackTrace();
  15.         } catch (ExecutionException e) {
  16.             e.printStackTrace();
  17.         }
  18.         forkJoinPool.shutdown();
  19.         A.a();
  20.     }
  21. }
  22. //ForkJoin算法
  23. class MyTask extends RecursiveTask {
  24.     int i;
  25.     public MyTask(int i){
  26.         this.i = i;
  27.     }
  28.     @Override
  29.     protected Integer compute() {
  30.         if (i >= 100) {
  31.             return i * i;
  32.         }
  33.         MyTask newTask2 = new MyTask(i + 1);
  34.         newTask2.fork();
  35.         return i * i + newTask2.join();
  36.     }
  37. }
  38. //普通算法
  39. class A { 
  40.     public static void a() {
  41.         int j=0;
  42.         for (int i = 1; i <= 100; i++) {  
  43.             j=j+(i*i);
  44.         }
  45.         System.out.println(j);
  46.     }
  47. }

以上两种算法的结果是一样的都是 338350。更深入的了解请网上超资料。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
主题:java并发编程-Executor框架
Java并发编程(19):并发新特性
java多线程系列:Executors框架
Java:Java程序猿必会的四种线程池
线程-1、创建线程的方式及实现
线程池之 ScheduledThreadPoolExecutor中scheduleAtFixedRate执行定时任务
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服