天天看點

多線程設計模式之一——Master-Worker設計模式

1.Master-Worker原理

     1.1) Master-Worker模式是一種常見的并行設計模式之一,并行展現在一下兩個方面:

              1.1.1) Client端把待處理的任務交給Master,Master将這個任務拆分成多個子任務交給Worker線程進行處理,

此時Client不需要阻塞的等待Master的處理結果,可以去處理其他的事情

              1.1.2) Master在彙總結果的時候,不用等到Worker把所有的子任務都完成,才一起去彙總處理結果,Master可

以一邊先處理Worker已經處理完的子任務,Worker也可以繼續處理未完成的子任務

    1.2) Master-Worker工作圖

多線程設計模式之一——Master-Worker設計模式

      1.3)Master-Worker模式的結構

             1.3.1)Master-Worker模式是一種使用多線程技術進行資料處理的結果模型

             1.3.2)Master線程為主線程,它維護了一個Worker線程隊列,子任務隊列和子結果集;Worker線程隊列,

不斷的從子任務隊列中提取任務處理,并把處理結果寫入子結果集中去

多線程設計模式之一——Master-Worker設計模式

2.Master-Worker模式的簡單代碼實作

   2.1)Master類

package com.roger.multi_thread;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 主線程類
 */
public class Master {

    //Worker線程隊列
    protected Map<String,Thread> workerThreadMap = new HashMap<>();
    //子任務隊列,Worker線程隊列,不斷從子任務隊列中,擷取待處理的任務進行執行
    protected Queue<Object> subTaskWorkerQueue = new ConcurrentLinkedQueue<>();
    //子任務的結果集存儲容器
    protected Map<String,Object> resultMap = new ConcurrentHashMap<>();

    /**
     * 構造Master的時候,需要一個Worker線程邏輯,
     * 以及準備構造幾個Worker線程進行子任務的處理
     */
    public Master(Worker worker, int workerCount) {
        worker.setSubTaskWorkerQueue(subTaskWorkerQueue);
        worker.setResultMap(resultMap);

        for(int i = 1; i<= workerCount; i ++){
            workerThreadMap.put(Integer.toString(i),new Thread(worker));
        }
    }

    /**
     * 是否所有的任務都全部執行完成
     * @return
     *      true 所有任務均已結束
     *      false 還有任務待進行中
     */
    public boolean isComplete(){
        for(Map.Entry<String,Thread> threadMap : workerThreadMap.entrySet()){
            if(threadMap.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    /**
     * 執行Worker子線程
     */
    public void execute(){
        for(Map.Entry<String,Thread> threadMap : workerThreadMap.entrySet()){
            threadMap.getValue().start();
        }
    }

    /**
     * 送出子任務
     * @param task
     */
    public void submit(Object task){
        subTaskWorkerQueue.add(task);
    }

    /**
     * 擷取所有子任務的處理結果集
     * @return
     */
    public Map<String, Object> getResultMap() {
        return resultMap;
    }
}
           

   2.2)Worker線程類的代碼

package com.roger.multi_thread;

import java.util.Map;
import java.util.Queue;

public abstract class Worker implements Runnable {
    //Master線程中的所有子任務隊列
    public Queue<Object> subTaskWorkerQueue;
    //Master線程中子任務處理的結果集
    public Map<String,Object> resultMap;

    @Override
    public void run() {
        //這裡做非空判斷的原因是保證代碼的高可用
       if(subTaskWorkerQueue == null || subTaskWorkerQueue.isEmpty()){
           return;
       }
       for(;;){
           //使用poll()方法處理,彈出并删除對首的對象
           Object task = subTaskWorkerQueue.poll();
           if(task == null){
               //這個線程處理結束
               break;
           }
           //處理子任務
           Object result = bizProcess(task);
           //這裡沒有做非空判斷的原因,是這裡位址傳遞
           //在構造Master的時候,把Master中的子任務結果集
           //傳入Worker子線程中來
           //if(resultMap == null){
               //resultMap = new HashMap<>();
           //}
           //把處理結果存入結果集中去
           resultMap.put(Integer.toString(task.hashCode()),result);
       }
    }

    public void setSubTaskWorkerQueue(Queue<Object> subTaskWorkerQueue) {
        this.subTaskWorkerQueue = subTaskWorkerQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * 子任務的業務處理結果
     * 具體使用是通過重寫bizProcess方法
     * @param task
     * @return
     */
    public abstract Object bizProcess(Object task);
}
           

     2.3) 具體業務實作類,繼承Worker線程類,重寫抽象方法 bizProcess(Object task)方法,以計算

                                    1*1*1 + 2*2*2 + 3*3*3 + ... + 100*100+100 = ?

為例。

           2.3.1) 分析這個任務,Master需要把這個任務拆分成100個子任務進行處理,每個任務隻需處理1個自然數字的立方即可,

Master負責計算總和

           2.3.2) 子任務的代碼實作,繼承Worker線程類,重寫抽象方法 bizProcess方法

package com.roger.multi_thread;

public class PlusWorker extends Worker {

    @Override
    public Object bizProcess(Object task) {
        Integer num = (Integer) task;
        return num.intValue() * num.intValue()*num.intValue();
    }
}
           

           2.3.3) 啟動類的實作代碼

package com.roger.multi_thread;

import java.util.Map;
import java.util.Set;

/**
 * Client向Master遞交一個 1到100的立方和相加的任務
 *      1*1*1 + 2*2*2 + 3*3*3 + ... + 100*100*100
 */
public class Bootstrap {

    public static void main(String[] args) {
        PlusWorker plusWorker = new PlusWorker();
        Master master = new Master(plusWorker,5);
        //Master線程拆分子任務,送出到Master管理的子任務隊列中去
        for(int i = 1; i <= 100; i ++){
            master.submit(i);
        }

        //Master啟動worker子線程,進行子任務的處理,并把子任務處理的結果彙總到
        //Master管理的子任務結果集容器中去
        master.execute();
        //定義一個用戶端送出的總任務處理結果的變量
        int result = 0;
        //彙總Master中管理的子任務結果集
        Map<String, Object> resultMap = master.getResultMap();
        //worker線程一邊處理子任務,master線程一邊彙總結果集
        //結果集中已有Worker線程處理完的子任務結果集
        //或者
        //Master的子任務還沒有全部處理完成
        while (!resultMap.isEmpty() || !master.isComplete()){
            //每次隻處理一個key的值,循環處理,知道任務結束為止
            Set<String> keySet = resultMap.keySet();
            String key = null;
            for(String k : keySet){
                key = k;
                break;
            }
            int reValue = 0;
            if(key != null){
                reValue = (Integer) resultMap.get(key);
                //移除已經取出的子任務計算結果
                resultMap.remove(key);
            }
            result = result + reValue;
        }

        System.out.println("1*1*1 + 2*2*2 + 3*3*3 + ... + 100*100*100 = " + result);
    }
}
           

          2.3.4)啟動類運作結果:

多線程設計模式之一——Master-Worker設計模式

繼續閱讀