CompletableFuture原理与用法详解(8大使用场景)

CompletableFuture原理与用法详解(8大使用场景)-mikechen

CompletableFuture是由Java 8引入的,这让我们编写清晰可读的异步代码变得更加容易,该类功能比Future 更加强大。

什么是 CompletableFuture

在Java中CompletableFuture用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture 类的设计灵感来自于 Google Guava 的 ListenableFuture 类,它实现了 Future 和 CompletionStage 接口并且新增了许多方法,它支持 lambda,通过回调利用非阻塞方法,提升了异步编程模型。

通过这种方式,主线程不会被阻塞,因为子线程是另外一条线程在执行,你可以用主线程去并行执行其他的任务,使用这种并行方式,极大地提升了程序的表现。

 

为什么要引入 CompletableFuture

一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度,所以JDK5新增了Future接口,用于描述一个异步计算的结果。

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

Future<String> future = executor.submit(()->{
       Thread.sleep(2000);
       return "hello world";
});
//轮询获取结果
while (true){
    if(future.isDone()) {
         System.out.println(future.get());
         break;
     }
 }

从上面的形式看来轮询的方式会耗费无谓的CPU资源,而且也不能及时地得到计算结果,所以要实现真正的异步,上述这样是完全不够的,在Netty中我们随处可见异步编程。

代码如下:

ChannelFuture f = serverBootstrap.bind(port).sync();
f.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    System.out.println("complete");
                }
            });

除此之外,我们需要更强大异步能力,单纯使用Future接口或者FutureTask类并不能很好地完成以下业务场景:

  • 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果;
  • 等待Future集合种的所有任务都完成;
  • 仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果;
  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式);
  • 应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)。

所以JDK 8.0新增了CompletableFuture 来解决上述这些痛点。

 

CompletableFuture的应用场景

  1. 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。
  2. 使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常。
  3. 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个。

 

CompletableFuture设计思想

CompletableFuture按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。

由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply为例,不再枚举全部回调类型,如下图所示:

CompletableFuture原理与用法详解(8大使用场景)-mikechen

被观察者

  1. 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
  2. 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。

观察者

CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。

  1. 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。
  2. 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。
  3. 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。

 

CompletableFuture核心功能

CompletableFuture的功能主要体现在它的CompletionStage,如下图所示:

CompletableFuture原理与用法详解(8大使用场景)-mikechen

CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段,可以实现如下功能:

  • 转换(thenCompose)
  • 组合(thenCombine)
  • 消费(thenAccept)
  • 运行(thenRun)
  • 带返回的消费(thenApply)

具体其他功能大家可以根据需求自行查看。

 

 CompletableFuture创建使用

创建CompletableFuture对象,提供了四个静态方法用来创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

Asynsc表示异步,而supplyAsyncrunAsync不同在与前者异步返回一个结果,后者是void.第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池.其中Supplier是一个函数式接口,代表是一个生成者的意思,传入0个参数,返回一个结果。

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            return "hello world";
  });
System.out.println(future.get());  //阻塞的获取结果  ''helllo world"

 

CompletableFuture用法详解

场景一:主动完成任务

主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。

public class CompletableFutureDemo1 {

    /**
     * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "子线程开始干活");
                //子线程睡 5 秒
                Thread.sleep(5000);
//                //在子线程中完成主线程 如果注释掉这一行代码将会一直停住
                future.complete("success");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "A").start();
        //主线程调用 get 方法阻塞
        System.out.println("主线程调用 get 方法获取结果为: " + future.get());
        System.out.println("主线程完成,阻塞结束!!!!!!");
    }
}

 

场景二:没有返回值的异步任务

runAsync:返回一个新的 CompletableFuture,它在运行给定操作后由在ForkJoinPool.commonPool()运行的任务异步完成。

如果你想异步的运行一个后台任务并且不需要任务返回结果,就可以使用runAsync。

public class CompletableFutureDemo2 {

    /**
     * 没有返回值的异步任务
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("子线程启动干活");
                Thread.sleep(5000);
                System.out.println("子线程完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        //主线程阻塞
        future.get();
        System.out.println("主线程结束");
    }
}

 

场景三:有返回值的异步任务

supplyAsync:返回任务结果。

CompletableFuture.supplyAsync()它持有supplier<T> 并且返回CompletableFuture<T>,T 是通过调用 传入的supplier取得的值的类型。

Supplier<T> 是一个简单的函数式接口,表示supplier的结果。它有一个get()方法,该方法可以写入你的后台任务中,并且返回结果。

/**
 * 主线程开始
 * 子线程启动干活
 * 子线程任务完成
 * 主线程结束
 */
public class CompletableFutureDemo2 {

