天天看點

多線程之Callable/Future擷取多個任務結果并進行彙總

許多不能立即獲得計算結果的延遲計算,都可以使用Callable/Future這種形式。例如資料庫查詢等。Callable是對任務的一種抽象,通過在Executor中包含封裝的Callable任務來執行,将結果封裝為Future。Future提供了相應的方法判斷是否已經完成,以及擷取任務的結果(future.get())等友善進行操作。

Get方法的行為取決于任務的狀态(尚未開始、正在運作、已完成)。如果任務已經完成,那麼get會立即傳回或者抛出一個Exception,如果任務沒有完成,那麼get将阻塞并直到任務完成。如果任務抛出了異常,那麼get将該異常封裝為ExecutionException并重新抛出。如果任務被取消,那麼get将抛出CanclellationException。如果get抛出了ExecutionException,那麼可以通過getCause來獲得被封裝的初始異常。

尚一章的例子,如果用這個方法寫将友善許多,現嘗試如下:

package com.cc.mutilineExample.callableAndFuture.test3_future_every;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @ClassName: FutureResponseTest 
 * @Description: 根據交易請求的額度和方式,進行扣款彙總計算
 * 計算交易一共扣款的數額:
 * 		利用Callable和Future來計算,future.get()會擷取任務結果,如果沒有擷取到會一直阻塞直到任務結束。
 *
 * 想優化一下,即每一個都一個future  這樣會更快
 * 
 * 這樣雖可行,但是卻有些繁瑣。還有一種更好的辦法:CompletionService
 * 等理論學習後,繼續寫一個例子
 * @author CC  
 * @date 2018年12月6日 上午10:44:20 
 * @version V1.0 
 */
public class FutureResponseTest {
	//線程池
	private final static ExecutorService executor = Executors.newCachedThreadPool();

	//模拟第三方服務
	public static double requestForService(Request request) throws InterruptedException, Exception{
		if(null == request) {
			throw new Exception("請求為空!");
		}
		if(request.getParam() <= 0) {
			throw new Exception("參數小于0,無法進行扣款!" + request);
		}
		
		System.out.println("開始處理請求...");
		
		//為了簡便直接傳回一個結果即可
		double result = 0.0;
		if("WeiXin".equals(request.getMethod())) {
			System.out.println("微信支付扣3%");
//			result = request.getParam() * 0.03;//double類型計算結果不準确  例如17 * 0.05 傳回  扣款數  0.8500000000000001
			result = new BigDecimal(String.valueOf(request.getParam())).multiply(new BigDecimal("0.03")).doubleValue();
		}else {
			System.out.println("其他支付直接扣5%");
			result = new BigDecimal(String.valueOf(request.getParam())).multiply(new BigDecimal("0.05")).doubleValue();
		}
		
		//模拟-使消耗時間長一些
		Thread.sleep(3000);
		System.out.println(request + " 傳回扣款結果:" + result);
		return result;
	}
	
	
	//排程請求,獲得傳回結果,并進行彙總處理
	public static void main(String[] args) throws Exception {
		final String[] methodStr = new String[] {"WeiXin","ZhiFuBao","WangYin"};
		final String[] serviceStr = new String[] {"TaoBao","JingDong","TianMao"};
		
		//為了友善,我們将請求先初始化完畢
		final List<Request> requestList = new ArrayList<Request>();
		for (int i = 0; i < 20; i++) {
			Request request = new Request();
			request.setMethod(methodStr[(int) (Math.random() * 3)]);
			request.setParam((int) (Math.random() * 300));
			request.setServieName(serviceStr[(int) (Math.random() * 3)]);
			requestList.add(request);
		}
		
		long startTime = System.currentTimeMillis();//開始時間
		
		//累積計算所有請求的總扣款數--計算任務提前開始且每個都是分開的不互相影響
		List<Future<Double>> futureList = new ArrayList<Future<Double>>();
		for (int i = 0; i < requestList.size(); i++) {
			Request request = requestList.get(i);
			Callable<Double> task = new Callable<Double>() {
				@Override
				public Double call() throws Exception {
					return  requestForService(request);
				}
			};
			Future<Double> future = executor.submit(task);
			futureList.add(future);
		}

		try {
			
			BigDecimal sum = BigDecimal.ZERO;//同理  double計算結果不精确
			
			//方法get具有“狀态依賴”的内在特性,因而調用者不需要知道任務的狀态,此外在任務送出和獲得
			//結果中包含的安全釋出屬性也確定了這個方法是線程安全的。
			//獲得結果
			for (int i = 0; i < futureList.size(); i++) {
				//送出任務請求
				double payMent = futureList.get(i).get();
				sum = sum.add(new BigDecimal(String.valueOf(payMent)));
			}
			System.out.println("一共扣款了多少錢?" + sum.doubleValue());
		} catch (InterruptedException e) {
			// TODO: 任務調用get的線程在獲得結果之前被中斷
			//重新設定線程的中斷狀态
			Thread.currentThread().interrupt();
			System.out.println("任務調用get的線程在獲得結果之前被中斷!" + e);
		} catch (ExecutionException e) {
			throw launderThrowable(e.getCause());
		} finally {
			executor.shutdown();
		}
		
		
		long endTime = System.currentTimeMillis();//結束時間
		System.out.println("消耗時間:" + (endTime - startTime) + "毫秒!");
	}



