天天看點

java線程池使用

在Java1.5中提供了一個非常高效實用的多線程包:java.util.concurrent,提供了大量進階工具,可以幫助開發者編寫高效易維護、結構清晰的Java多線程程式。

線程池

之前我們在使用多線程都是用Thread的start()來建立啟動一個線程,但是在實際開發中,如果每個請求到達就建立一個新線程,開銷是相當大的。伺服器在建立和銷毀線程上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用請求的時間和資源要多的多。除了建立和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。如果在一個jvm裡建立太多的線程,可能會使系統由于過度消耗記憶體或“切換過度”而導緻系統資源不足。這就引入了線程池概念。

線程池的原理其實就是對多線程的一個管理,為了實作異步機制的一種方法,其實就是多個線程執行多個任務,最終這些線程通過線程池進行管理…不用手動去維護…一次可以處理多個任務,這樣就可以迅速的進行相應…比如說一個網站成為了熱點網站,那麼對于大量的點選量,就必須要對每一次的點選做出迅速的處理,這樣才能達到更好的互動效果…這樣就需要多個線程去處理這些請求,以便能夠更好的提供服務…

在java.util.concurrent包下,提供了一系列與線程池相關的類。合理的使用線程池,可以帶來多個好處:

(1) 降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗;

(2) 提高響應速度。當任務到達時,任務可以不需要等到線程建立就能立即執行;

(3) 提高線程的可管理性。線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。

線程池可以應對突然大爆發量的通路,通過有限個固定線程為大量的操作服務,減少建立和銷毀線程所需的時間。

使用線程池:

  • 1、建立線程池
  • 2、建立任務
  • 3、執行任務
  • 4、關閉線程池

建立線程池

一般通過工具類Executors的靜态方法來擷取線程池或靜态方法。介紹四種常用建立方法

ExecutorService service1 = Executors.newSingleThreadExecutor();

說明: 單例線程,表示在任意的時間段内,線程池中隻有一個線程在工作

ExecutorService service2 = Executors.newCacheThreadPool();

說明: 緩存線程池,先檢視線程池中是否有目前執行線程的緩存,如果有就resue(複用),如果沒有,那麼需要建立一個線程來完成目前的調用.并且這類線程池隻能完成一些生存期很短的一些任務.并且這類線程池内部規定能resue(複用)的線程,空閑的時間不能超過60s,一旦超過了60s,就會被移出線程池

ExecutorService service3 = Executors.newFixedThreadPool(10);

說明: 固定型線程池,和newCacheThreadPool()差不多,也能夠實作resue(複用),但是這個池子規定了線程的最大數量,也就是說當池子有空閑時,那麼新的任務将會在空閑線程中被執行,一旦線程池内的線程都在進行工作,那麼新的任務就必須等待線程池有空閑的時候才能夠進入線程池,其他的任務繼續排隊等待.這類池子沒有規定其空閑的時間到底有多長.這一類的池子更适用于伺服器.

ExecutorService service4 = Executors.newScheduledThreadPool(10);

說明: 排程型線程池,排程型線程池會根據Scheduled(任務清單)進行延遲執行,或者是進行周期性的執行.适用于一些周期性的工作.

package com.reapal.brave.main;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by jack-cooper on 2017/2/23.
 */
public class Test {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new Runnable() {
            @Override
            public void run() {
                while(true){
                    System.out.println("hello world !");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.out.println(" ===> main Thread execute here ! " );
    }
}
           

建立任務

任務分為兩種:一種是有傳回值的( callable ),一種是沒有傳回值的( runnable ). Callable與 Future 兩功能是Java在後續版本中為了适應多并法才加入的,Callable是類似于Runnable的接口,實作Callable接口的類和實作Runnable的類都是可被其他線程執行的任務。

  • 無傳回值的任務就是一個實作了runnable接口的類.使用run方法.
  • 有傳回值的任務是一個實作了callable接口的類.使用call方法.

Callable和Runnable的差別如下:

