CompletableFuture组合式异步编程

Callable与Runnable

java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

Future 接口

要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。

局限性:

  • 将两个异步计算合并为一个
  • 等待Future集合中的所有任务都完成
  • 仅等待Future集合中最快结束的任务完成
  • 通过编程方式完成一个Future任务的执行
  • 应对Future的完成事件

并行操作

同步API,使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用。

异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。

FutureTask

为避免发生客户端被阻塞的风险,使用FutureTask执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Task task = new Task();// 新建异步任务
FutureTask<Integer> future = new FutureTask<Integer>(task) {
// 异步任务执行完成,回调
@Override
protected void done() {
try {
System.out.println("future.done():" + get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
// 创建线程池(使用了预定义的配置)
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(future);

parallelStream并行流

1
2
3
4
5
6
public List<String> findPrices(String product) { 
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
.collect(toList());
}

CompletableFuture异步

1
2
3
4
5
6
7
8
9
10
11
public List<String> findPrices(String product) { 
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " +
shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
  • 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的。
  • 如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程。
  • 如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以设定需要使用的线程数。
  • 涉及等待I/O的操作不使用并行流的另一个原因是,处理流的
    流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

CompletableFuture内的异常

1
2
3
4
5
6
7
8
9
10
11
12
public Future<Double> getPriceAsync(String product) { 
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}
------ 本文结束------

本文标题:CompletableFuture组合式异步编程

文章作者:Perkins

发布时间:2020年10月09日

原始链接:https://perkins4j2.github.io/posts/13999100/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。