在Java1.5中提供了一個非常高效實用的多線程包:java.util.concurrent,提供了大量進階工具,可以幫助開發者編寫高效易維護、結構清晰的Java多線程程式。
線程池
之前我們在使用多線程都是用Thread的start()來建立啟動一個線程,但是在實際開發中,如果每個請求到達就建立一個新線程,開銷是相當大的。伺服器在建立和銷毀線程上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用請求的時間和資源要多的多。除了建立和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。如果在一個jvm裡建立太多的線程,可能會使系統由于過度消耗記憶體或“切換過度”而導緻系統資源不足。這就引入了線程池概念。
線程池的原理其實就是對多線程的一個管理,為了實作異步機制的一種方法,其實就是多個線程執行多個任務,最終這些線程通過線程池進行管理…不用手動去維護…一次可以處理多個任務,這樣就可以迅速的進行相應…比如說一個網站成為了熱點網站,那麼對于大量的點選量,就必須要對每一次的點選做出迅速的處理,這樣才能達到更好的互動效果…這樣就需要多個線程去處理這些請求,以便能夠更好的提供服務…
在java.util.concurrent包下,提供了一系列與線程池相關的類。合理的使用線程池,可以帶來多個好處:
(1) 降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗;
(2) 提高響應速度。當任務到達時,任務可以不需要等到線程建立就能立即執行;
(3) 提高線程的可管理性。線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。
線程池可以應對突然大爆發量的通路,通過有限個固定線程為大量的操作服務,減少建立和銷毀線程所需的時間。
使用線程池:
- 1、建立線程池
- 2、建立任務
- 3、執行任務
- 4、關閉線程池
建立線程池
一般通過工具類Executors的靜态方法來擷取線程池或靜态方法。介紹四種常用建立方法
ExecutorService service1 = Executors.newSingleThreadExecutor();
說明: 單例線程,表示在任意的時間段内,線程池中隻有一個線程在工作
ExecutorService service2 = Executors.newCacheThreadPool();
說明: 緩存線程池,先檢視線程池中是否有目前執行線程的緩存,如果有就resue(複用),如果沒有,那麼需要建立一個線程來完成目前的調用.并且這類線程池隻能完成一些生存期很短的一些任務.并且這類線程池内部規定能resue(複用)的線程,空閑的時間不能超過60s,一旦超過了60s,就會被移出線程池
ExecutorService service3 = Executors.newFixedThreadPool(10);
說明: 固定型線程池,和newCacheThreadPool()差不多,也能夠實作resue(複用),但是這個池子規定了線程的最大數量,也就是說當池子有空閑時,那麼新的任務将會在空閑線程中被執行,一旦線程池内的線程都在進行工作,那麼新的任務就必須等待線程池有空閑的時候才能夠進入線程池,其他的任務繼續排隊等待.這類池子沒有規定其空閑的時間到底有多長.這一類的池子更适用于伺服器.
ExecutorService service4 = Executors.newScheduledThreadPool(10);
說明: 排程型線程池,排程型線程池會根據Scheduled(任務清單)進行延遲執行,或者是進行周期性的執行.适用于一些周期性的工作.
package com.reapal.brave.main;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by jack-cooper on 2017/2/23.
*/
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new Runnable() {
@Override
public void run() {
while(true){
System.out.println("hello world !");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
System.out.println(" ===> main Thread execute here ! " );
}
}
建立任務
任務分為兩種:一種是有傳回值的( callable ),一種是沒有傳回值的( runnable ). Callable與 Future 兩功能是Java在後續版本中為了适應多并法才加入的,Callable是類似于Runnable的接口,實作Callable接口的類和實作Runnable的類都是可被其他線程執行的任務。
- 無傳回值的任務就是一個實作了runnable接口的類.使用run方法.
- 有傳回值的任務是一個實作了callable接口的類.使用call方法.
Callable和Runnable的差別如下:
- Callable定義的方法是call,而Runnable定義的方法是run。
- Callable的call方法可以有傳回值,而Runnable的run方法不能有傳回值。
- Callable的call方法可抛出異常,而Runnable的run方法不能抛出異常。
Future 介紹
Future表示異步計算的結果,它提供了檢查計算是否完成的方法,以等待計算的完成,并檢索計算的結果。Future的cancel方法可以取消任務的執行,它有一布爾參數,參數為 true 表示立即中斷任務的執行,參數為 false 表示允許正在運作的任務運作完成。Future的 get 方法等待計算完成,擷取計算結果。
package com.reapal.brave.main;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableAndFuture {
public static class MyCallable implements Callable{
private int flag = 0;
public MyCallable(int flag){
this.flag = flag;
}
public String call() throws Exception{
if (this.flag == 0){
return "flag = 0";
}
if (this.flag == 1){
try {
while (true) {
System.out.println("looping.");
Thread.sleep(2000);
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
return "false";
} else {
throw new Exception("Bad flag value!");
}
}
}
public static void main(String[] args) {
// 定義3個Callable類型的任務
MyCallable task1 = new MyCallable(0);
MyCallable task2 = new MyCallable(1);
MyCallable task3 = new MyCallable(2);
// 建立一個執行任務的服務
ExecutorService es = Executors.newFixedThreadPool(3);
try {
// 送出并執行任務,任務啟動時傳回了一個Future對象,
// 如果想得到任務執行的結果或者是異常可對這個Future對象進行操作
Future future1 = es.submit(task1);
// 獲得第一個任務的結果,如果調用get方法,目前線程會等待任務執行完畢後才往下執行
System.out.println("task1: " + future1.get());
Future future2 = es.submit(task2);
// 等待5秒後,再停止第二個任務。因為第二個任務進行的是無限循環
Thread.sleep(5000);
System.out.println("task2 cancel: " + future2.cancel(true));
// 擷取第三個任務的輸出,因為執行第三個任務會引起異常
// 是以下面的語句将引起異常的抛出
Future future3 = es.submit(task3);
System.out.println("task3: " + future3.get());
} catch (Exception e){
System.out.println(e.toString());
}
// 停止任務執行服務
es.shutdownNow();
}
}
執行任務
通過java.util.concurrent.ExecutorService接口對象來執行任務,該對象有兩個方法可以執行任務execute和submit。execute這種方式送出沒有傳回值,也就不能判斷是否執行成功。submit這種方式它會傳回一個Future對象,通過future的get方法來擷取傳回值,get方法會阻塞住直到任務完成。
execute與submit差別:
- 接收的參數不一樣
- submit有傳回值,而execute沒有
- submit友善Exception處理
- execute是Executor接口中唯一定義的方法;submit是ExecutorService(該接口繼承Executor)中定義的方法
關閉線程池
線程池使用完畢,需要對其進行關閉,有兩種方法
shutdown()
說明:shutdown并不是直接關閉線程池,而是不再接受新的任務…如果線程池内有任務,那麼把這些任務執行完畢後,關閉線程池
shutdownNow()
說明:這個方法表示不再接受新的任務,并把任務隊列中的任務直接移出掉,如果有正在執行的,嘗試進行停止
綜合使用案例(FutureTask)
import java.util.concurrent.*;
/**
* Author : Slogen
* AddTime : 17/6/4
* Email : [email protected]
*/
public class CallDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 第一種方式:Future + ExecutorService
* Task task = new Task();
* ExecutorService service = Executors.newCachedThreadPool();
* Future<Integer> future = service.submit(task1);
* service.shutdown();
*/
/**
* 第二種方式: FutureTask + ExecutorService
* ExecutorService executor = Executors.newCachedThreadPool();
* Task task = new Task();
* FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
* executor.submit(futureTask);
* executor.shutdown();
*/
/**
* 第三種方式:FutureTask + Thread
*/
// 2. 建立FutureTask,需要一個實作了Callable接口的類的執行個體作為構造函數參數
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
// 3. 建立Thread對象并啟動
Thread thread = new Thread(futureTask);
thread.setName("Task thread");
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
// 4. 調用isDone()判斷任務是否結束
if(!futureTask.isDone()) {
System.out.println("Task is not done");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int result = 0;
try {
// 5. 調用get()方法擷取任務結果,如果任務沒有執行完成則阻塞等待
result = futureTask.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("result is " + result);
}
// 1. 繼承Callable接口,實作call()方法,泛型參數為要傳回的類型
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
int result = 0;
for(int i = 0; i < 100;++i) {
result += i;
}
Thread.sleep(3000);
return result;
}
}
}
綜合使用案例一
需求:從資料庫中擷取url,并利用httpclient循環通路url位址,并對傳回結果進行操作
分析:由于是循環的對多個url進行通路并擷取資料,為了執行的效率,考慮使用多線程,url數量未知如果每個任務都建立一個線程将消耗大量的系統資源,最後決定使用線程池。
public class GetMonitorDataService {
private Logger logger = LoggerFactory.getLogger(GetMonitorDataService.class);
@Resource
private MonitorProjectUrlMapper groupUrlMapper;
@Resource
private MonitorDetailBatchInsertMapper monitorDetailBatchInsertMapper;
public void sendData(){
//調用dao查詢所有url
MonitorProjectUrlExample example=new MonitorProjectUrlExample();
List<MonitorProjectUrl> list=groupUrlMapper.selectByExample(example);
logger.info("此次查詢資料庫中監控url個數為"+list.size());
//擷取系統處理器個數,作為線程池數量
int nThreads=Runtime.getRuntime().availableProcessors();
//定義一個裝載多線程傳回值的集合
List<MonitorDetail> result= Collections.synchronizedList(new ArrayList<MonitorDetail>());
//建立線程池,這裡定義了一個建立線程池的工具類,避免了建立多個線程池,ThreadPoolFactoryUtil可以使用單例模式設計
ExecutorService executorService = ThreadPoolFactoryUtil.getExecutorService(nThreads);
//周遊資料庫取出的url
if(list!=null&&list.size()>0) {
for (MonitorProjectUrl monitorProjectUrl : list) {
String url = monitorProjectUrl.getMonitorUrl();
//建立任務
ThreadTask threadTask = new ThreadTask(url, result);
//執行任務
executorService.execute(threadTask);
//注意區分shutdownNow
executorService.shutdown();
try {//等待直到所有任務完成
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//對資料進行操作
saveData(result);
}
}
任務
public class ThreadTask implements Runnable{
//這裡實作runnable接口
private String url;
private List<MonitorDetail> list;
public ThreadTask(String url,List<MonitorDetail> list){
this.url=url;
this.list=list;
}
//把擷取的資料進行處理
@Override
public void run() {
MonitorDetail detail = HttpClientUtil.send(url, MonitorDetail.class);
list.add(detail);
}
}
綜合使用案例二(countDownLatch)
package com.br.lucky.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author 10400
* @create 2018-04-19 20:38
*/
public class FatureTest {
//1、配置線程池
private static ExecutorService es = Executors.newFixedThreadPool(20);
//2、封裝響應Feature
class BizResult{
public String orderId;
public String data;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
//3、實作Callable接口
class BizTask implements Callable {
private String orderId;
private Object data;
//可以用其他方式
private CountDownLatch countDownLatch;
public BizTask(String orderId, Object data, CountDownLatch countDownLatch) {
this.orderId = orderId;
this.data = data;
this.countDownLatch = countDownLatch;
}
@Override
public Object call() {
try {
//todo business
System.out.println("目前線程Id = " + this.orderId);
BizResult br = new BizResult();
br.setOrderId(this.orderId);
br.setData("some key about your business" + this.getClass());
return br;
}catch (Exception e){
e.printStackTrace();
}finally {
//線程結束時,将計時器減一
countDownLatch.countDown();
}
return null;
}
}
/**
* 業務邏輯入口
*/
public List<Future> beginBusiness() throws InterruptedException {
//模拟批量業務資料
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 1000 ; i++) {
list.add(String.valueOf(i));
}
//設定計數器
CountDownLatch countDownLatch = new CountDownLatch(list.size());
//接收多線程響應結果
List<Future> resultList = new ArrayList<>();
//begin thread
for( int i = 0 ,size = list.size() ; i<size; i++){
//todo something befor thread
resultList.add(es.submit(new BizTask(list.get(i), null, countDownLatch)));
}
//wait finish
countDownLatch.await();
return resultList;
}
public static void main(String[] args) throws InterruptedException {
FatureTest ft = new FatureTest();
List<Future> futures = ft.beginBusiness();
System.out.println("futures.size() = " + futures.size());
//todo some operate
System.out.println(" ==========================end========================= " );
}
}
綜合使用案例三(future.get())
package com.br.lucky.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author 10400
* @create 2018-04-19 20:38
*/
public class FatureTest {
//1、配置線程池
private static ExecutorService es = Executors.newFixedThreadPool(20);
//2、封裝響應Feature
class BizResult{
public String orderId;
public String data;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
//3、實作Callable接口
class BizTask implements Callable {
private String orderId;
private Object data;
public BizTask(String orderId, Object data) {
this.orderId = orderId;
this.data = data;
}
@Override
public Object call() {
try {
//todo business
System.out.println("目前線程Id = " + this.orderId);
BizResult br = new BizResult();
br.setOrderId(this.orderId);
br.setData("some key about your business" + this.getClass());
Thread.sleep(3000);
return br;
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}
/**
* 業務邏輯入口
*/
public List<Future> beginBusiness() throws InterruptedException, ExecutionException {
//模拟批量業務資料
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 100 ; i++) {
list.add(String.valueOf(i));
}
//接收多線程響應結果
List<Future> resultList = new ArrayList<>();
//begin thread
for( int i = 0 ,size = list.size() ; i<size; i++){
//todo something befor thread
Future future = es.submit(new BizTask(list.get(i), null));
resultList.add(future);
}
for (Future f : resultList) {
f.get();
}
System.out.println(" =====多線程執行結束====== ");
//wait finish
return resultList;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
FatureTest ft = new FatureTest();
List<Future> futures = ft.beginBusiness();
System.out.println("futures.size() = " + futures.size());
//todo some operate
System.out.println(" ==========================end========================= " );
}
}
https://yq.aliyun.com/articles/5952 http://www.importnew.com/25286.html