package com.cdkj.project.syn.common;
import org.apache.commons.compress.utils.Lists;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
 * Scratch demo for the ways of obtaining a {@link CompletableFuture} result
 * ({@code getNow}, {@code join}, {@code get}) and for running several tasks
 * asynchronously and then waiting for all of them with {@code allOf(...).join()}.
 */
public class AcquireResultTest {
    // Not referenced by main(): main is static and submits its tasks without an explicit executor.
    // NOTE(review): presumably only populated when this class is created by a container - confirm.
    @Resource(name="threadPoolTaskExecutor")
    ThreadPoolTaskExecutor executor;

    /**
     * Submits three sleeping tasks concurrently and blocks until all of them have finished.
     *
     * @param args unused
     * @throws Exception declared for the disabled samples below; the active code throws no checked exception
     */
    public static void main(String[] args) throws Exception {
        // --- getNow sample (disabled) ---
        // CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("测试函数2");
        //     try {
        //         Thread.sleep(10);
        //     } catch (InterruptedException e) {
        //         e.printStackTrace();
        //     }
        //     System.out.println("测试函数");
        //     return "hello world";
        // });
        //
        // getNow returns immediately without blocking: if the computation has finished it
        // yields the result (or the exception raised during the computation); otherwise it
        // yields the supplied valueIfAbsent. Calling cp1.getNow does not affect task execution.
        //
        // System.out.println(cp1.getNow("0"));
        // if ("0".equals(cp1.getNow("0"))) {
        //     Thread.sleep(100);
        //     System.out.println(cp1.getNow("02"));
        // }

        // --- join sample (disabled): join declares no checked exception ---
        // CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
        // System.out.println(cp2.join());

        // --- get sample (disabled): get() and get(long timeout, TimeUnit unit) already exist on
        // Future; the latter adds a timeout and throws a timeout exception when no result
        // arrives within the given time ---
        // CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
        // System.out.println(cp3.get());

        // Verify the allOf/join approach.
        List<Consumer<String>> consumers = Lists.newArrayList();
        consumers.add(sleepingTask("测试allof", 10000, "测试allof444"));
        consumers.add(sleepingTask("测试allof22", 10, "测试allof44422"));
        consumers.add(sleepingTask("测试allof33", 20000, "测试allof44433"));

        System.out.println("*****开始******");
        // Submit every task BEFORE waiting on any of them. Chaining map(runAsync).map(join) in a
        // single stream is evaluated element by element, so each task would be joined before the
        // next one is even submitted and the tasks would run one after another.
        List<CompletableFuture<Void>> futures = consumers.stream()
                .map(task -> CompletableFuture.runAsync(() -> task.accept("hahahha")))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        System.out.println("*****结束******");
    }

    /**
     * Builds a task that prints a start line, sleeps, and then prints an end line.
     * Each line is the current thread name followed by the accepted value and the given tag.
     *
     * @param startTag    suffix of the line printed before sleeping
     * @param sleepMillis how long the task sleeps, in milliseconds
     * @param endTag      suffix of the line printed after sleeping
     * @return the consumer performing the print / sleep / print sequence
     */
    private static Consumer<String> sleepingTask(String startTag, long sleepMillis, String endTag) {
        return value -> {
            try {
                System.out.println(Thread.currentThread().getName()+value + startTag);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+value + endTag);
        };
    }
}
import java.util.Random;
import java.util.concurrent.CompletableFuture;
/**
 * Demonstrates {@link CompletableFuture#allOf}: the combined future carries no result value,
 * which suits cases where the tasks return nothing and every one of them must finish
 * before the following work may start.
 */
public class AllOfTest {
    private static final Random random = new Random();

