天天看点

Java8异步编程-CompletableFuture

总结

创建无返回异步任务

runAsync

创建有返回异步任务

supplyAsync

异步任务正常执行完或者抛出异常时

whenComplete

whenCompleteAsync

异步任务抛出异常时

exceptionally

异步任务串行化,前一个有返回异步任务正常执行完,返回值作为下一个有参有返回异步任务的参数

thenApply

thenApplyAysnc

异步任务串行化,前一个有返回异步任务正常执行完或者抛出异常时,返回值作为下一个有参有返回异步任务的参数

handle

handleAsync

异步任务串行化,前一个有返回异步任务正常执行完,返回值作为下一个有参无返回异步任务的参数

thenAccept

thenAcceptAsync

异步任务串行化,前一个异步任务正常执行完,执行下一个无参无返回的异步任务

thenRun

thenRunAsync

整合异步任务,两个异步任务都执行完,把两个异步任务的结果放到一块处理, 有参有返回

thenCombine

thenCombineAsync

整合异步任务,两个异步任务都执行完,把两个异步任务的结果放到一块处理, 有参无返回

thenAcceptBoth

thenAcceptBothAsync

整合异步任务,哪个返回结果快就使用哪个结果,有参有返回

applyToEither

applyToEitherAsync

整合异步任务,哪个返回结果快就使用哪个结果,有参无返回

acceptEither

acceptEitherAsync

两个异步任务,任何一个执行完成了都会执行下一步操作,无参无返回

runAfterEither

runAfterEitherAsync

两个异步任务,都完成了才会执行下一步操作,无参无返回

runAfterBoth

runAfterBothAsync

所有的异步任务都完成

allOf

任意一个异步任务完成

anyOf

获取异步任务结果

get

join

区别:get会抛异常, join不会抛异常

创建无返回值的异步任务

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}      

创建一个无返回值的异步任务,

第一个方法使用默认线程池ForkJoinPool#commonPool()

第二个方法使用自定义线程池

eg

@Test
  public void testRunAsync() {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("执行异步任务"));
    future.join();
  }      

创建有返回值的异步任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

@FunctionalInterface
public interface Supplier<T> {
    T get();
}      

创建一个有返回值的异步任务

第一个方法使用默认线程池ForkJoinPool#commonPool()

第二个方法使用自定义线程池

eg

@Test
  public void testSupplyAsync() {
      CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
          System.out.println("执行异步任务");
          return true
      });
      boolean result = future.join();
      System.out.println(result);
  }      

异步任务正常执行完成或者发生异常时

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

@FunctionalInterface
public interface BiConsumer<T, U> {
    void accept(T t, U u);
}      

异步任务正常执行完成或者发生异常时

第一个方法,使用执行异步任务的线程继续执行

第二个方法,使用默认线程池中ForkJoinPool#commonPool()的线程执行

第三个方法,使用自定义线程池中的线程执行

eg

可以通过判断第二个参数是否为空,来判断是否发生异常,不过不推荐,推荐使用exceptionally处理异常

@Test
    public void testSupplyAsync() {
        CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行异步任务");
            if (true) {
                throw new NullPointerException("测试抛出空指针");
            }
            return true;
        }).whenCompleteAsync((b, e) -> {
            System.out.println("异步任务执行完成");
            if (Objects.nonNull(e)) {
                System.out.println("异步任务发生异常:" + e.getMessage());
            }
        });

        boolean result = future.join();
        System.out.println(result);
    }      

注意:

这个地方虽然抛出的是NullPointerException, 但是在whenCompleteAsync方法中捕获时,异常已经向上转型成了Throwable,所以这个地方无法调用抛出异常的私有方法

异步任务抛异常时

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}      

异步任务抛出异常时执行

eg

@Test
    public void testSupplyAsync() {
        CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行异步任务");
            if (true) {
                throw new NullPointerException("测试抛出空指针");
            }
            return true;
        }).whenCompleteAsync((b, e) -> {
            System.out.println("异步任务执行完成");
            if (Objects.nonNull(e)) {
                System.out.println("异步任务发生异常:" + e.getMessage());
            }
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return false;
        });

        boolean result = future.join();
        System.out.println(result);
    }      

异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个有返回值的异步任务的参数

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}      

异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个有返回值的异步任务的参数

第一个方法,使用执行异步任务的线程继续执行

第二个方法,使用默认线程池中ForkJoinPool#commonPool()的线程执行

第三个方法,使用自定义线程池中的线程执行

eg

@Test
    public void testWhenCompele() {
        CompletableFuture<Boolean> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程1开始执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步线程1执行结束");
            return "线程1";
        }).thenApply(str -> {
            System.out.println(str);
            return false;
        });

        System.out.println(future1.join());
    }      

异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个无返回值的异步任务的参数

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}      

异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个无返回值的异步任务的参数

第一个方法,使用执行异步任务的线程继续执行

第二个方法,使用默认线程池中ForkJoinPool#commonPool()的线程执行

第三个方法,使用自定义线程池中的线程执行

eg

@Test
    public void testWhenCompele() {
        CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程1开始执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步线程1执行结束");
            return "线程1";
        }).thenAccept(str -> System.out.println(str));

        System.out.println(future1.join());
    }      

异步任务串行化, 前一个有返回值的异步任务正常执行完或者发生异常时, 返回值作为下一个有返回值的异步任务的参数

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

@FunctionalInterface
public interface BiFunction<T, U, R> {
    R apply(T t, U u);
}      

异步任务串行化, 前一个有返回值的异步任务正常执行完或者发生异常时, 返回值作为下一个有返回值的异步任务的参数

第一个方法,使用执行异步任务的线程继续执行

第二个方法,使用默认线程池中ForkJoinPool#commonPool()的线程执行

第三个方法,使用自定义线程池中的线程执行

eg

@Test
    public void testWhenCompele() {
        CompletableFuture<Boolean> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步线程1开始执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步线程1执行结束");
            return "线程1";
        }).handle((str, e) -> {
            System.out.println(str);
            if (Objects.nonNull(e)) {
                System.out.println(e.getMessage());
            }
            return false;
        });

        System.out.println(future1.join());
    }      

异步任务串行化, 前一个有返回值的异步任务正常执行完, 继续执行下一个无参数无返回值异步任务

常见API

runAsync

创建无返回值的异步任务, 类似于execute方法

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);      

supplyAsync

创建有返回值的异步任务, 类似于submit方法

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);      

注意:

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码

在 JVM 的后台,使用通用的 fork/join 池,该池是所有并行流共享的。

默认情况,fork/join 池会为每个处理器分配一个线程。假设你有一台16核的机器,这样你就只能创建16个线程。

ForkJoinPool 最适合的是计算密集型的任务

对 CPU 密集型的任务来说,这样是有意义的,因为你的机器确实只能执行16个线程。但是真实情况下,不是所有的任务都是 CPU 密集型的。

whenComplete

异步任务执行完成时的回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);      
public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn);      

继续阅读