Java 8 completable future programming

1、 Introduction

  the so-called asynchronous call is actually a method that allows the operation to continue without waiting for the return value of the called function. In the Java language, simply speaking, it is to start another thread to complete some calculations in the call, so that the call can continue to run or return without waiting for the calculation results. However, the caller still needs to get the calculation result of the thread.

  jdk5 adds a future interface to describe the results of an asynchronous calculation. Although future and related usage methods provide the ability to execute tasks asynchronously, it is very inconvenient to obtain the results. The results of tasks can only be obtained by blocking or polling. The blocking method is obviously contrary to the original intention of our asynchronous programming. The polling method will consume unnecessary CPU resources and can not get the calculation results in time.

private static final ExecutorService POOL = Executors.newFixedThreadPool(TASK_THRESHOLD,new ThreadFactory() {
        AtomicInteger atomicInteger = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"demo15-" + atomicInteger.incrementAndGet());
        }
    });

    public static void main(String[] args) throws ExecutionException,InterruptedException {
        Future<Integer> submit = POOL.submit(() -> 123);
        // 1. get() 方法用户返回计算结果,如果计算还没有完成,则在get的时候会进行阻塞,直到获取到结果为止
        Integer get = submit.get();
        // 2. isDone() 方法用于判断当前Future是否执行完成。
        boolean done = submit.isDone();
        // 3. cancel(boolean mayInterruptIfRunning) 取消当前线程的执行。参数表示是否在线程执行的过程中阻断。
        boolean cancel = submit.cancel(true);
        // 4. isCancelled() 判断当前task是否被取消.
        boolean cancelled = submit.isCancelled();
        // 5. invokeAll 批量执行任务
        Callable<String> callable = () -> "Hello Future";
        List<Callable<String>> callables = Lists.newArrayList(callable,callable,callable);
        List<Future<String>> futures = POOL.invokeAll(callables);
    }

  in Java 8, completable future provides a very powerful extension function of future, which can help us simplify the complexity of asynchronous programming, provide the ability of functional programming, process calculation results through callback, and provide methods to convert and combine completable future.

Tips: completionstage represents a stage in the asynchronous computing process. After one stage is completed, another stage may be triggered.

2、 Completable future uses

1. runAsync、supplyAsync

// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(supplier<U> supplier,Executor executor)

Runasync and supplyasync methods are methods provided by completable future to create asynchronous operations. It should be noted that if the executor is not specified as the thread pool, forkjoinpool will be used Commonpool () executes asynchronous code as its thread pool; If a thread pool is specified, it runs with the specified thread pool. All of the following methods are similar.

public class Demo1 {

    public static void main(String[] args) throws ExecutionException,InterruptedException {
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println(123));

        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "CompletableFuture");
        System.out.println(supplyAsync.get());
    }
}

2. whenComplete、exceptionally

// 执行完成时,当前任务的线程执行继续执行 whenComplete 的任务。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
// 执行完成时,把 whenCompleteAsync 这个任务提交给线程池来进行执行。
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

When the calculation of completable future is completed, the whencomplete method will be executed; When an exception is thrown in the completable future calculation, the exceptionally method is executed.

public class Demo2 {

    public static void main(String[] args) throws ExecutionException,InterruptedException {

        CompletableFuture<Integer> runAsync = CompletableFuture.supplyAsync(() -> 123456);
        runAsync.whenComplete((t,throwable) -> {
            System.out.println(t);
            if (throwable != null) {
                throwable.printStackTrace();
            }
        });
        runAsync.whenCompleteAsync((t,throwable) -> {
            System.out.println(t);
            if (throwable != null) {
                throwable.printStackTrace();
            }
        });
        runAsync.exceptionally((throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
            }
            return null;
        });
        TimeUnit.SECONDS.sleep(2);
    }
}

3. thenApply、handle

// T:上一个任务返回结果的类型
// U:当前任务的返回值类型

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)

public <U> CompletionStage<U> handle(BiFunction<? super T,Throwable,? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T,Executor executor);

When a thread depends on another thread, the thenapply method can be used to serialize the two threads

The handle method is handled in much the same way as the thenapply method. The difference is that the handle is executed after the task is completed. It can also handle abnormal tasks. Thenapply can only execute normal tasks. If the task is abnormal, thenapply method will not be executed.

public class Demo3 {

    public static void main(String[] args) throws ExecutionException,InterruptedException {
        // thenApply
        CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(() -> 123).thenApply(t -> t * t);
        System.out.println(thenApply.get());

       // handle
        CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 0;
            return new Random().nextInt(10);
        }).handle((t,throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
                return -1;
            }
            return t * t;
        });
        System.out.println(handle.get());
    }
}

4. thenAccept、thenRun

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

Thenaccept receives the processing result of the task and consumes the processing result. No results returned.

Unlike thenaccept, thenrun does not care about the processing result of the task. As long as the above task is completed, thenrun will be executed.

public class Demo4 {

    public static void main(String[] args) {
        // thenAccept
        CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenAccept(System.out::println);

       // thenRun
        CompletableFuture<Void> thenRun = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenRun(() -> System.out.println(123));
    }
}

5. thenCombine、thenAcceptBoth

 // T 表示第一个 CompletionStage 的返回结果类型
 // U 表示第二个 CompletionStage 的返回结果类型
 // V表示 thenCombine/thenAcceptBoth 处理结果类型
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,? extends V> fn,Executor executor);

public <U,Executor executor);

Thencombine and thenacceptboth are used to merge tasks - wait until the two completionstage tasks are completed, and then process the results of the two tasks together. The difference is that thencombine has a return value; Thenacceptboth has no return value.

public class Demo5 {

    public static void main(String[] args) throws ExecutionException,InterruptedException {
        // thenCombine
        CompletableFuture<String> thenCombine = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenCombine(CompletableFuture.supplyAsync(() -> "str"),// 第一个参数是第一个 CompletionStage 的处理结果
                        // 第二个参数是第二个 CompletionStage 的处理结果
                        (i,s) -> i + s
                );
        System.out.println(thenCombine.get());

        // thenAcceptBoth 
        CompletableFuture<Void> thenAcceptBoth = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "str"),(i,s) -> System.out.println(i + s));
    }
}

6. applyToEither、acceptEither、runAfterEither、runAfterBoth

Because these methods have similar meanings and use more similar, we will introduce them with applytoeither

// T 两个 CompletionStage 组合运算后的结果类型
// U 下一步处理运算的结果返回值类型
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T,U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,U> fn,Executor executor);
public class Demo6 {

    public static void main(String[] args) throws ExecutionException,InterruptedException {

        CompletableFuture<Integer> applyToEither = CompletableFuture.supplyAsync(() -> {
            int nextInt = new Random().nextInt(10);
            try {
                Thread.sleep(nextInt);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1=" + nextInt);
            return nextInt;
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            int nextInt = new Random().nextInt(10);
            try {
                Thread.sleep(nextInt);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2=" + nextInt);
            return nextInt;
        }),i -> i);

        System.out.println(applyToEither.get());
    }
}

7. thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn,Executor executor) ;

Thenpose method allows you to pipeline two completionstages. When the first operation is completed, the result is passed to the second operation as a parameter.

public class Demo7 {

    public static void main(String[] args) throws ExecutionException,InterruptedException {

        CompletableFuture<Integer> thenCompose = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * i));
        System.out.println(thenCompose.get());

    }
}

Reference blog: https://www.jianshu.com/p/6bac52527ca4

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>