天天看点

CompletableFuture案例详解

作者:互联网高级架构师

CompletableFuture继承于java.util.concurrent.Future,它本身具备Future的所有特性,并且基于JDK1.8的流式编程以及Lambda表达式等实现一元操作符、异步性以及事件驱动编程模型,可以用来实现多线程的串行关系,并行关系,聚合关系。它的灵活性和更强大的功能是Future无法比拟的。

获取返回结果

get():阻塞获取返回结果

get()方式是阻塞的获取返回结果,它会一直等到有返回结果为止,程序才会继续往下执行,所以建议放在程序最后执行

public static void main(String[] args) {
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        return 1;
    });
    try {
       
        completableFuture.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}           

completableFuture.get(1,TimeUnit.SECONDS):阻塞限时获取返回结果

也是阻塞获取返回结果,但是在1秒后没有获取到返回结果就会抛出一个超时异常,工作中建议使用这个,避免get()无限期阻塞等待

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 1;
});
try {
    
    completableFuture.get(1,TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}           

completableFuture.getNow(1):立即返回

立即返回一个结果,如果程序完成了就返回,如果程序没有完成那么就返回一个默认值

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 1;
});
completableFuture.getNow(1);           

completableFuture.join():阻塞获取返回结果

join()与get()一样,区别是join()不会抛出异常

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 1;
});
completableFuture.join();           

completableFuture.complete()

complete()要与get()方法一起使用 complete是打断的意思,如果打断成功,那么就返回默认值44,如果打断失败就获取到真实的返回结果 当执行complete()方法时,我们异步方法已经执行完成了,那么就打断失败,可以获取到真实结果 如果这时候异步方法还在执行,那么直接打断,最后返回一个默认值

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 1;
});
completableFuture.complete(44);
completableFuture.get();           

对计算结果进行处理

thenApply

有输入,也有输出,它会将上一步的输出作为这一步的输入,然后顺序的执行下去,直到最后返回结果,但是如果其中一步出现了异常,那么就直接执行最后的抛出异常步骤exceptionally,不会继续执行剩下的业务逻辑

CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 1;
}).thenApply(f -> {
    System.out.println(Thread.currentThread().getName());
    return f + 1;
}).thenApply(f -> {
    System.out.println(Thread.currentThread().getName());
    return f + 1;
}).whenComplete((r,e) -> {
    System.out.println(Thread.currentThread().getName());
    if (e == null) {
        System.out.println(r);
    }
}).exceptionally(e -> {
    e.printStackTrace();
    return null;
});           

handle

handle和thenApply一样,区别就是即使其中某一个步骤出现了异常,程序还是会执行剩下的步骤,还会将异常信息作为参数一起传递到下一个步骤中,e:就是异常信息

CompletableFuture.supplyAsync(() -> {
    return 1;
}).handle((f,e) -> {
    return f + 1;
}).handle((f,e) -> {
    return f + 1;
}).whenComplete((f,e) -> {
    if(e == null) {
        System.out.println("result = " + f);
    }
}).exceptionally(e -> {
    e.printStackTrace();
    return null;
});           

thenAccept

thenAccept与 thenApply和handle不同,它是没有返回结果的,是对计算结果直接进行了消费

CompletableFuture.supplyAsync(() -> {
    return 1;
}).thenApply(f -> {
    return  f + 1;
}).thenApply(f -> {
    return  f + 1;
}).thenAccept(f -> {
    System.out.println("accept = " + f);
});           

总结

  • thenRun:任务A执行完继续执行任务B,并且任务B不需要任务A的返回结果
  • thenApply:任务A执行完继续执行任务B,并且任务B需要任务A的返回结果,同时任务B有返回值
  • thenAccept:任务A执行完继续执行任务B,并且任务B需要任务A的返回结果,但是任务B没有返回值

谁先快就返回谁的结果:applyToEither()

在一些游戏中,我们可以看到就是谁先完成了谁就赢了的游戏,其实这就是一个获取最快的返回结果的问题。applyToEither就是一个谁先快就返回谁的结果

我们让两个线程一个休眠5秒,一个休眠3秒,那么最后就会返回这个休眠3秒的结果

Integer result = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 5;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 3;
}), r -> {
    return r;
}).join();           

三个甚至多个的线程

CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
    return 5;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    return 3;
}), r -> {
    return r;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    return 2;
}),r -> {
    return r;
}).join();           

对计算结果进行合并

就类似CountDownlatch一样,等所有线程都完成了,然后对所有结果进行合并计算,如果有线程先完成了,那么就只能等着

CompletableFuture.supplyAsync(() -> {
    return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    return 20;
}),(r1,r2) -> {
    return r1 + r2;
}).join();           

三个甚至多个线程

CompletableFuture.supplyAsync(() -> {
    return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    return 20;
}),(r1,r2) -> {
    return r1 + r2;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    return 30;
}),(r1,r2) -> {
    return r1 + r2;
}).join();           

作者:我是小趴菜

链接:https://juejin.cn/post/7256590619413676087

来源:稀土掘金

CompletableFuture案例详解