天天看点

异步&线程池

初始化线程的4种方式:

1.继承Thread

2.实现Runnable接口

3.实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)

4.线程池

方法1 和 方法2:主进程无法获取线程的运算结果。

方法3 主进程可以获取线程的运算结果,但是不流于控制服务器中的线程资源。可以导致服务器资源耗尽。

方法4:通过如下两种方式初始化线程池

Executors.newFixedThreadPool(3);

new ThreadPoolExecutor(corePoolSize,naximumPoolSize,keepAliveTime,TimeUnit,unit,workQueue,threadFactory,handler);

通过线程池性能稳定,也可以获取执行结果,并捕获一成。但是,在业务复杂情况下,一个异步调用可能会依赖另一个异步调用的执行结果。

线程池:

创建:

Executors

ThreadPoolExecutor executor = new ThreadPoolExecutor();

七大参数:

corePoolSize:核心线程数(一直存在,除非设置了allowCoreThreadTimeOut);线程池,创建好以后就准备就绪的线程数量,就等待接收异步任务去执行。相当于线程池一开始就有 Thread thread = new Thread(); 但是没有开启(start),只要一开启strat就开始执行了。

maximumPoolSize: 最大线程数量;控制资源。

keepAliveTime:存活时间。如果当前线程数量大于core数量(核心数量)。释放空闲的线程,只要空闲大于指定的存活时间。释放的线程指的是,最大的大小减去核心的大小。

unit:时间单位

BlockingQueue<Runnable> workQueue 阻塞队列。如果任务有很多,并发数大于最大线程数,就会将目前多的任务放在队列里面,只要有空闲的线程就会从队列里面取出新的任务继续执行。

threadFactory:线程的创建工厂。

RejectedExcutionHandler:handler:如果队列满了,按照我们指定的拒绝策略,拒绝执行任务。

工作流程:

1、线程池创建,准备好core数量的核心线程,准备接收任务。

2、新的任务进来,用core准备好的空闲线程执行。

2.1)、core满了,就会将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务执行

2.2)、阻塞队列满了,就直接开新线程执行,最大智能开到max指定的数量

2.3)、max都执行好了。max-core数量的空闲线程会在keepAliveTime指定的时间后自动销毁。最终保持到core大小

2.4)、如果线程数开到了max的数量,还有新的任务进来,就会使用reject指定拒绝策略进行处理

3、所有的线程创建都是由指定的factory创建的。

new LinkedBlockingDeque<>()

默认是Interger的最大值,这样容易导致内存占满,一定要穿入业务定制的数量,要指定数量。

Executors.*defaultThreadFactory*()

默认的线程工厂

new ThreadPoolExecutor.AbortPolicy()

丢弃策略,直接把任务丢掉。

面试:

一个线程池: core 7;max 20; queue :50 100并发进来怎么分配的;

鲜有7个能直接得到执行,接下来50个进入队列,在多开13个继续执行。现在70个被安排上了。剩下的30个默认拒绝策略。如果不想抛弃,还要执行。使用CallerRunPolicy

使用Executores快速创建出线程池:

Executors.*newCachedThreadPool*()

核心线程是0的,所有都可以回收。

Executors.newFixedThreadPool();

固定线程数的线程池,最大线程数和核心线程数相当,都不可以回收。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
           

Executors.newScheduledThreadPool()

定时任务的线程池;

Executors.newScheduledThreadPool()

单线程的线程池;从队列里面获取任务,挨个执行。

开发中为什么使用线程池:

降低资源消耗,通过反复利用已经创建好的线程降低线程的创建和销毁带来的损耗

提高响应速度,因为线程池中的线程数没有超过线程池最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行,

提高线程的可管理性:

线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配。

CompletableFuture 异步编排

业务场景:

查询商品详情页面的逻辑比较复杂,有些数据还需要远程调用没必然需要花费更多的时间。

1、获取sku的基本信息 0.5s

2、获取sku的图片信息 0.5s

3、获取sku的促销信息 1s

4、获取spu的所有销售属性 1s

5、获取规格参数及组下的规格参数 1.5s

6、spu详情 1s

加入商品详情页的查询,需要上面标注的时间才能完成,那么用户需要6.5s后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这6步操作,也许只需要1.5s即可完成响应。

创建异步对象:

两种方式,一种没有返回值

CompletableFuture.runAsync

,一种有返回值

CompletableFuture.*supplyAsync*

// 没返回值
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果" + i);
        }, executor);
           
// 有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果" + i);
    return i;
},executor);
Integer integer = future.get();
System.out.println("main ..... end...." + integer);
           

计算完成时回掉方法:

链式调用:

方法成功完成后的处理

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运行结果" + i);
            return i;
        }, executor).whenComplete((result,exception)->{
            //虽然能得到异常信息,但是没法修改返回数据。
                System.out.println("异步任务成功完成了....结果是:" + result + "异常是:" + exception);
        }).exceptionally(throwable -> {
            //可以感知异常同时返回默认值。
            return 10;
        });
        // R apply(T t)
        Integer integer = future.get();
        System.out.println("main ..... end...." +integer );
           

结果打印:

异步&amp;线程池

handle 方法完成后的感知

// 方法执行完成后的处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("运行结果" + i);
    return i;
},executor).handle((res,thr)->{
    if(res!= null){
        return  res * 2;
    }
    if(thr != null){
        return 0;
    }
    return 0;
});
           

执行结果:结果本应该是2,通过handle修改了结果。

异步&amp;线程池

线程串行化方法:

场景: B 任务 必须等待 A任务结束后拿到它的结果才能执行。

三种方式:

1)、 thenRun :不能或得到上一步的执行结果,无返回值

.thenRunAsync(() -> {
           System.out.println("任务2启动了");
        });
           

2)、thenAcceptAsync() 能接收上一步结果,但是无返回值

.thenAcceptAsync(res->{
             System.out.println("任务2启动了" + res);
         });
           

3)、 thenApplyAsync() 既能接收到上一步的结果,有返回值

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("运行结果" + i);
    return i;
},executor).thenApplyAsync(res -> {
    System.out.println("任务2 启动了 " + res);
    return "hello" + res;
},executor);
// R apply(T t)
//Integer integer = future1.get();
System.out.println("main ..... end...." + future.get());
           
异步&amp;线程池

两任务组合-都要完成

不能感知到前两个的执行结果

future01.runAfterBothAsync(future02,()->{
            System.out.println("任务3开始");
        },executor);
           

可以获取执行结果

future01.thenAcceptBothAsync(future02,(f1,f2)->{
            System.out.println("任务3开始。。之前的结果:" + f1 + "----->" + f2);
        },executor);
           

可以获得结果并且有返回值

CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            return f1 + ":" + f2 + "--> hahaa";
        }, executor);
           

两个任务执行一个就完成任务3:

不感知结果,自己也无返回值

future01.runAfterEitherAsync(future02,()->{
            System.out.println("任务3开始。。之前的结果:" );
        },executor);
           

感知结果,没有返回值

future01.acceptEitherAsync(future02,(res)->{
            System.out.println("任务3开始。。之前的结果:" + res);
        },executor);
           

自己感知结果,且自己有返回值

CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> {
    System.out.println("任务3开始。。之前的结果:" + res);
    return res + "----> hahah";
},executor);
           

多任务组合:

allof: 等待所有任务完成

anyof:只要有一个任务完成

CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
        allOf.get();//等待所有结果完成
           
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
anyOf.get();//只有有一个结果完成