一、线程回顾

1、创建线程的四种方式

  1. 继承Thread
    • 启动线程【创建继承Thread的实现类,调用start方法启动】
  2. 实现Runnable接口
    • 启动线程【创建一个Thread,往里面传入一个Runnable的实现类,调用start方法启动】
  3. 实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)
    • 启动线程【创建一个Thread,往里面传入一个Runnable的实现类,调用start方法启动】
  4. 线程池

前三种创建线程的方式————以下三种创建线程的方式都不建议使用

这三种方式可能会导致线程耗尽,这三种方式创建线程,就相当于,一个公司有一个新任务就招一个新员工,这样会导致资源耗尽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadTest {

//创建线程方式一:继承Thread
public static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
}
}

//创建线程方式二:实现Runnable接口
public static class Runnable01 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
}
}

//创建线程方式三:实现Callable接口
public static class Callable01 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
return 10 / 2;
}
}

}


使用线程池方式创建线程 ————建议使用线程池创建线程

线程池的详细使用可以去看下JUC

四种创建线程的区别:

  • 继承Thread和实现Runnable创建线程不能得到返回值,实现Callable< V >接口可以获取返回值
  • 这三种方式都不能控制资源
  • 使用线程池可以控制资源,性能稳定
1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadTest {

//创建线程池
private static final ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) {

System.out.println("========================线程池的使用=====================================");

service.execute(() -> System.out.println("使用线程池中的线程,执行一个异步方法"));

}
}

2、线程池的七大参数和创建线程池

线程池(ExecutorService)的创建

Executors

new ThreadPoolExecutor()


ThreadPoolExecutor创建线程池的七大参数:

int corePoolSize:核心线程数(一直存在,除非调用allowCoreThreadTimeOut);线程池创建好以后就准备就绪的线程数量,就等待来接收异步任务去执行

int maximumPoolSize:最大线程数量;控制资源,【如果指定为200,就算提交了100000个异步任务,也只能一次执行200个异步任务】

long keepAliveTime:线程存活时间。释放空闲的线程(maximumPoolSize-corePoolSize),只要线程的空闲时间大于指定的线程存活时间,就释放线程。但是不会释放corePoolSize中的线程

TimeUnit unit:用于指定keepAliveTime的时间单位

BlockingQueue< Runnable > workQueue:阻塞队列,如果一次性任务有很多,就回家那个目前多的任务放在队列里面,只要有空闲的线程了,就会从队列中取出新的任务继续执行

ThreadFactory threadFactory:线程的创建工厂

RejectedExecutionHandler handler:处理策略。如果队列满了,按照我们指定的拒绝策略执行任务


工作流程

  1. 线程池创建,准备好core 数量的核心线程,准备接受任务
    • core 满了,就将再进来的任务放入阻塞队列中。空闲的core 就会自己去阻塞队
      列获取任务执行
    • 阻塞队列满了,就直接开新线程执行,最大只能开到max 指定的数量
    • max 都执行好了。Max-core 数量空闲的线程会在keepAliveTime 指定的时间后自
      动销毁。最终保持到core 大小
    • 如果线程数开到了max 的数量,还有新任务进来,就会使用reject 指定的拒绝策
      略进行处理
  2. 所有的线程创建都是由指定的factory 创建的

题目:

一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;

先有7 个能直接得到执行,接下来50 个进入队列排队,在多开13 个继续执行。现在70 个被安排上了。剩下30 个默认拒绝策略。

1
2
3
4
5
6
7
8
//使用ThreadPoolExecutor创建一个线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

3、常见四种线程池

这四种线程池都可以使用Executors快速创建

newCachedThreadPool

newFixedThreadPool

newScheduledThreadPool

newSingleThreadExecutor

二、CompletableFuture 异步编排

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

1
2
3
4
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadTest {

//创建线程池
private static ExecutorService executor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {

System.out.println("========================异步编排使用---starter=====================================");

// CompletableFuture启动方式一:【调用runAsync方法,没有返回值】
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 * 2;
System.out.println("运行结果为:" + i);
}, executor);

