1.Master-Worker原理
1.1) Master-Worker模式是一種常見的并行設計模式之一,并行展現在一下兩個方面:
1.1.1) Client端把待處理的任務交給Master,Master将這個任務拆分成多個子任務交給Worker線程進行處理,
此時Client不需要阻塞的等待Master的處理結果,可以去處理其他的事情
1.1.2) Master在彙總結果的時候,不用等到Worker把所有的子任務都完成,才一起去彙總處理結果,Master可
以一邊先處理Worker已經處理完的子任務,Worker也可以繼續處理未完成的子任務
1.2) Master-Worker工作圖
1.3)Master-Worker模式的結構
1.3.1)Master-Worker模式是一種使用多線程技術進行資料處理的結果模型
1.3.2)Master線程為主線程,它維護了一個Worker線程隊列,子任務隊列和子結果集;Worker線程隊列,
不斷的從子任務隊列中提取任務處理,并把處理結果寫入子結果集中去
2.Master-Worker模式的簡單代碼實作
2.1)Master類
package com.roger.multi_thread;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 主線程類
*/
public class Master {
//Worker線程隊列
protected Map<String,Thread> workerThreadMap = new HashMap<>();
//子任務隊列,Worker線程隊列,不斷從子任務隊列中,擷取待處理的任務進行執行
protected Queue<Object> subTaskWorkerQueue = new ConcurrentLinkedQueue<>();
//子任務的結果集存儲容器
protected Map<String,Object> resultMap = new ConcurrentHashMap<>();
/**
* 構造Master的時候,需要一個Worker線程邏輯,
* 以及準備構造幾個Worker線程進行子任務的處理
*/
public Master(Worker worker, int workerCount) {
worker.setSubTaskWorkerQueue(subTaskWorkerQueue);
worker.setResultMap(resultMap);
for(int i = 1; i<= workerCount; i ++){
workerThreadMap.put(Integer.toString(i),new Thread(worker));
}
}
/**
* 是否所有的任務都全部執行完成
* @return
* true 所有任務均已結束
* false 還有任務待進行中
*/
public boolean isComplete(){
for(Map.Entry<String,Thread> threadMap : workerThreadMap.entrySet()){
if(threadMap.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
/**
* 執行Worker子線程
*/
public void execute(){
for(Map.Entry<String,Thread> threadMap : workerThreadMap.entrySet()){
threadMap.getValue().start();
}
}
/**
* 送出子任務
* @param task
*/
public void submit(Object task){
subTaskWorkerQueue.add(task);
}
/**
* 擷取所有子任務的處理結果集
* @return
*/
public Map<String, Object> getResultMap() {
return resultMap;
}
}
2.2)Worker線程類的代碼
package com.roger.multi_thread;
import java.util.Map;
import java.util.Queue;
public abstract class Worker implements Runnable {
//Master線程中的所有子任務隊列
public Queue<Object> subTaskWorkerQueue;
//Master線程中子任務處理的結果集
public Map<String,Object> resultMap;
@Override
public void run() {
//這裡做非空判斷的原因是保證代碼的高可用
if(subTaskWorkerQueue == null || subTaskWorkerQueue.isEmpty()){
return;
}
for(;;){
//使用poll()方法處理,彈出并删除對首的對象
Object task = subTaskWorkerQueue.poll();
if(task == null){
//這個線程處理結束
break;
}
//處理子任務
Object result = bizProcess(task);
//這裡沒有做非空判斷的原因,是這裡位址傳遞
//在構造Master的時候,把Master中的子任務結果集
//傳入Worker子線程中來
//if(resultMap == null){
//resultMap = new HashMap<>();
//}
//把處理結果存入結果集中去
resultMap.put(Integer.toString(task.hashCode()),result);
}
}
public void setSubTaskWorkerQueue(Queue<Object> subTaskWorkerQueue) {
this.subTaskWorkerQueue = subTaskWorkerQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
/**
* 子任務的業務處理結果
* 具體使用是通過重寫bizProcess方法
* @param task
* @return
*/
public abstract Object bizProcess(Object task);
}
2.3) 具體業務實作類,繼承Worker線程類,重寫抽象方法 bizProcess(Object task)方法,以計算
1*1*1 + 2*2*2 + 3*3*3 + ... + 100*100+100 = ?
為例。
2.3.1) 分析這個任務,Master需要把這個任務拆分成100個子任務進行處理,每個任務隻需處理1個自然數字的立方即可,
Master負責計算總和
2.3.2) 子任務的代碼實作,繼承Worker線程類,重寫抽象方法 bizProcess方法
package com.roger.multi_thread;
public class PlusWorker extends Worker {
@Override
public Object bizProcess(Object task) {
Integer num = (Integer) task;
return num.intValue() * num.intValue()*num.intValue();
}
}
2.3.3) 啟動類的實作代碼
package com.roger.multi_thread;
import java.util.Map;
import java.util.Set;
/**
* Client向Master遞交一個 1到100的立方和相加的任務
* 1*1*1 + 2*2*2 + 3*3*3 + ... + 100*100*100
*/
public class Bootstrap {
public static void main(String[] args) {
PlusWorker plusWorker = new PlusWorker();
Master master = new Master(plusWorker,5);
//Master線程拆分子任務,送出到Master管理的子任務隊列中去
for(int i = 1; i <= 100; i ++){
master.submit(i);
}
//Master啟動worker子線程,進行子任務的處理,并把子任務處理的結果彙總到
//Master管理的子任務結果集容器中去
master.execute();
//定義一個用戶端送出的總任務處理結果的變量
int result = 0;
//彙總Master中管理的子任務結果集
Map<String, Object> resultMap = master.getResultMap();
//worker線程一邊處理子任務,master線程一邊彙總結果集
//結果集中已有Worker線程處理完的子任務結果集
//或者
//Master的子任務還沒有全部處理完成
while (!resultMap.isEmpty() || !master.isComplete()){
//每次隻處理一個key的值,循環處理,知道任務結束為止
Set<String> keySet = resultMap.keySet();
String key = null;
for(String k : keySet){
key = k;
break;
}
int reValue = 0;
if(key != null){
reValue = (Integer) resultMap.get(key);
//移除已經取出的子任務計算結果
resultMap.remove(key);
}
result = result + reValue;
}
System.out.println("1*1*1 + 2*2*2 + 3*3*3 + ... + 100*100*100 = " + result);
}
}
2.3.4)啟動類運作結果: