Java8自定义CompletableFuture的原理是什么


Java8 自定义CompletableFuture原理

Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好?

public class FutureInAction3 {    public static void main(String[] args) {        Future future = invoke(() -> {            try {                Thread.sleep(10000L);                return "I am Finished.";            } catch (InterruptedException e) {                return "I am Error";            }        });        future.setCompletable(new Completable() {            @Override            public void complete(String s) {                System.out.println("complete called ---- " + s);            }            @Override            public void exception(Throwable cause) {                System.out.println("error");                cause.printStackTrace();            }        });        System.out.println("....do something else .....");        System.out.println("try to get result ->" + future.get());    }    private static  Future invoke(Callable callable) {        AtomicReference result = new AtomicReference<>();        AtomicBoolean finished = new AtomicBoolean(false);        Future future = new Future() {            private Completable completable;            @Override            public T get() {                return result.get();            }            @Override            public boolean isDone() {                return finished.get();            }            // 设置完成            @Override            public void setCompletable(Completable completable) {                this.completable = completable;            }            // 获取            @Override            public Completable getCompletable() {                return completable;            }        };        Thread t = new Thread(() -> {            try {                T value = callable.action();                result.set(value);                finished.set(true);                if (future.getCompletable() != null)                    future.getCompletable().complete(value);            } catch (Throwable cause) {                if (future.getCompletable() != null)                    future.getCompletable().exception(cause);            }        });        t.start();        return future;    }    private interface Future {        T get();        boolean isDone();        //  1        void setCompletable(Completable completable);        //  2        Completable getCompletable();    }    private interface Callable {        T action();    }    // 回调接口    private interface Completable {        void complete(T t);        void exception(Throwable cause);    }}


Java8 中的 completeFuture 是对 Future 的扩展实现, 主要是为了弥补 Future 没有相应的回调机制的缺陷.

我们先看看 Java8 之前的 Future 的使用

package demos;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/** * @author djh on  2019/4/22 10:23 * @E-Mail 1544579459@qq.com */public class Demo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        ExecutorService cachePool = Executors.newCachedThreadPool();        Future future = cachePool.submit(() -> {            Thread.sleep(3000);            return "异步任务计算结果!";        });        // 提交完异步任务后, 主线程可以继续干一些其他的事情.        doSomeThingElse();        // 为了获取异步计算结果, 我们可以通过 future.get 和 轮询机制来获取.        String result;        // Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷.        // result = future.get();        // 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载.        long start = System.currentTimeMillis();        while (true) {            if (future.isDone()) {                break;            }        }        System.out.println("轮询耗时:" + (System.currentTimeMillis() - start));                result = future.get();        System.out.println("获取到异步计算结果啦: " + result);        cachePool.shutdown();    }    private static void doSomeThingElse() {        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.");    }}


从上面的 Demo 中我们可以看出, future 在执行异步任务时, 对于结果的获取显的不那么优雅, 很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果, 如Google的: ListenableFuture, 而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API.


package demos;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @author djh on  2019/5/1 20:26 * @E-Mail 1544579459@qq.com */public class CompleteFutureDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture completableFutureOne = new CompletableFuture<>();        ExecutorService cachePool = Executors.newCachedThreadPool();        cachePool.execute(() -> {            try {                Thread.sleep(3000);                completableFutureOne.complete("异步任务执行结果");                System.out.println(Thread.currentThread().getName());            } catch (InterruptedException e) {                e.printStackTrace();            }        });        // WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果.        CompletableFuture completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {            System.out.println("当异步任务执行完毕时打印异步任务的执行结果: " + s);        });        // ThenApply 方法返回的是一个新的 completeFuture.        CompletableFuture completableFutureThree = completableFutureTwo.thenApply(s -> {            System.out.println("当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!");            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return s.length();        });        System.out.println("阻塞方式获取执行结果:" + completableFutureThree.get());        cachePool.shutdown();    }}

从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法, 这两个方法便是 CompleteFuture 中最具有意义的方法, 他们都会在 completeFuture 调用 complete 方法传入异步计算结果时回调, 从而获取到异步任务的结果.

相比之下 future 的阻塞和轮询方式获取异步任务的计算结果, CompleteFuture 获取结果的方式就显的优雅的多。
