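1. Thread Pools
Thread pools: three factory methods, seven parameters, four rejection policies.
1.1 Benefits of thread pools
1) Lower resource consumption, because threads are not created and destroyed for every task.
2) Faster response, because a task can run on a thread that already exists.
3) Easier management.
4) In short: thread reuse, a controllable maximum level of concurrency, and centralized thread management.
1.2 Thread pools: the three factory methods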
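    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Demo01 {
        public static void main(String[] args) {
            // A pool with a single worker thread
            ExecutorService threadPool = Executors.newSingleThreadExecutor();
            // A pool with a fixed number of threads
            // ExecutorService threadPool = Executors.newFixedThreadPool(5);
            // An elastic pool that grows and shrinks with the load
            // ExecutorService threadPool = Executors.newCachedThreadPool();

            try {
                for (int i = 0; i < 100; i++) {
                    // Hand work to the pool instead of creating threads directly
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Shut the pool down once the program has finished with it
                threadPool.shutdown();
            }
        }
    }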
1.3 The seven parameters
Source code analysis
    // All three factory methods delegate to the ThreadPoolExecutor constructor.
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,                  // core pool size
                              int maximumPoolSize,               // maximum pool size
                              long keepAliveTime,                // idle time after which surplus threads are released
                              TimeUnit unit,                     // unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue, // blocking queue of waiting tasks
                              ThreadFactory threadFactory,       // creates the threads; usually left at the default
                              RejectedExecutionHandler handler   // rejection policy
                              ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
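To make the interplay of corePoolSize, workQueue and maximumPoolSize concrete, here is a minimal sketch. The class name PoolGrowthDemo and the helper submitBlockingTasks are hypothetical, and the sizes (2 core threads, 5 maximum, queue capacity 3) are chosen only for illustration. Every task blocks on a latch, so the pool cannot drain while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Hypothetical helper: submits `tasks` blocking tasks and returns
    // {threads in the pool, tasks waiting in the queue, tasks rejected}.
    static int[] submitBlockingTasks(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // AbortPolicy reports saturation by throwing
                }
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown(); // let every blocked task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 5, 8, 9}) {
            int[] r = submitBlockingTasks(n);
            System.out.println(n + " tasks -> threads=" + r[0]
                    + ", queued=" + r[1] + ", rejected=" + r[2]);
        }
    }
}
```

In this setup the pool first fills its core threads, then the queue, then grows toward maximumPoolSize, and only after that hands tasks to the rejection policy.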
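Creating a thread pool manually

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,                              // corePoolSize
                    5,                              // maximumPoolSize
                    3, TimeUnit.SECONDS,            // keepAliveTime and its unit
                    new LinkedBlockingDeque<>(3),   // bounded work queue
                    Executors.defaultThreadFactory(),
                    // When the queue is full, drop the oldest queued task and retry
                    // the new one; no exception is thrown
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            try {
                // The pool holds at most maximumPoolSize + queue capacity = 5 + 3 = 8
                // tasks at once; anything beyond that goes to the rejection policy
                for (int i = 1; i <= 9; i++) {
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    }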
1.4 The four rejection policies
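ThreadPoolExecutor ships four built-in RejectedExecutionHandler implementations: AbortPolicy (the default, throws RejectedExecutionException), CallerRunsPolicy (runs the task on the submitting thread), DiscardPolicy (silently drops the new task) and DiscardOldestPolicy (drops the oldest queued task and retries the new one). The sketch below compares them on a deliberately tiny pool. The class RejectionPolicySketch, the helper run and the task names are hypothetical, introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicySketch {

    // Hypothetical helper: saturates a pool with 1 thread and 1 queue slot,
    // then returns a log of what happened to the three submitted tasks.
    static List<String> run(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();

        // task1 occupies the only worker until the latch is released.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("task1");
        });
        try {
            // task2 fills the queue; task3 finds the pool saturated.
            for (int id = 2; id <= 3; id++) {
                String name = "task" + id;
                pool.execute(() -> log.add(
                        Thread.currentThread() == caller ? name + "@caller" : name));
            }
        } catch (RejectedExecutionException e) {
            log.add("rejected");
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        // [rejected, task1, task2]
        System.out.println("AbortPolicy:         " + run(new ThreadPoolExecutor.AbortPolicy()));
        // [task3@caller, task1, task2]
        System.out.println("CallerRunsPolicy:    " + run(new ThreadPoolExecutor.CallerRunsPolicy()));
        // [task1, task2]
        System.out.println("DiscardPolicy:       " + run(new ThreadPoolExecutor.DiscardPolicy()));
        // [task1, task3]
        System.out.println("DiscardOldestPolicy: " + run(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

Only AbortPolicy makes the caller aware of the rejection; the two discard policies lose work silently, so they fit best where dropped tasks are acceptable.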
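1.5 Summary and extensions
How should the maximum pool size be chosen?
Worth knowing for tuning: the answer depends on whether the workload is IO-bound or CPU-bound.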
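    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class Demo01 {
        public static void main(String[] args) {
            // Number of logical processors available to the JVM
            int cores = Runtime.getRuntime().availableProcessors();
            System.out.println(cores);

            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,
                    // Size the pool by the core count; never below corePoolSize,
                    // otherwise the constructor throws IllegalArgumentException
                    Math.max(2, cores),
                    3, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(3),
                    Executors.defaultThreadFactory(),
                    // When the queue is full, drop the oldest queued task and retry
                    // the new one; no exception is thrown
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            try {
                for (int i = 1; i <= 9; i++) {
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    }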
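2. ForkJoin (splitting large tasks)
ForkJoin was introduced in JDK 1.7. It executes tasks in parallel to improve throughput and is aimed at large volumes of data.
Big data analogy: MapReduce (split a large task into small tasks, then combine the results).
Key feature of ForkJoin: work stealing.
Each worker thread maintains its own double-ended queue (deque). When a worker's deque runs out of tasks, it steals a task from the opposite end of another worker's deque and executes it.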
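The "opposite end" idea can be pictured with a plain ArrayDeque. This is only a single-threaded conceptual sketch, not how ForkJoinPool is implemented internally; the class WorkStealingSketch and the helpers takeOwn and steal are hypothetical names. It assumes the owner works on its newest task first while a thief takes the oldest one, so the two rarely touch the same end.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkStealingSketch {

    // The owning worker takes the most recently added task from the head.
    static String takeOwn(Deque<String> own) {
        return own.pollFirst();
    }

    // An idle worker steals the oldest task from the tail of another deque.
    static String steal(Deque<String> victim) {
        return victim.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> workerA = new ArrayDeque<>();
        Deque<String> workerB = new ArrayDeque<>(); // worker B has nothing to do

        for (int i = 1; i <= 4; i++) {
            workerA.addFirst("task" + i); // newest task sits at the head
        }

        System.out.println("A runs " + takeOwn(workerA));        // A runs task4
        if (takeOwn(workerB) == null) {
            // B's own deque is empty, so it steals from the other end of A's
            System.out.println("B steals " + steal(workerA));    // B steals task1
        }
        System.out.println("A still holds " + workerA);          // A still holds [task3, task2]
    }
}
```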
    import java.util.concurrent.RecursiveTask;

    // Sums the integers in [start, end] by recursively splitting the range.
    public class ForkJoinDemo extends RecursiveTask<Long> {

        private final long start;
        private final long end;

        // Threshold: ranges shorter than this are summed directly
        private final long temp = 10000L;

        public ForkJoinDemo(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if ((end - start) < temp) {
                // Small enough: sum sequentially, using primitives to avoid boxing
                long sum = 0L;
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                // Split the range in half
                long middle = start + (end - start) / 2;
                ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
                task1.fork(); // push the left half onto this worker's deque
                ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
                // Compute the right half on the current thread, then join the left
                return task2.compute() + task1.join();
            }
        }
    }
Test

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.LongStream;

    public class Test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test1();
            test2();
            test3();
        }

        // Plain sequential loop
        public static void test1() {
            long sum = 0L;
            long start = System.currentTimeMillis();
            for (long i = 1L; i <= 1_000_000_000L; i++) {
                sum += i;
            }
            long end = System.currentTimeMillis();
            System.out.println("sum=" + sum + " time: " + (end - start));
        }

        // ForkJoin
        public static void test2() throws ExecutionException, InterruptedException {
            long start = System.currentTimeMillis();

            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Long> task = new ForkJoinDemo(0L, 1_000_000_000L);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task); // submit the task
            Long sum = submit.get();                               // block until the result is ready
            forkJoinPool.shutdown();

            long end = System.currentTimeMillis();
            System.out.println("sum=" + sum + " time: " + (end - start));
        }

        // Parallel stream
        public static void test3() {
            long start = System.currentTimeMillis();
            long sum = LongStream.rangeClosed(0L, 1_000_000_000L).parallel().reduce(0, Long::sum);
            long end = System.currentTimeMillis();
            System.out.println("sum=" + sum + " time: " + (end - start));
        }
    }
3. Future: asynchronous callbacks
Future was designed to model the result of an event that will complete at some point in the future.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class Demo01 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // Asynchronous call that returns a value
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
                int i = 10 / 0; // deliberately fails to exercise the error path
                return 1024;
            });

            System.out.println(completableFuture.whenComplete((t, u) -> {
                System.out.println("t=>" + t); // normal result; null when the task failed
                System.out.println("u=>" + u); // the exception; null when the task succeeded
            }).exceptionally((e) -> {
                System.out.println(e.getMessage());
                return 233; // fallback value returned on failure
            }).get());
        }
    }
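The example above exercises only the failure path. As a complement, here is a minimal sketch that runs the same kind of pipeline on both a success and a failure. The class FutureCallbackSketch, the helper divideAsync and the fallback value -1 are hypothetical and chosen purely for illustration.

```java
import java.util.concurrent.CompletableFuture;

class FutureCallbackSketch {

    // Hypothetical helper: divides asynchronously, doubles the quotient,
    // and falls back to -1 if any earlier stage fails.
    static int divideAsync(int a, int b) {
        return CompletableFuture
                .supplyAsync(() -> a / b)   // may throw ArithmeticException
                .thenApply(q -> q * 2)      // skipped when the division failed
                .exceptionally(e -> -1)     // recovers from the failure
                .join();                    // waits for the final value
    }

    public static void main(String[] args) {
        System.out.println(divideAsync(10, 2)); // 10
        System.out.println(divideAsync(10, 0)); // -1
    }
}
```

join() is used here instead of get() only because it throws unchecked exceptions, which keeps the helper's signature free of throws clauses.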