// CompletableFuture启动方式二:【调用supplyAsync方法,有返回值】
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果为:" + i);
return i;
}, executor);
System.out.println("返回值为:"+supplyAsync.get());

System.out.println("========================异步编排使用---end=====================================");
}
}

2、完成时执行回调

1
2
3
4
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
  • whenComplete 可以处理正常和异常的计算结果,但是不能修改返回值,exceptionally 处理异常情况。可以修改返回值
  • whenComplete 和 whenCompleteAsync 的区别:
    • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
    • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以Async 结尾,意味着Action 使用相同的线程执行,而Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadTest {

//创建线程池
private static ExecutorService executor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {

//使用 whenComplete 处理回调
CompletableFuture.supplyAsync(() -> 10 / 0, executor)
.whenComplete((result, ex) -> System.out.println("异步任务成功完成了,结果是:" + result + ";异常是:" + ex));

//使用 exceptionally 处理回调
CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> 10 / 0, executor)
.exceptionally(ex -> {
System.out.println("异常为:" + ex);
return 20;
});
System.out.println("exceptionally处理后的返回结果为:" + exceptionally.get());
}
}

3、handle 方法

1
2
3
4
5
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

和complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadTest {

//创建线程池
private static ExecutorService executor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> 10 / 0, executor)
.handle((result, exception) -> {
if (result != null) {
return -1;
}
if (exception != null) {
return 1;
}
return 0;
});
System.out.println("handle执行后的返回值为:" + handle.get());
}
}

4、线程串行化方法—thenXxx

以Async 结尾的方法,可以继续传入一个新的线程,不需要和之前的是同一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//不能接收上一步的结果,也没有结果返回
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action) ;
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

//可以接收上一次任务的返回值,但是自己没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

//可以就收上一步任务的返回值,自己可以有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) ;
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) ;
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun 方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun 的后续操作

带有Async 默认是异步执行的。同之前。

以上都要前置任务成功完成。

Function<? super T,? extends U>

​ T:上一个任务返回结果的类型

​ U:当前任务的返回值类型

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadTest {

//创建线程池
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
CompletableFuture
.supplyAsync(() -> {
System.out.println("任务1启动");
return 10 / 5;
}, executor)
.thenApplyAsync(result -> {
System.out.println("上一步的结果为:" + result);
System.out.println("任务2启动");
return result * 2;
})
.thenAcceptAsync(result -> {
System.out.println("上一个任务的结果为:" + result);
System.out.println("任务3启动");
})
.thenRunAsync(() -> System.out.println("任务4启动了"), executor);
}
}

5、两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future 的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个future,获取两个future 任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个future,不需要获取future 的结果,只需两个future 处理完任务后,处理该任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) ;

public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ThreadTest {

//创建线程池
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程启动:" + Thread.currentThread().getId());
System.out.println("任务1线程结束:");
return 10 / 5;
}, executor);

CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程启动:" + Thread.currentThread().getId());
System.out.println("任务2线程结束:");
return "任务二";
}, executor);

//前面两个任务执行完成之后才执行----不能接收返回值----自己没有返回值
future01.runAfterBoth(future02, () -> {
System.out.println("任务三开始");
});

//前面两个任务执行完成之后才执行----可以接收返回值----自己没有返回值
future01.thenAcceptBothAsync(future02, (result1, result2) -> {
System.out.println("任务一的返回值为:" + result1);
System.out.println("任务二的返回值为:" + result2);
}, executor);

System.out.println(future01.thenCombineAsync(future02, (result1, result2) ->
"thenAcceptBothAsync的返回值为任务1和任务2的返回值结合:" + result1 + result2, executor).get());
}
}

6、两任务组合 - 一个完成

当两个任务中,任意一个future 任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取future 的结果,处理任务,也没有返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) ;

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) ;

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadTest {

//创建线程池
private static ExecutorService executor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws Exception {
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程启动:" + Thread.currentThread().getId());
System.out.println("任务1线程结束:");
return 10 / 5;
}, executor);

CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程启动:" + Thread.currentThread().getId());
System.out.println("任务2线程结束:");
return "任务二";
}, executor);