	/**
	 * @Title: launderThrowable 
	 * @Description: 任務執行過程中遇到異常,根據包裝的ExecutionException重新抛出異常,并列印異常資訊
	 * @param @param cause
	 * @param @return   
	 * @return Exception  
	 * @throws
	 * @author CC
	 * @date 2018年12月7日 上午9:36:27
	 * @version V1.0
	 */
	private static Exception launderThrowable(Throwable cause) {
		//抛出
		Exception exception = new Exception("任務執行過程中遇到異常!" + cause);
		//列印
		cause.printStackTrace();
		return exception;
	}
}

           

運作結果:

=正常運作結果========

開始處理請求…

其他支付直接扣5%

開始處理請求…

微信支付扣3%

開始處理請求…

其他支付直接扣5%

開始處理請求…

微信支付扣3%

開始處理請求…

其他支付直接扣5%

開始處理請求…

微信支付扣3%

開始處理請求…

其他支付直接扣5%

開始處理請求…

其他支付直接扣5%

開始處理請求…

其他支付直接扣5%

開始處理請求…

開始處理請求…

微信支付扣3%

開始處理請求…

其他支付直接扣5%

其他支付直接扣5%

開始處理請求…

微信支付扣3%

開始處理請求…

其他支付直接扣5%

開始處理請求…

其他支付直接扣5%

開始處理請求…

其他支付直接扣5%

開始處理請求…

其他支付直接扣5%

開始處理請求…

微信支付扣3%

開始處理請求…

微信支付扣3%

開始處理請求…

其他支付直接扣5%

Request [method=ZhiFuBao, servieName=TaoBao, param=40] 傳回扣款結果:2.0

Request [method=ZhiFuBao, servieName=JingDong, param=82] 傳回扣款結果:4.1

Request [method=WeiXin, servieName=TaoBao, param=243] 傳回扣款結果:7.29

Request [method=ZhiFuBao, servieName=JingDong, param=29] 傳回扣款結果:1.45

Request [method=WangYin, servieName=TianMao, param=97] 傳回扣款結果:4.85

Request [method=WeiXin, servieName=TaoBao, param=200] 傳回扣款結果:6.0

Request [method=ZhiFuBao, servieName=TaoBao, param=240] 傳回扣款結果:12.0

Request [method=ZhiFuBao, servieName=TaoBao, param=40] 傳回扣款結果:2.0

Request [method=WangYin, servieName=TaoBao, param=102] 傳回扣款結果:5.1

Request [method=WeiXin, servieName=JingDong, param=140] 傳回扣款結果:4.2

Request [method=ZhiFuBao, servieName=TaoBao, param=131] 傳回扣款結果:6.55

Request [method=ZhiFuBao, servieName=JingDong, param=219] 傳回扣款結果:10.95

Request [method=WeiXin, servieName=TianMao, param=233] 傳回扣款結果:6.99

Request [method=WeiXin, servieName=JingDong, param=203] 傳回扣款結果:6.09

Request [method=WeiXin, servieName=TaoBao, param=120] 傳回扣款結果:3.6

Request [method=ZhiFuBao, servieName=TaoBao, param=139] 傳回扣款結果:6.95

Request [method=WangYin, servieName=TaoBao, param=244] 傳回扣款結果:12.2

Request [method=ZhiFuBao, servieName=TaoBao, param=236] 傳回扣款結果:11.8

Request [method=WangYin, servieName=JingDong, param=130] 傳回扣款結果:6.5

Request [method=WeiXin, servieName=TaoBao, param=205] 傳回扣款結果:6.15

一共扣款了多少錢?126.77

消耗時間:3010毫秒!

可以發現,同樣實作了效率提升。且代碼友善了許多。

多線程中為了簡便Callable/Future對計算完成後獲得結果,這種程式更好的支援,提供了一種新的接口CompletionService。将BlockingQueue與FutureTask很好地結合起來,下一章例子介紹。

繼續閱讀