    /**
     * 有返回值的异步任务
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动干活");
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "子线程任务完成";
        });
        //主线程阻塞
        System.out.println(future.get());
        System.out.println("主线程结束");
    }
}

 

场景四:线程串行化

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

/**
 主线程开始
 主线程结束, 子线程的结果为:和朋友一起去次烧烤!!!到店里——>开始点烧烤!!和朋友们次完烧烤,给女朋友带杯奶茶回去!!
 */
public class CompletableFutureDemo4 {

    private static String action="";

    /**
     * 线程依赖
     * 1、我到了烧烤店,
     * 2、开始点烧烤
     * 3、和朋友次完烧烤 ,给女朋友带奶茶回去
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> {
                    action="和朋友一起去次烧烤!!!! ";
                    return action;
                }).thenApply(string -> {
                    return  action+="到店里——>开始点烧烤!!";
                }).thenApply(String->{
                    return  action+="和朋友们次完烧烤,给女朋友带杯奶茶回去!!";
                });
        String str = future.get();
        System.out.println("主线程结束, 子线程的结果为:" + str);
    }

}

 

场景五:thenAccept 消费处理结果

如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept() 和 thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

/**
 主线程开始
 子线程全部处理完成,最后调用了 accept,结果为:逛淘宝,想买双鞋 等待快递到来
 */
public class CompletableFutureDemo5 {

    private static String action = "";

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture.supplyAsync(() -> {
            try {
                action = "逛淘宝,想买双鞋 ";
            } catch (Exception e) {
                e.printStackTrace();
            }
            return action;
        }).thenApply(string -> {
            return action + "选中了,下单成功!!";
        }).thenApply(String -> {
            return action + "等待快递到来";
        }).thenAccept(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + s);
            }
        });
    }
}

 

场景六:结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

/**
 * 主线程开始
 * 让num+10;任务开始
 * 20
 * 21
 */
public class CompletableFutureDemo6 {

    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //第一步加 10
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("让num+10;任务开始");
            num += 10;
            return num;
        });
        //合并
        CompletableFuture<Integer> future1 = future.thenCompose(i ->
                //再来一个 CompletableFuture
                CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                }));
        System.out.println(future.get());
        System.out.println(future1.get());
    }
}

 

场景7:合并多个任务的结果

allOf 与 anyOf

allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情

/**主线程开始
 乘以 10 任务开始
 加 10 任务开始
 减以 10 任务开始
 除以 10 任务开始
 [110, 100, 100, 10]
*/
public class CompletableFutureDemo7 {

    private static Integer num = 10;


    public static void main(String[] args) throws Exception{
        System.out.println("主线程开始");
        List<CompletableFuture> list = new ArrayList<>();

        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
        list.add(job1);

        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("乘以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job2);

        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("减以 10 任务开始");
            num = num - 10;
            return num;
        });
        list.add(job3);

        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("除以 10 任务开始");
            num = num / 10;
            return num;
        });
        list.add(job4);

        //多任务合并
        List<Integer> collect =
                list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
        System.out.println(collect);
    }

}

 

作者简介

陈睿|mikechen,10年+大厂架构经验,BAT资深面试官,就职于阿里巴巴、淘宝、百度等一线互联网大厂。

👇阅读更多mikechen架构文章👇

阿里架构 |双11秒杀 |分布式架构 |负载均衡 |单点登录 |微服务 |云原生 |高并发 |架构师

以上

关注作者「mikechen」公众号,获取更多技术干货!

后台回复架构,即可获取《阿里架构师进阶专题全部合集》,后台回复面试即可获取《史上最全阿里Java面试题总结

评论交流
    说说你的看法