//两个任务只要有一个完成就执行任务三——不能获取之前的返回值,自己也没有返回值
future01.runAfterEitherAsync(future02, () -> System.out.println("任务三执行"), executor);

//两个任务只要有一个完成就执行任务三——能获取之前的返回值,自己也没有返回值
future01.acceptEitherAsync(future02, result -> System.out.println("上一步的返回值为" + result), executor);

//两个任务只要有一个完成就执行任务三——能获取之前的返回值,可以添加返回值
System.out.println(future01.applyToEitherAsync(future02, result -> "接收的返回值为:" + result, executor).get());
}
}

7、多任务组合

allOf:等待所有任务完成,才算完成

anyOf:只要有一个任务完成,就算完成了

1
2
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

三、应用场景

可以在商品介绍的部分使用异步编排的方式获取商品属性

比如:查询一个商品的详细信息中,商品属性值的获取必须要先查询到sku的信息,就可以使用异步编排的方式,先查询商品sku,再根据查询到的sku的值再去查询属性或其他需要依赖sku的信息,而不是一次性查询出来,免得使用时间过长。

自定义线程池的场景启动器

创建Maven项目

image-20220624212823183

ThreadPoolProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.concurrent.*;

@ConfigurationProperties("thread.pool")
public class ThreadPoolProperties {
//核心线程数(一直存在,除非调用allowCoreThreadTimeOut);线程池创建好以后就准备就绪的线程数量,就等待来接收异步任务去执行
private Integer corePoolSize = 20;

//最大线程数量;控制资源,【如果指定为200,就算提交了100000个异步任务,也只能一次执行200个异步任务】
private Integer maximumPoolSize = 200;

//线程存活时间。释放空闲的线程(maximumPoolSize-corePoolSize),只要线程的空闲时间大于指定的线程存活时间,就释放线程。但是不会释放corePoolSize中的线程
private Long keepAliveTime = 10L;

//用于指定keepAliveTime的时间单位
private TimeUnit unit = TimeUnit.SECONDS;

//阻塞队列,如果一次性任务有很多,就回家那个目前多的任务放在队列里面,只要有空闲的线程了,就会从队列中取出新的任务继续执行
private BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>(100000);

//线程的创建工厂
private ThreadFactory threadFactory = Executors.defaultThreadFactory();

//处理策略。如果队列满了,按照我们指定的拒绝策略执行任务
private RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

public Integer getCorePoolSize() {
return corePoolSize;
}

public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}

public Integer getMaximumPoolSize() {
return maximumPoolSize;
}

public void setMaximumPoolSize(Integer maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}

public Long getKeepAliveTime() {
return keepAliveTime;
}

public void setKeepAliveTime(Long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}

public TimeUnit getUnit() {
return unit;
}

public void setUnit(TimeUnit unit) {
this.unit = unit;
}

public BlockingQueue<Runnable> getWorkQueue() {
return workQueue;
}

public void setWorkQueue(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}

public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

public RejectedExecutionHandler getHandler() {
return handler;
}

public void setHandler(RejectedExecutionHandler handler) {
this.handler = handler;
}
}

ThreadPoolAutoConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.xiaofei.thread.pool.bean.ThreadPoolProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableConfigurationProperties(ThreadPoolProperties.class)//默认会将ThreadPoolProperties放在容器中
public class ThreadPoolAutoConfiguration {

@Autowired
private ThreadPoolProperties threadPoolProperties;

@Bean
@ConditionalOnMissingBean(ThreadPoolExecutor.class)//当容器中没有该Bean的时候才创建
public ThreadPoolExecutor threadPoolService() {
//使用ThreadPoolExecutor创建一个线程池
return new ThreadPoolExecutor(
threadPoolProperties.getCorePoolSize(),
threadPoolProperties.getMaximumPoolSize(),
threadPoolProperties.getKeepAliveTime(),
threadPoolProperties.getUnit(),
threadPoolProperties.getWorkQueue(),
threadPoolProperties.getThreadFactory(),
threadPoolProperties.getHandler()
);
}
}

spring.factories

1
2
3
4
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xiaofei.thread.pool.config.ThreadPoolAutoConfiguration
# 自动配置类所在的位置