Java多線程并發程式設計實戰之百萬級資料計算性能優化實戰
出自:
出自 <騰訊課堂 700多分鐘幹貨實戰Java多線程高并發高性能實戰全集> , 我學習完了之後, 我給 老師在課上說的話做了個筆記,以及視訊的内容,還有代碼敲了一遍,然後添加了一些注釋,把執行結果也整理了一下, 做了個筆記
案例背景
某應用程式有100萬條資料,且每條資料在計算前的業務校驗邏輯平均耗時5毫秒,請運用多線程高并發變成等相關的基礎知識,實作1分鐘以内,完成計算100萬條資料的平方和.
Master-Worker設計模式
介紹
1.Client發起任務給Master
2.Master将部分任務配置設定給Worker
3.Worker處理完了之後将結果傳回給Master
4.所有Worker處理完了之後,Master将結果累加之後傳回給Client
使用場景
可以分解大的任務并行化處理
角色
Master線程: 配置設定任務,合并Worker線程處理的結果
Worker線程: 處理具體的一個任務
模式優點
串行任務并行化處理,提高效率
頭腦風暴
Master-Worker設計模式是如何提高并發處理能力的?
答: 多個Worker線程并行的處理,處理完了傳回給Master線程合并結果, 這樣并發能力就變強了.
線程狀态類有哪些應用?
用于線程狀态判斷的,比如說Master線程會判斷Worker線程的任務是否結束,如果Worker線程結束了,Master就會将這個結束的線程的結果拿出來進行合并.
1.百萬級别資料計算的特點是什麼?
1.并發量很高,是百萬級别計算,性能比較高,
2.要求性能高,要求一分鐘内計算出來百萬資料的平方和.
測試結果
100工作線程的時候測試:
目前工作線程有: 100 個 ,執行計算結果為:333333833333500000 耗時: 60061毫秒
目前工作線程有: 100 個 ,執行計算結果為:333333833333500000 耗時: 60635毫秒
200工作線程的時候測試:
目前工作線程有: 200 個 ,執行計算結果為:333333833333500000 耗時: 31753毫秒
300工作線程的時候測試:
目前工作線程有: 300 個 ,執行計算結果為:333333833333500000 耗時: 22459毫秒
500工作線程的時候測試:
目前工作線程有: 500 個 ,執行計算結果為:333333833333500000 耗時: 16198毫秒
1000工作線程的時候測試:
目前工作線程有: 1000 個 ,執行計算結果為:333333833333500000 耗時: 17717毫秒
目前工作線程有: 1000 個 ,執行計算結果為:333333833333500000 耗時: 17370毫秒
2000工作線程的時候測試:
目前工作線程有: 2000 個 ,執行計算結果為:333333833333500000 耗時: 18081毫秒
目前工作線程有: 2000 個 ,執行計算結果為:333333833333500000 耗時: 18968毫秒
3000工作線程的時候測試:
目前工作線程有: 3000 個 ,執行計算結果為:333333833333500000 耗時: 18705毫秒
目前工作線程有: 3000 個 ,執行計算結果為:333333833333500000 耗時: 19129毫秒
5000工作線程的時候測試:
電腦直接卡死,程式沒跑出來…我直接手動給關閉了.
代碼
操作說明
執行MultiThreadCompute這個類的testCompute測試類
Master
package com.yrxy.thread.case5;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Master是協調用的,給計算任務分給不同的worker線程來處理.
*/
public class Master {
// 放任務的隊列
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
//裝載Worker線程
protected Map<String, Thread> workerThreadMap = new HashMap<String, Thread>();
//每個worker計算結果放到這個map裡面
protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
/**
* 構造方法
*
* @param worker 工作線程
* @param countWorker 工作線程數量
*/
public Master(Worker worker, int countWorker) {
// 設定工作隊列
worker.setWorkQueue(workQueue);
// 設定 存放計算結果的Map
worker.setResultMap(resultMap);
for (int i = 0; i < countWorker; i++) {
// 裝載Worker線程 ,!! 需要注意的是,這裡如果工作線程設定的過大的話,這裡裝在Worker線程會比較多
// 如果你電腦組態不高的話,這裡可能會消耗很多時間去執行個體化線程,并且裝到workerThreadMap 裡面
// 是以設定工作線程數量的時候要量力而行.
workerThreadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
}
}
/**
* 會判斷每一個線程狀态是否有結束
*
* @return 沒結束就傳回false, 結束了就傳回true
*/
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : workerThreadMap.entrySet()) {
// Thread.State.TERMINATED : 終止線程的線程狀态。線程已完成執行
if (entry.getValue().getState() != Thread.State.TERMINATED) {
// 沒結束就傳回false
return false;
}
}
// 結束了就傳回true
return true;
}
//送出
public void submit(Object job) {
workQueue.add(job);
}
public Map<String, Object> getResultMap() {
return resultMap;
}
//發起執行
public void execute() {
for (Map.Entry<String, Thread> entry : workerThreadMap.entrySet()) {
entry.getValue().start();
}
}
}
Worker
package com.yrxy.thread.case5;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable {
/**
* 工作隊列
*/
protected Queue<Object> workQueue;
/**
* 存放計算結果的Map
*/
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
/**
* 計算相關的邏輯 ,這裡子類會實作
*/
public Object handle(Object input) {
return input;
}
/**
* 從隊列裡面拉取内容,計算相關的邏輯,把計算的結果放到resultMap裡面去.
*/
@Override
public void run() {
while (true) {
//拉取内容
Object input = workQueue.poll();
if (input == null) {
break;
}
//執行計算邏輯
Object result = handle(input);
// 計算的邏輯結果放到這個map裡面去.
// key就是hashCode,value就是計算的結果
resultMap.put(Integer.toString(input.hashCode()), result);
}
}
}
ComputeWorker
package com.yrxy.thread.case5;
/**
* Worker主要功能類
*/
public class ComputeWorker extends Worker {
/**
* 模拟校驗邏輯和計算邏輯
*
* @param input 計算的内容
* @return 計算好的平方和
*/
@Override
public Object handle(Object input) {
try {
//模拟執行校驗邏輯
System.out.println("我開始校驗了");
Thread.sleep(5); // 這裡模拟校驗耗時五毫秒
System.out.println("我校驗完了");
} catch (InterruptedException e) {
e.printStackTrace();
}
Long i = (Long) input;
return i * i; // 計算平方和
}
}
MultiThreadCompute
package com.yrxy.thread.case5;
import org.junit.Test;
import java.util.Map;
import java.util.Set;
public class MultiThreadCompute {
@Test
public void testCompute() {
Long start = System.currentTimeMillis();
// 初始化Master,初始化100個Worker
int countWorker = 500;
Master master = new Master(new ComputeWorker(), countWorker);
//for循環送出任務,計算100萬條資料
for (long i = 1; i < 1000001; i++) {
master.submit(i);
}
master.execute();
long re = 0;
//擷取計算結果
Map<String, Object> resultMap = master.getResultMap();
//如不滿足這個while結果之後,說明計算完成了
while (resultMap.size() > 0 || !master.isComplete()) {
// 擷取存放任務的Map
Set<String> keys = resultMap.keySet();
String key = null;
// 擷取第一個key然後跳出循環
for (String k : keys) {
key = k;
break;
}
Long singleResult = null;
if (key != null) {
// 如果有key的話,就從resultMap裡面取出這個key對應的計算完的結果
singleResult = (Long) resultMap.get(key);
}
// 如果計算結果不為空的話就進行累加
if (singleResult != null) {
re += singleResult;
}
// 如果這個key不是null的話,就從計算結果裡面删除這個key,原因是因為已經計算完了,留着也沒啥用了.
if (key != null) {
resultMap.remove(key);
}
}
Long end = System.currentTimeMillis();
System.out.println("目前工作線程有: " + countWorker + " 個 ,執行計算結果為:" + re + " 耗時: " + (end - start) + "毫秒");
}
}