    /**
     * Starts three printing tasks, waits for all of them, then prints a final line.
     *
     * @param args unused
     * @throws Exception if waiting on the combined future fails
     */
    public static void main(String[] args) throws Exception {
        // The tasks produce no value, so runAsync is used instead of a supplier returning null.
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("hello"));
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            randomSleep();
            System.out.println("world");
        });
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            randomSleep();
            System.out.println("world2");
        });

        CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2, future3);
        // Acts as a barrier: get() returns only after all three futures are done.
        // The combined future is a CompletableFuture<Void>, so the printed value is "null".
        System.out.println(result.get());
        randomSleep();
        System.out.println("hahah");
    }

    /** Sleeps for a random duration below one second. */
    private static void randomSleep() {
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently dropping it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/**
 * Demonstrates {@link CompletableFuture#anyOf}: each task prints its own value when it runs,
 * but {@code get()} on the combined future returns only the result of the task that
 * finishes first. Suited to cases where a single result is enough to carry on.
 * Note that {@code main} does not wait for the slower task.
 */
public class AnyOfTest {
    private static final Random random = new Random();

    /**
     * Races two tasks and prints the result of whichever completes first.
     *
     * @param args unused
     * @throws Exception if waiting on the combined future fails
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            randomSleep();
            System.out.println("hello");
            return "hello";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            randomSleep();
            System.out.println("world");
            return "world";
        });
        CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
        System.out.println(result.get());
    }

    /** Sleeps for a random duration below ten milliseconds. */
    private static void randomSleep() {
        try {
            Thread.sleep(random.nextInt(10));
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently dropping it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/**
 * Demonstrates {@code thenAccept}: like {@code thenApply} it processes the result of the
 * previous stage, but the action it runs returns no value.
 */
public class ThenAcceptTest {
    /**
     * Asynchronously obtains the list and prints each element on its own line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CompletableFuture<Void> printed = CompletableFuture.supplyAsync(ThenAcceptTest::getList)
                .thenAccept(strList -> strList.forEach(m -> System.out.println(m)));
        // Wait for the pipeline; otherwise main may return before anything has been printed.
        printed.join();
    }

    /**
     * Supplies the sample data for the demo.
     *
     * @return a fixed-size list containing "a", "b" and "c"
     */
    public static List<String> getList() {
        return Arrays.asList("a", "b", "c");
    }
}
/**
 * Demonstrates {@code thenApply}, the variant that yields a value: the result of the
 * preceding CompletableFuture is passed to the function given to thenApply, and the
 * function's return value becomes the result of the new future. In effect it converts a
 * {@code CompletableFuture<T>} into a {@code CompletableFuture<U>}.
 */
public class ThenApplyTest {
    /**
     * Computes the base value asynchronously, multiplies it by eight and prints the outcome.
     *
     * @param args unused
     * @throws Exception if waiting on the future fails
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> source = CompletableFuture.supplyAsync(() -> randomInteger());
        CompletableFuture<Integer> result = source.thenApply(value -> value * 8);
        Integer outcome = result.get();
        System.out.println(outcome);
    }

    /**
     * Supplies the base value for the demo.
     *
     * @return always 10, despite the method name
     */
    public static Integer randomInteger() {
        final int fixedValue = 10;
        return fixedValue;
    }
}
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
/**
 * Placeholder demo for {@code thenCombine}, which combines two unrelated
 * CompletableFutures; the second one does not depend on the result of the first.
 */
public class ThenCombineTest {
    private static Random random = new Random();

    /**
     * Currently a no-op: the thenCombine example below is disabled.
     *
     * @param args unused
     * @throws Exception never thrown while the example stays disabled
     */
    public static void main(String[] args) throws Exception {
        // Disabled sketch: combine the results of two asynchronous tasks into one map.
        //
        // CompletableFuture<Map<String,Object>> result = CompletableFuture.supplyAsync(()->{
        //     Map<String,Object> map=new HashMap<>();
        //     map.put("sign","sign");
        //     return map;
        // }).thenCombine(
        //     CompletableFuture.supplyAsync(()->{
        //         Map<String,Object> map=new HashMap<>();
        //         map.put("study","study");
        //         return map;
        //     }), (i, j) -> {
        //         return new HashMap(){{
        //             put("sign",i);
        //             put("study",j);
        //         }};
        //     }
        // );
        // System.out.println(result.get());
    }

    /**
     * Draws a pseudo-random number from the shared generator.
     *
     * @return a value from 0 (inclusive) to 100 (exclusive)
     */
    public static Integer randomInteger() {
        final int upperBound = 100;
        int drawn = random.nextInt(upperBound);
        return drawn;
    }
}
/**
 * Demonstrates {@code thenCompose}, which pipelines two asynchronous operations:
 * the second one is started with the result of the first.
 */
public class ThenComposeTest {
    /**
     * Fetches the base value asynchronously, expands it in a second asynchronous step
     * and prints the final value.
     *
     * @param args unused
     * @throws Exception if waiting on the future fails
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> firstStage = CompletableFuture.supplyAsync(() -> getInteger());
        CompletableFuture<Integer> result =
                firstStage.thenCompose(base -> CompletableFuture.supplyAsync(() -> expandValue(base)));
        System.out.println(result.get());
    }

    /** Supplies the fixed base value 666. */
    private static int getInteger() {
        return 666;
    }

    /** Multiplies the given number by ten. */
    private static int expandValue(int num) {
        return num * 10;
    }
}
import java.util.concurrent.CompletableFuture;
/**
 * Demonstrates {@code whenComplete}: the callback receives the value and the exception of
 * the preceding stage and can act on them, but it does not produce a new value.
 */
public class WhenCompleteTest {
    /**
     * Runs a short asynchronous task, reports its outcome from a whenComplete callback and
     * then prints the value of the future returned by whenComplete.
     *
     * @param args unused
     * @throws Exception declared by the original signature; the body throws no checked exception
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing the interruption.
                Thread.currentThread().interrupt();
            }
            return "hello";
        });

        // Process the previous result without returning a new value. The suffix is applied
        // to a local copy only, so the line printed after join() below is still "hello".
        CompletableFuture<String> cf2 = cf1.whenComplete((v, e) -> {
            String decorated = v + "111";
            System.out.println(String.format("value:%s, exception:%s", decorated, e));
        });

        System.out.println(cf2.join());
    }
}