  • Callable定義的方法是call,而Runnable定義的方法是run。
  • Callable的call方法可以有傳回值,而Runnable的run方法不能有傳回值。
  • Callable的call方法可抛出異常,而Runnable的run方法不能抛出異常。

Future 介紹

Future表示異步計算的結果,它提供了檢查計算是否完成的方法,以等待計算的完成,并檢索計算的結果。Future的cancel方法可以取消任務的執行,它有一布爾參數,參數為 true 表示立即中斷任務的執行,參數為 false 表示允許正在運作的任務運作完成。Future的 get 方法等待計算完成,擷取計算結果。

package com.reapal.brave.main;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFuture {

    public static class  MyCallable  implements Callable{
        private int flag = 0;
        public MyCallable(int flag){
            this.flag = flag;
        }
        public String call() throws Exception{
            if (this.flag == 0){
                return "flag = 0";
            }
            if (this.flag == 1){
                try {
                    while (true) {
                        System.out.println("looping.");
                        Thread.sleep(2000);
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted");
                }
                return "false";
            } else {
                throw new Exception("Bad flag value!");
            }
        }
    }

    public static void main(String[] args) {
        // 定義3個Callable類型的任務
        MyCallable task1 = new MyCallable(0);
        MyCallable task2 = new MyCallable(1);
        MyCallable task3 = new MyCallable(2);
        // 建立一個執行任務的服務
        ExecutorService es = Executors.newFixedThreadPool(3);
        try {
            // 送出并執行任務,任務啟動時傳回了一個Future對象,
            // 如果想得到任務執行的結果或者是異常可對這個Future對象進行操作
            Future future1 = es.submit(task1);
            // 獲得第一個任務的結果,如果調用get方法,目前線程會等待任務執行完畢後才往下執行
            System.out.println("task1: " + future1.get());
            Future future2 = es.submit(task2);
            // 等待5秒後,再停止第二個任務。因為第二個任務進行的是無限循環
            Thread.sleep(5000);
            System.out.println("task2 cancel: " + future2.cancel(true));
            // 擷取第三個任務的輸出,因為執行第三個任務會引起異常
            // 是以下面的語句将引起異常的抛出
            Future future3 = es.submit(task3);
            System.out.println("task3: " + future3.get());
        } catch (Exception e){
            System.out.println(e.toString());
        }
        // 停止任務執行服務
        es.shutdownNow();
    }
}
           

執行任務

通過java.util.concurrent.ExecutorService接口對象來執行任務,該對象有兩個方法可以執行任務execute和submit。execute這種方式送出沒有傳回值,也就不能判斷是否執行成功。submit這種方式它會傳回一個Future對象,通過future的get方法來擷取傳回值,get方法會阻塞住直到任務完成。

execute與submit差別:

  • 接收的參數不一樣
  • submit有傳回值,而execute沒有
  • submit友善Exception處理
  • execute是Executor接口中唯一定義的方法;submit是ExecutorService(該接口繼承Executor)中定義的方法

關閉線程池

線程池使用完畢,需要對其進行關閉,有兩種方法

shutdown()

說明:shutdown并不是直接關閉線程池,而是不再接受新的任務…如果線程池内有任務,那麼把這些任務執行完畢後,關閉線程池

shutdownNow()

說明:這個方法表示不再接受新的任務,并把任務隊列中的任務直接移出掉,如果有正在執行的,嘗試進行停止

綜合使用案例(FutureTask)

import java.util.concurrent.*;
 
/**
 * Author  : Slogen
 * AddTime : 17/6/4
 * Email   : [email protected]
 */
public class CallDemo {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
 
        /**
         * 第一種方式:Future + ExecutorService
         * Task task = new Task();
         * ExecutorService service = Executors.newCachedThreadPool();
         * Future<Integer> future = service.submit(task1);
         * service.shutdown();
         */
 
 
        /**
         * 第二種方式: FutureTask + ExecutorService
         * ExecutorService executor = Executors.newCachedThreadPool();
         * Task task = new Task();
         * FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
         * executor.submit(futureTask);
         * executor.shutdown();
         */
 
        /**
         * 第三種方式:FutureTask + Thread
         */
 
        // 2. 建立FutureTask,需要一個實作了Callable接口的類的執行個體作為構造函數參數
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
        // 3. 建立Thread對象并啟動
        Thread thread = new Thread(futureTask);
        thread.setName("Task thread");
        thread.start();
 
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
 
        // 4. 調用isDone()判斷任務是否結束
        if(!futureTask.isDone()) {
            System.out.println("Task is not done");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int result = 0;
        try {
            // 5. 調用get()方法擷取任務結果,如果任務沒有執行完成則阻塞等待
            result = futureTask.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        System.out.println("result is " + result);
 
    }
 
    // 1. 繼承Callable接口,實作call()方法,泛型參數為要傳回的類型
    static class Task  implements Callable<Integer> {
 
        @Override
        public Integer call() throws Exception {
            System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
            int result = 0;
            for(int i = 0; i < 100;++i) {
                result += i;
            }
 
            Thread.sleep(3000);
            return result;
        }
    }
}
           

綜合使用案例一

需求:從資料庫中擷取url,并利用httpclient循環通路url位址,并對傳回結果進行操作

分析:由于是循環的對多個url進行通路并擷取資料,為了執行的效率,考慮使用多線程,url數量未知如果每個任務都建立一個線程将消耗大量的系統資源,最後決定使用線程池。

public class GetMonitorDataService {
 
    private Logger logger = LoggerFactory.getLogger(GetMonitorDataService.class);
    @Resource
    private MonitorProjectUrlMapper groupUrlMapper;
    @Resource
    private MonitorDetailBatchInsertMapper monitorDetailBatchInsertMapper;
    public void sendData(){
        //調用dao查詢所有url
        MonitorProjectUrlExample example=new MonitorProjectUrlExample();
        List<MonitorProjectUrl> list=groupUrlMapper.selectByExample(example);
        logger.info("此次查詢資料庫中監控url個數為"+list.size());
 
        //擷取系統處理器個數,作為線程池數量
        int nThreads=Runtime.getRuntime().availableProcessors();
 
        //定義一個裝載多線程傳回值的集合
        List<MonitorDetail> result= Collections.synchronizedList(new ArrayList<MonitorDetail>());
        //建立線程池,這裡定義了一個建立線程池的工具類,避免了建立多個線程池,ThreadPoolFactoryUtil可以使用單例模式設計
        ExecutorService executorService = ThreadPoolFactoryUtil.getExecutorService(nThreads);
        //周遊資料庫取出的url
        if(list!=null&&list.size()>0) {
            for (MonitorProjectUrl monitorProjectUrl : list) {
                String url = monitorProjectUrl.getMonitorUrl();
                //建立任務
                ThreadTask threadTask = new ThreadTask(url, result);
                //執行任務
                executorService.execute(threadTask);
                //注意區分shutdownNow
                executorService.shutdown();
                try {//等待直到所有任務完成
                          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //對資料進行操作
            saveData(result);
        }
    }

           

任務

public class ThreadTask implements Runnable{
    //這裡實作runnable接口
    private String url;
    private List<MonitorDetail> list;
    public ThreadTask(String url,List<MonitorDetail> list){
        this.url=url;
        this.list=list;
    }
    //把擷取的資料進行處理
    @Override
    public void run() {
        MonitorDetail detail = HttpClientUtil.send(url, MonitorDetail.class);
        list.add(detail);
    }

}
           

綜合使用案例二(countDownLatch)

package com.br.lucky.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 10400
 * @create 2018-04-19 20:38
 */
public class FatureTest {

    //1、配置線程池
    private static ExecutorService es = Executors.newFixedThreadPool(20);

    //2、封裝響應Feature
    class BizResult{
        public String orderId;
        public String  data;

        public String getOrderId() {
            return orderId;
        }

        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        public String getData() {
            return data;
        }

        public void setData(String data) {
            this.data = data;
        }
    }


    //3、實作Callable接口
    class BizTask implements Callable {

        private String orderId;

        private Object data;

        //可以用其他方式
        private CountDownLatch countDownLatch;

        public BizTask(String orderId, Object data, CountDownLatch countDownLatch) {
            this.orderId = orderId;
            this.data = data;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public Object call() {
            try {
                //todo business
                System.out.println("目前線程Id = " + this.orderId);
                BizResult br = new BizResult();
                br.setOrderId(this.orderId);
                br.setData("some key about your business" + this.getClass());
                return br;
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                //線程結束時,将計時器減一
                countDownLatch.countDown();
            }
            return null;
        }
    }

    /**
     * 業務邏輯入口
     */
    public List<Future> beginBusiness() throws InterruptedException {
        //模拟批量業務資料
        List<String> list = new ArrayList<>();
        for (int i = 0 ; i < 1000 ; i++) {
            list.add(String.valueOf(i));
        }
        //設定計數器
        CountDownLatch countDownLatch = new CountDownLatch(list.size());

        //接收多線程響應結果
        List<Future> resultList = new ArrayList<>();
        //begin thread
        for( int i = 0 ,size = list.size() ; i<size; i++){
            //todo something befor thread
            resultList.add(es.submit(new BizTask(list.get(i), null, countDownLatch)));
        }
        //wait finish
        countDownLatch.await();
        return resultList;
    }

    public static void main(String[] args) throws InterruptedException {
        FatureTest ft = new FatureTest();
            List<Future> futures = ft.beginBusiness();
            System.out.println("futures.size() = " + futures.size());
            //todo some operate
            System.out.println(" ==========================end========================= " );
    }

}


           

綜合使用案例三(future.get())

package com.br.lucky.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 10400
 * @create 2018-04-19 20:38
 */
public class FatureTest {

    //1、配置線程池
    private static ExecutorService es = Executors.newFixedThreadPool(20);

    //2、封裝響應Feature
    class BizResult{
        public String orderId;
        public String  data;

        public String getOrderId() {
            return orderId;
        }

        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        public String getData() {
            return data;
        }

        public void setData(String data) {
            this.data = data;
        }
    }


    //3、實作Callable接口
    class BizTask implements Callable {

        private String orderId;

        private Object data;


        public BizTask(String orderId, Object data) {
            this.orderId = orderId;
            this.data = data;
        }

        @Override
        public Object call() {
            try {
                //todo business
                System.out.println("目前線程Id = " + this.orderId);
                BizResult br = new BizResult();
                br.setOrderId(this.orderId);
                br.setData("some key about your business" + this.getClass());
                Thread.sleep(3000);
                return br;
            }catch (Exception e){
                e.printStackTrace();
            }
            return null;
        }
    }

    /**
     * 業務邏輯入口
     */
    public List<Future> beginBusiness() throws InterruptedException, ExecutionException {
        //模拟批量業務資料
        List<String> list = new ArrayList<>();
        for (int i = 0 ; i < 100 ; i++) {
            list.add(String.valueOf(i));
        }

        //接收多線程響應結果
        List<Future> resultList = new ArrayList<>();
        //begin thread
        for( int i = 0 ,size = list.size() ; i<size; i++){
            //todo something befor thread
            Future future = es.submit(new BizTask(list.get(i), null));
            resultList.add(future);
        }

        for (Future f : resultList) {
            f.get();
        }

        System.out.println(" =====多線程執行結束====== ");

        //wait finish
        return resultList;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FatureTest ft = new FatureTest();
            List<Future> futures = ft.beginBusiness();
            System.out.println("futures.size() = " + futures.size());
            //todo some operate
            System.out.println(" ==========================end========================= " );
    }

}


           
https://yq.aliyun.com/articles/5952 http://www.importnew.com/25286.html