简介 异步计算所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。
以前我们获取一个异步任务的结果可能是这样写的:
Future 接口的局限性Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:
将多个异步计算的结果合并成一个 等待Future集合中的所有任务都完成 Future完成事件(即,任务完成以后触发执行动作) CompletionStageCompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()) 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发 CompletableFuture在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。 它实现了Future和CompletionStage接口
实例代码 基本的CompletableFuter使用1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;public class BaseComFuture { public static void main (String[] args) throws InterruptedException, ExecutionException { long start = System.nanoTime(); start = System.nanoTime(); CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(()->{ try { System.out.println("get start1" ); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } return "Hello CompletableFuture" ; }); TimeUnit.SECONDS.sleep(1 ); System.out.println(123 ); System.out.println(resultCompletableFuture.get()); System.out.println("aaaaaaa" ); System.out.println("aaaaaaa" ); System.out.println("aaaaaaa" ); System.out.println(456 ); System.out.println(System.nanoTime()-start); } }
回调1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class BaseComFutureCallback { public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(5 ); CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(()->{ try { System.out.println("get start,will sleep 3s" ); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello CompletableFuture" ; }, executor); CompletableFuture<String> resultCompletableFuture2 = CompletableFuture.supplyAsync(()->{ try { System.out.println("get start,will sleep 3s" ); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello CompletableFuture" ; }, executor); System.out.println(resultCompletableFuture.thenAccept((t)->{ System.out.println("进入回调函数-" + t); System.out.println(Thread.currentThread().getName()); })); System.out.println(resultCompletableFuture2.thenAccept((t)->{ System.out.println("进入回调函数-" + t); System.out.println(Thread.currentThread().getName()); })); System.out.println("带有回调的print语句后面一句话" ); System.out.println("" ); System.out.println("it will shutdown 10's later" ); TimeUnit.SECONDS.sleep(10 ); executor.shutdown(); System.out.println("shutdown" ); } }
异常通知的情况1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.function.Consumer;import java.util.function.Function;import java.util.function.Supplier;public class BaseComFutureExceptionally2 { public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(5 ); CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get () { try { System.out.println("get start,will sleep 3s" ); TimeUnit.SECONDS.sleep(3 ); throw new RuntimeException("错误" ); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello CompletableFuture" ; } }, executor); System.out.println(resultCompletableFuture.thenAccept(new Consumer<String>() { @Override public void accept (String t) { System.out.println("进入回调函数-" + t); throw new RuntimeException("aaaaa" ); } }).exceptionally(new Function<Throwable, Void>() { @Override public Void apply (Throwable t) { System.out.println(t.getMessage()); return null ; } })); resultCompletableFuture.completeExceptionally(new RuntimeException("error" )); System.out.println("it will shutdown 10's later" ); TimeUnit.SECONDS.sleep(10 ); executor.shutdown(); System.out.println("time out ,main shutdown now" ); } }
我们不仅仅回调,还可以将这些回调 操作串起来需要用到apply函数组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import java.util.concurrent.*;import java.util.function.Function;public class ConvertComFuture { public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(5 ); CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("aaaaa" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return "zero" ; }, executor); CompletableFuture<Integer> f2 = f1.thenApply(new Function<String, Integer>() { @Override public Integer apply (String t) { System.out.println("进入f2的apply方法" ); System.out.println("f1传进来的字符串-" +t); System.out.println("返回该字符串的长度-" +Integer.valueOf(t.length())); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.valueOf(t.length()); } }).exceptionally(new Function<Throwable, Integer>() { @Override public Integer apply (Throwable t) { System.out.println(t.getMessage()); return null ; } }); System.out.println("bbbbb" ); CompletableFuture<Double> f3 = f2.thenApply(self -> self * 2.0 ).thenApply(self -> self*3 ).exceptionally(new Function<Throwable, Double>() { @Override public Double apply (Throwable t) { System.out.println(t.getMessage()); return null ; } }); System.out.println("shutdown in 3s" ); TimeUnit.SECONDS.sleep(3 ); System.out.println("shutdown" ); executor.shutdown(); } }