介绍
Future是JDK1.5后提供的异步调用接口。
CompletableFuture是JDK8提出的一个支持非阻塞的多功能的异步类,实现了Future。
与Future区别
- CompletableFuture支持回调,可指定线程池
- CompletableFuture支持链式、顺序和聚合等执行
- CompletableFuture支持不阻塞主线程且无需轮询等待状态
- 都是提交后直接执行,并在get时阻塞等待
CompletableFuture主要方法
- thenRun,前一个执行后再同步执行
- thenRunAsync,前一个执行后再异步执行
- runAsync,无返回值的异步执行
- supplyAsync,有返回值的异步执行
- allOf,聚合多个调用
- get,阻塞等待执行完成
CompletableFuture异步执行Demo
1 | public class CompletableFutureTest { |
Future 阻塞Demo
1 | while (true) { |
CompletableFuture-allOf问题
allOf等待所有任务执行完成,直到timeout超时。1
2
3CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(timeout, TimeUnit.MILLISECONDS);
若timeout超时后,所有任务均返回失败;类似事务操作,导致已获取结果的任务后续处理异常,无法容错。
正确使用方法
allOf不限制等待时间,子任务内部限定。具体操作如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 4j
public class CompletableFutureUtil {
private List<CompletableFuture<Void>> completableFutureList = new ArrayList<>(1000);
public static ScheduledExecutorService es = Executors.newScheduledThreadPool(20);
public static CompletableFutureUtil newInstance() {
return new CompletableFutureUtil();
}
public CompletableFutureUtil runAsync(Runnable runnable, int timeout) {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(runnable, ThreadPool.POOL);
completableFutureList.add(timeoutAfter(timeout, completableFuture));
return this;
}
public CompletableFutureUtil runAsync(Runnable runnable) {
completableFutureList.add(CompletableFuture.runAsync(runnable, ThreadPool.POOL));
return this;
}
public void finish(int timeout) {
try {
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.warn("timeout,caller:{},size:{}", getMethod(), completableFutureList.size());
} catch (Exception e) {
log.error("error,caller:{},size:{}", getMethod(), completableFutureList.size(), e);
}
}
public void finish() {
try {
CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.get();
} catch (InterruptedException e) {
log.warn("interrupted,caller:{},size:{}", getMethod(), completableFutureList.size());
} catch (Exception e) {
log.error("error,caller:{},size:{}", getMethod(), completableFutureList.size(), e);
}
}
private <T> CompletableFuture<T> timeoutAfter(int timeout,
CompletableFuture<T> f) {
es.schedule(() -> f.complete(null), timeout, TimeUnit.MILLISECONDS);
return f;
}
public static String getMethod() {
StackTraceElement caller = (new Throwable()).getStackTrace()[2];
return caller.getClassName() + "@" + caller.getMethodName();
}
}
调用方法:1
2
3
4
5
6
7
8
9
10
11CompletableFutureUtil completableFutureUtil = CompletableFutureUtil.newInstance();
list.forEach(key -> completableFutureUtil.runAsync(() -> {
count.incrementAndGet();
}, 1000));
try {
completableFutureUtil.finish();
} catch (Exception e) {
log.error("error ", e);
}
- 异步任务内部进行限定超时时间,runAsync(Runnable runnable, int timeout)
- 内部由定时任务进行扫描和容错处理(返回null),es.schedule(() -> f.complete(null), timeout, TimeUnit.MILLISECONDS);
- allOf不限定执行时候,直接get等待