一、线程回顾 1、创建线程的四种方式
继承Thread
启动线程【创建继承Thread的实现类,调用start方法启动】
实现Runnable接口
启动线程【创建一个Thread,往里面传入一个Runnable的实现类,调用start方法启动】
实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)
启动线程【创建一个Thread,往里面传入一个Runnable的实现类,调用start方法启动】
线程池
前三种创建线程的方式————以下三种创建线程的方式都不建议使用
这三种方式可能会导致线程耗尽,这三种方式创建线程,就相当于,一个公司有一个新任务就招一个新员工,这样会导致资源耗尽
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class ThreadTest { public static class Thread01 extends Thread { @Override public void run () { System.out.println("当前线程:" + Thread.currentThread().getId()); } } public static class Runnable01 implements Runnable { @Override public void run () { System.out.println("当前线程:" + Thread.currentThread().getId()); } } public static class Callable01 implements Callable <Integer> { @Override public Integer call () throws Exception { System.out.println("当前线程:" + Thread.currentThread().getId()); return 10 / 2 ; } } }
使用线程池方式创建线程 ————建议使用线程池创建线程
线程池的详细使用可以去看下JUC
四种创建线程的区别:
继承Thread和实现Runnable创建线程不能得到返回值,实现Callable< V >接口可以获取返回值
这三种方式都不能控制资源
使用线程池可以控制资源,性能稳定
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ThreadTest { private static final ExecutorService service = Executors.newFixedThreadPool(10 ); public static void main (String[] args) { System.out.println("========================线程池的使用=====================================" ); service.execute(() -> System.out.println("使用线程池中的线程,执行一个异步方法" )); } }
2、线程池的七大参数和创建线程池
线程池(ExecutorService)的创建
Executors
new ThreadPoolExecutor()
ThreadPoolExecutor创建线程池的七大参数:
int corePoolSize:核心线程数(一直存在,除非调用allowCoreThreadTimeOut);线程池创建好以后就准备就绪的线程数量,就等待来接收异步任务去执行
int maximumPoolSize:最大线程数量;控制资源,【如果指定为200,就算提交了100000个异步任务,也只能一次执行200个异步任务】
long keepAliveTime:线程存活时间。释放空闲的线程(maximumPoolSize-corePoolSize),只要线程的空闲时间大于指定的线程存活时间,就释放线程。但是不会释放corePoolSize中的线程
TimeUnit unit:用于指定keepAliveTime的时间单位
BlockingQueue< Runnable > workQueue:阻塞队列,如果一次性任务有很多,就回家那个目前多的任务放在队列里面,只要有空闲的线程了,就会从队列中取出新的任务继续执行
ThreadFactory threadFactory:线程的创建工厂
RejectedExecutionHandler handler:处理策略。如果队列满了,按照我们指定的拒绝策略执行任务
工作流程
线程池创建,准备好core 数量的核心线程,准备接受任务
core 满了,就将再进来的任务放入阻塞队列中。空闲的core 就会自己去阻塞队 列获取任务执行
阻塞队列满了,就直接开新线程执行,最大只能开到max 指定的数量
max 都执行好了。Max-core 数量空闲的线程会在keepAliveTime 指定的时间后自 动销毁。最终保持到core 大小
如果线程数开到了max 的数量,还有新任务进来,就会使用reject 指定的拒绝策 略进行处理
所有的线程创建都是由指定的factory 创建的
题目:
一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有7 个能直接得到执行,接下来50 个进入队列排队,在多开13 个继续执行。现在70 个被安排上了。剩下30 个默认拒绝策略。
1 2 3 4 5 6 7 8 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor (5 , 200 , 10 , TimeUnit.SECONDS, new LinkedBlockingDeque <>(1000 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy());
3、常见四种线程池 这四种线程池都可以使用Executors
快速创建
newCachedThreadPool
newFixedThreadPool
newScheduledThreadPool
newSingleThreadExecutor
二、CompletableFuture 异步编排 1、创建异步对象 CompletableFuture 提供了四个静态方法来创建一个异步操作。
1 2 3 4 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) ;public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier,Executor executor) ;public static CompletableFuture<Void> runAsync (Runnable runnable) ;public static CompletableFuture<Void> runAsync (Runnable runnable,Executor executor) ;
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ThreadTest { private static ExecutorService executor = Executors.newFixedThreadPool(10 ); public static void main (String[] args) throws ExecutionException, InterruptedException { System.out.println("========================异步编排使用---starter=====================================" ); CompletableFuture.runAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 * 2 ; System.out.println("运行结果为:" + i); }, executor); CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2 ; System.out.println("运行结果为:" + i); return i; }, executor); System.out.println("返回值为:" +supplyAsync.get()); System.out.println("========================异步编排使用---end=====================================" ); } }
2、完成时执行回调 1 2 3 4 public CompletableFuture<T> whenComplete (BiConsumer<? super T, ? super Throwable> action) ;public CompletableFuture<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action) ;public CompletableFuture<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action, Executor executor) ;public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn)
whenComplete 可以处理正常和异常的计算结果,但是不能修改返回值 ,exceptionally 处理异常情况。可以修改返回值
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async 结尾,意味着Action 使用相同的线程执行,而Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ThreadTest { private static ExecutorService executor = Executors.newFixedThreadPool(10 ); public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> 10 / 0 , executor) .whenComplete((result, ex) -> System.out.println("异步任务成功完成了,结果是:" + result + ";异常是:" + ex)); CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> 10 / 0 , executor) .exceptionally(ex -> { System.out.println("异常为:" + ex); return 20 ; }); System.out.println("exceptionally处理后的返回结果为:" + exceptionally.get()); } }
3、handle 方法 1 2 3 4 5 public <U> CompletableFuture<U> handle (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) ;
和complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ThreadTest { private static ExecutorService executor = Executors.newFixedThreadPool(10 ); public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> 10 / 0 , executor) .handle((result, exception) -> { if (result != null ) { return -1 ; } if (exception != null ) { return 1 ; } return 0 ; }); System.out.println("handle执行后的返回值为:" + handle.get()); } }
4、线程串行化方法—thenXxx 以Async 结尾的方法,可以继续传入一个新的线程,不需要和之前的是同一个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public CompletableFuture<Void> thenRun (Runnable action) ;public CompletableFuture<Void> thenRunAsync (Runnable action) ;public CompletableFuture<Void> thenRunAsync (Runnable action, Executor executor) ;public CompletableFuture<Void> thenAccept (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action, Executor executor) ;public <U> CompletableFuture<U> thenApply (Function<? super T, ? extends U> fn) ;public <U> CompletableFuture<U> thenApplyAsync (Function<? super T, ? extends U> fn) ;public <U> CompletableFuture<U> thenApplyAsync (Function<? super T, ? extends U> fn, Executor executor) ;
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun 的后续操作
带有Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class ThreadTest { private static ExecutorService executor = Executors.newFixedThreadPool(10 ); public static void main (String[] args) { CompletableFuture .supplyAsync(() -> { System.out.println("任务1启动" ); return 10 / 5 ; }, executor) .thenApplyAsync(result -> { System.out.println("上一步的结果为:" + result); System.out.println("任务2启动" ); return result * 2 ; }) .thenAcceptAsync(result -> { System.out.println("上一个任务的结果为:" + result); System.out.println("任务3启动" ); }) .thenRunAsync(() -> System.out.println("任务4启动了" ), executor); } }
5、两任务组合 - 都要完成 两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future 任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future 的结果,只需两个future 处理完任务后,处理该任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public <U,V> CompletableFuture<V> thenCombine ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) ;public <U> CompletableFuture<Void> thenAcceptBoth ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) ;public <U> CompletableFuture<Void> thenAcceptBothAsync ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) ;public <U> CompletableFuture<Void> thenAcceptBothAsync ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) ;public CompletableFuture<Void> runAfterBoth (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other,Runnable action,Executor executor) ;
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class ThreadTest { private static ExecutorService executor = Executors.newFixedThreadPool(10 ); public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程启动:" + Thread.currentThread().getId()); System.out.println("任务1线程结束:" ); return 10 / 5 ; }, executor); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程启动:" + Thread.currentThread().getId()); System.out.println("任务2线程结束:" ); return "任务二" ; }, executor); future01.runAfterBoth(future02, () -> { System.out.println("任务三开始" ); }); future01.thenAcceptBothAsync(future02, (result1, result2) -> { System.out.println("任务一的返回值为:" + result1); System.out.println("任务二的返回值为:" + result2); }, executor); System.out.println(future01.thenCombineAsync(future02, (result1, result2) -> "thenAcceptBothAsync的返回值为任务1和任务2的返回值结合:" + result1 + result2, executor).get()); } }
6、两任务组合 - 一个完成 当两个任务中,任意一个future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future 的结果,处理任务,也没有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public <U> CompletableFuture<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn) ;public <U> CompletableFuture<U> applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn) ;public <U> CompletableFuture<U> applyToEitherAsync ( CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) ;public CompletableFuture<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action) ;public CompletableFuture<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action) ;public CompletableFuture<Void> acceptEitherAsync ( CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor) ;public CompletableFuture<Void> runAfterEither (CompletionStage<?> other,Runnable action) ;public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other, Runnable action) ; public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other,Runnable action,Executor executor) ;
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ThreadTest { private static ExecutorService executor = Executors.newFixedThreadPool(10 ); public static void main (String[] args) throws Exception { CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程启动:" + Thread.currentThread().getId()); System.out.println("任务1线程结束:" ); return 10 / 5 ; }, executor); CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程启动:" + Thread.currentThread().getId()); System.out.println("任务2线程结束:" ); return "任务二" ; }, executor); future01.runAfterEitherAsync(future02, () -> System.out.println("任务三执行" ), executor); future01.acceptEitherAsync(future02, result -> System.out.println("上一步的返回值为" + result), executor); System.out.println(future01.applyToEitherAsync(future02, result -> "接收的返回值为:" + result, executor).get()); } }
7、多任务组合 allOf:等待所有任务完成,才算完成
anyOf:只要有一个任务完成,就算完成了
1 2 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs) ;public static CompletableFuture<Object> anyOf (CompletableFuture<?>... cfs) ;
三、应用场景 可以在商品介绍的部分使用异步编排的方式获取商品属性
比如:查询一个商品的详细信息中,商品属性值的获取必须要先查询到sku的信息,就可以使用异步编排的方式,先查询商品sku,再根据查询到的sku的值再去查询属性或其他需要依赖sku的信息,而不是一次性查询出来,免得使用时间过长。
自定义线程池的场景启动器 创建Maven项目
ThreadPoolProperties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import org.springframework.boot.context.properties.ConfigurationProperties;import java.util.concurrent.*;@ConfigurationProperties("thread.pool") public class ThreadPoolProperties { private Integer corePoolSize = 20 ; private Integer maximumPoolSize = 200 ; private Long keepAliveTime = 10L ; private TimeUnit unit = TimeUnit.SECONDS; private BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque <>(100000 ); private ThreadFactory threadFactory = Executors.defaultThreadFactory(); private RejectedExecutionHandler handler = new ThreadPoolExecutor .AbortPolicy(); public Integer getCorePoolSize () { return corePoolSize; } public void setCorePoolSize (Integer corePoolSize) { this .corePoolSize = corePoolSize; } public Integer getMaximumPoolSize () { return maximumPoolSize; } public void setMaximumPoolSize (Integer maximumPoolSize) { this .maximumPoolSize = maximumPoolSize; } public Long getKeepAliveTime () { return keepAliveTime; } public void setKeepAliveTime (Long keepAliveTime) { this .keepAliveTime = keepAliveTime; } public TimeUnit getUnit () { return unit; } public void setUnit (TimeUnit unit) { this .unit = unit; } public BlockingQueue<Runnable> getWorkQueue () { return workQueue; } public void setWorkQueue (BlockingQueue<Runnable> workQueue) { this .workQueue = workQueue; } public ThreadFactory getThreadFactory () { return threadFactory; } public void setThreadFactory (ThreadFactory threadFactory) { this .threadFactory = threadFactory; } public RejectedExecutionHandler getHandler () { return handler; } public void setHandler (RejectedExecutionHandler handler) { this .handler = handler; } }
ThreadPoolAutoConfiguration 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import com.xiaofei.thread.pool.bean.ThreadPoolProperties;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadPoolExecutor;@Configuration @EnableConfigurationProperties(ThreadPoolProperties.class) public class ThreadPoolAutoConfiguration { @Autowired private ThreadPoolProperties threadPoolProperties; @Bean @ConditionalOnMissingBean(ThreadPoolExecutor.class) public ThreadPoolExecutor threadPoolService () { return new ThreadPoolExecutor ( threadPoolProperties.getCorePoolSize(), threadPoolProperties.getMaximumPoolSize(), threadPoolProperties.getKeepAliveTime(), threadPoolProperties.getUnit(), threadPoolProperties.getWorkQueue(), threadPoolProperties.getThreadFactory(), threadPoolProperties.getHandler() ); } }
spring.factories 1 2 3 4 org.springframework.boot.autoconfigure.EnableAutoConfiguration =\ com.xiaofei.thread.pool.config.ThreadPoolAutoConfiguration