CompletableFuture和Future介绍

介绍

Future是JDK1.5后提供的异步调用接口。
CompletableFuture是JDK8提出的一个支持非阻塞的多功能的异步类,实现了Future。

与Future区别

  • CompletableFuture支持回调,可指定线程池
  • CompletableFuture支持链式、顺序和聚合等执行
  • CompletableFuture支持不阻塞主线程且无需轮询等待状态
  • 都是提交后直接执行,并在get时阻塞等待

CompletableFuture主要方法

  • thenRun,前一个执行后再同步执行
  • thenRunAsync,前一个执行后再异步执行
  • runAsync,无返回值的异步执行
  • supplyAsync,有返回值的异步执行
  • allOf,聚合多个调用
  • get,阻塞等待执行完成

CompletableFuture异步执行Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CompletableFutureTest {
public static void main(String[] args) {
//线程池
Executor executor = Executors.newFixedThreadPool(10);

//异步列表
List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();

//输入列表
List<Integer> sceneList = new ArrayList<>();
sceneList.add(1);
sceneList.add(2);
sceneList.add(3);

sceneList.forEach(e -> {
//异步
CompletableFuture<Boolean> completableFuture =
CompletableFuture.supplyAsync(() -> execute(e), executor);
completableFutureList.add(completableFuture);
});

try {
//聚合阻塞
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(1, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}

private static boolean execute(Integer number) {
return number > 0;
}
}

Future 阻塞Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
if (future.isDone()&& !future.isCancelled()) {
//获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
Integer i = future.get();//获取结果
System.out.println("任务i="+i+"获取完成!"+new Date());
list.add(i);
break;//当前future获取结果完毕,跳出while
} else {
Thread.sleep(1);
//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
}
}

CompletableFuture-allOf问题

allOf等待所有任务执行完成,直到timeout超时。

1
2
3
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(timeout, TimeUnit.MILLISECONDS);

若timeout超时后,所有任务均返回失败;类似事务操作,导致已获取结果的任务后续处理异常,无法容错。

正确使用方法

allOf不限制等待时间,子任务内部限定。具体操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class CompletableFutureUtil {

private List<CompletableFuture<Void>> completableFutureList = new ArrayList<>(1000);
public static ScheduledExecutorService es = Executors.newScheduledThreadPool(20);

public static CompletableFutureUtil newInstance() {
return new CompletableFutureUtil();
}

public CompletableFutureUtil runAsync(Runnable runnable, int timeout) {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(runnable, ThreadPool.POOL);
completableFutureList.add(timeoutAfter(timeout, completableFuture));
return this;
}

public CompletableFutureUtil runAsync(Runnable runnable) {
completableFutureList.add(CompletableFuture.runAsync(runnable, ThreadPool.POOL));
return this;
}

public void finish(int timeout) {
try {
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.warn("timeout,caller:{},size:{}", getMethod(), completableFutureList.size());
} catch (Exception e) {
log.error("error,caller:{},size:{}", getMethod(), completableFutureList.size(), e);
}
}

public void finish() {
try {
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get();
} catch (InterruptedException e) {
log.warn("interrupted,caller:{},size:{}", getMethod(), completableFutureList.size());
} catch (Exception e) {
log.error("error,caller:{},size:{}", getMethod(), completableFutureList.size(), e);
}
}

private <T> CompletableFuture<T> timeoutAfter(int timeout,
CompletableFuture<T> f) {
es.schedule(() -> f.complete(null), timeout, TimeUnit.MILLISECONDS);
return f;
}

public static String getMethod() {
StackTraceElement caller = (new Throwable()).getStackTrace()[2];

return caller.getClassName() + "@" + caller.getMethodName();
}
}

调用方法:

1
2
3
4
5
6
7
8
9
10
11
CompletableFutureUtil completableFutureUtil = CompletableFutureUtil.newInstance();

list.forEach(key -> completableFutureUtil.runAsync(() -> {
count.incrementAndGet();
}, 1000));

try {
completableFutureUtil.finish();
} catch (Exception e) {
log.error("error ", e);
}

  • 异步任务内部进行限定超时时间,runAsync(Runnable runnable, int timeout)
  • 内部由定时任务进行扫描和容错处理(返回null),es.schedule(() -> f.complete(null), timeout, TimeUnit.MILLISECONDS);
  • allOf不限定执行时候,直接get等待
------ 本文结束------

本文标题:CompletableFuture和Future介绍

文章作者:Perkins

发布时间:2020年05月19日

原始链接:https://perkins4j2.github.io/posts/13999/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。