天天看點

Java多線程并行計算

在實際業務開發中如何降低接口響應時間,即如何提高程式的并行計算能力。

本文主要包含如下内容:

1、順序執行

2、線程池+Future

3、使用Java8的CompletableFuture

4、使用Guava的ListenableFuture

1、順序執行

直接上代碼:

package com.c306.test;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

@Slf4j
public class ParallelTest {

    /**
     * 測試方法
     *
     * @return
     */
    private int testMethod() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    /**
     * 順序執行
     */
    private void test01() {
        long start = System.currentTimeMillis();

        List<Integer> list = new ArrayList<>(5);
        list.add(this.testMethod());
        list.add(this.testMethod());
        list.add(this.testMethod());
        list.add(this.testMethod());
        list.add(this.testMethod());

        log.info("costs: {}ms", System.currentTimeMillis() - start);
    }

    @Test
    public void testSequentialExec() {
        this.test01();
        this.test01();
        this.test01();
    }
}
           

多次順序執行耗時結果:

2020-04-19 16:33:43,310+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 5046ms
2020-04-19 16:33:48,329+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 5003ms
2020-04-19 16:33:53,332+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 5003ms
           

2、線程池+Future

順序執行确實很慢,是以我們需要并行執行,即同時調用testMethod()這5個方法。每個方法單獨開啟一個線程異步去執行,全部執行完成輸出結果。

注意:這樣執行有個問題是,每次調用都需要建立5個線程,線程的建立和銷毀都是需要開銷的,是以我們使用池化技術

直接上代碼:

package com.c306.test;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@Slf4j
public class ParallelTest {

    /**
     * IO密集型建議:2*CPU,因為IO密集型線程不是一直在運作,是以可以配置多一點;
     * CPU密集型建議:因為一直在使用CPU,是以要保證線程數不能太多,可以CPU數+1;
     */
    private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);

    /**
     * 測試方法
     *
     * @return
     */
    private int testMethod() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    /**
     * 線程池+Future
     */
    private void test02() {
        long start = System.currentTimeMillis();
        Future<Integer> test1 = executor.submit(this::testMethod);
        Future<Integer> test2 = executor.submit(this::testMethod);
        Future<Integer> test3 = executor.submit(this::testMethod);
        Future<Integer> test4 = executor.submit(this::testMethod);
        Future<Integer> test5 = executor.submit(this::testMethod);
        List<Future<Integer>> futureList = Arrays.asList(test1, test2, test3, test4, test5);

        List<Integer> list = futureList.stream().map(future -> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }).collect(Collectors.toList());
        log.info("costs: {}ms", System.currentTimeMillis() - start);
    }

    @Test
    public void testFutureExec() {
        this.test02();
        this.test02();
        this.test02();
    }
}
           

多次并行執行耗時結果:

2020-04-19 16:53:38,983+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1208ms
2020-04-19 16:53:40,002+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1000ms
2020-04-19 16:53:41,002+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1000ms
           

效果很明顯,直接相當于一個方法的調用耗時,這種通過線程池+Future并行計算的方式,直接可以把接口性能提高上去了。

3、使用Java8的CompletableFuture

Future是java.util.concurrent并發包中的接口類,用來表示一個線程異步執行後的結果,核心方法包含:

Future.get() 阻塞調用線程,直到計算結果傳回

Future.isDone() 判斷線程是否執行完畢

Future.cancel() 取消目前線程的執行

Future.get()是阻塞調用的,想要拿到線程執行的結果,必須是Future.get()阻塞或者while(Future.isDone())輪詢方式調用。這種方式叫“主動拉(pull)”,現在流行響應式程式設計,即“主動推(push)”的方式,當線程執行完了,你告訴我就可以了。

直接上代碼:

package com.c306.test;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
public class ParallelTest {

    /**
     * IO密集型建議:2*CPU,因為IO密集型線程不是一直在運作,是以可以配置多一點;
     * CPU密集型建議:因為一直在使用CPU,是以要保證線程數不能太多,可以CPU數+1;
     */
    private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);

    /**
     * 測試方法
     *
     * @return
     */
    private int testMethod() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    /**
     * 使用Java8的CompletableFuture
     */
    private void test03() {
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futureList = new ArrayList<>(5);
        futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
        futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
        futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
        futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));
        futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor));

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[5]));
        CompletableFuture<List<Integer>> resultList = allFuture.thenApplyAsync(value ->
                        futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                executor
        );
        List<Integer> list = resultList.join();

        log.info("costs: {}ms", System.currentTimeMillis() - start);
    }

    @Test
    public void testCompletableFutureExec() {
        this.test03();
        this.test03();
        this.test03();
    }
}
           

可以看到實作方式和Future并沒有什麼不同,但是CompletableFuture提供了很多友善的方法,比如代碼中的allOf,thenApplyAsync,可以将多個CompletableFuture組合成一個CompletableFuture,再調用join方法阻塞拿到結果

多次并行執行耗時結果:

2020-04-19 17:08:17,800+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1212ms
2020-04-19 17:08:18,844+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1011ms
2020-04-19 17:08:19,847+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1002ms
           

CompletableFuture類中有很多方法可以供大家使用,不像Future隻要那麼幾個方法可以使用,這也是Java自有庫對Future的一個增強。這裡隻是簡單展示了CompletableFuture的一種用法,實際開發中需要根據不同的場景去選擇使用不同的方法。

4、使用Guava的ListenableFuture

谷歌開源的Guava中的ListenableFuture接口對java自帶的Future接口做了進一步擴充和封裝,并且提供了靜态工具類Futures。

直接上代碼:

package com.qx.test;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
public class ParallelTest {

    /**
     * IO密集型建議:2*CPU,因為IO密集型線程不是一直在運作,是以可以配置多一點;
     * CPU密集型建議:因為一直在使用CPU,是以要保證線程數不能太多,可以CPU數+1;
     */
    private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);

    /**
     * 測試方法
     *
     * @return
     */
    private int testMethod() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    /**
     * 使用Guava的ListenableFuture
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private void test04() throws ExecutionException, InterruptedException {
        // guava需要包裝一下線程池
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executor);
        long start = System.currentTimeMillis();
        ListenableFuture<Integer> test1 = listeningExecutorService.submit(this::testMethod);
        ListenableFuture<Integer> test2 = listeningExecutorService.submit(this::testMethod);
        ListenableFuture<Integer> test3 = listeningExecutorService.submit(this::testMethod);
        ListenableFuture<Integer> test4 = listeningExecutorService.submit(this::testMethod);
        ListenableFuture<Integer> test5 = listeningExecutorService.submit(this::testMethod);

        ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(test1, test2, test3, test4, test5);
        List<Integer> list = listListenableFuture.get();

        log.info("costs: {}ms", System.currentTimeMillis() - start);
    }

    @Test
    public void testListenableFutureExec() throws ExecutionException, InterruptedException {
        this.test04();
        this.test04();
        this.test04();
    }
}
           

多次并行執行耗時結果:

2020-04-19 17:21:11,919+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1203ms
2020-04-19 17:21:12,944+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1002ms
2020-04-19 17:21:13,945+0800 INFO  [main]  com.qx.test.ParallelTest - costs: 1001ms
           

總結:以上就是如何讓接口并行計算的三種實作方式,屬于日常開發中比較常用的代碼優化技巧。這裡沒有做過多的說明和比較,需要大家查閱更多的相關源碼和資料。