天天看點

Java多線程并發程式設計實戰之百萬級資料計算性能優化實戰

Java多線程并發程式設計實戰之百萬級資料計算性能優化實戰

出自:

出自 <騰訊課堂 700多分鐘幹貨實戰Java多線程高并發高性能實戰全集> , 我學習完了之後, 我給 老師在課上說的話做了個筆記,以及視訊的内容,還有代碼敲了一遍,然後添加了一些注釋,把執行結果也整理了一下, 做了個筆記

案例背景

某應用程式有100萬條資料,且每條資料在計算前的業務校驗邏輯平均耗時5毫秒,請運用多線程高并發變成等相關的基礎知識,實作1分鐘以内,完成計算100萬條資料的平方和.

Master-Worker設計模式

介紹

Java多線程并發程式設計實戰之百萬級資料計算性能優化實戰

1.Client發起任務給Master

2.Master将部分任務配置設定給Worker

3.Worker處理完了之後将結果傳回給Master

4.所有Worker處理完了之後,Master将結果累加之後傳回給Client

使用場景

可以分解大的任務并行化處理

角色

Master線程: 配置設定任務,合并Worker線程處理的結果

Worker線程: 處理具體的一個任務

模式優點

串行任務并行化處理,提高效率

頭腦風暴

Master-Worker設計模式是如何提高并發處理能力的?

答: 多個Worker線程并行的處理,處理完了傳回給Master線程合并結果, 這樣并發能力就變強了.

線程狀态類有哪些應用?

用于線程狀态判斷的,比如說Master線程會判斷Worker線程的任務是否結束,如果Worker線程結束了,Master就會将這個結束的線程的結果拿出來進行合并.

1.百萬級别資料計算的特點是什麼?

1.并發量很高,是百萬級别計算,性能比較高,

2.要求性能高,要求一分鐘内計算出來百萬資料的平方和.

測試結果

100工作線程的時候測試:

目前工作線程有: 100 個  ,執行計算結果為:333333833333500000 耗時: 60061毫秒
目前工作線程有: 100 個  ,執行計算結果為:333333833333500000 耗時: 60635毫秒      

200工作線程的時候測試:

目前工作線程有: 200 個  ,執行計算結果為:333333833333500000 耗時: 31753毫秒      

300工作線程的時候測試:

目前工作線程有: 300 個  ,執行計算結果為:333333833333500000 耗時: 22459毫秒      

500工作線程的時候測試:

目前工作線程有: 500 個  ,執行計算結果為:333333833333500000 耗時: 16198毫秒      

1000工作線程的時候測試:

目前工作線程有: 1000 個  ,執行計算結果為:333333833333500000 耗時: 17717毫秒
目前工作線程有: 1000 個  ,執行計算結果為:333333833333500000 耗時: 17370毫秒      

2000工作線程的時候測試:

目前工作線程有: 2000 個  ,執行計算結果為:333333833333500000 耗時: 18081毫秒

目前工作線程有: 2000 個  ,執行計算結果為:333333833333500000 耗時: 18968毫秒      

3000工作線程的時候測試:

目前工作線程有: 3000 個  ,執行計算結果為:333333833333500000 耗時: 18705毫秒
目前工作線程有: 3000 個  ,執行計算結果為:333333833333500000 耗時: 19129毫秒      

5000工作線程的時候測試:

電腦直接卡死,程式沒跑出來…我直接手動給關閉了.

代碼

操作說明

執行MultiThreadCompute這個類的testCompute測試類

Master

package com.yrxy.thread.case5;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Master是協調用的,給計算任務分給不同的worker線程來處理.
 */
public class Master {
    // 放任務的隊列
    protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
    //裝載Worker線程
    protected Map<String, Thread> workerThreadMap = new HashMap<String, Thread>();
    //每個worker計算結果放到這個map裡面

    protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    /**
     * 構造方法
     *
     * @param worker      工作線程
     * @param countWorker 工作線程數量
     */
    public Master(Worker worker, int countWorker) {
        // 設定工作隊列
        worker.setWorkQueue(workQueue);
        // 設定 存放計算結果的Map
        worker.setResultMap(resultMap);

        for (int i = 0; i < countWorker; i++) {
            // 裝載Worker線程 ,!! 需要注意的是,這裡如果工作線程設定的過大的話,這裡裝在Worker線程會比較多
            // 如果你電腦組態不高的話,這裡可能會消耗很多時間去執行個體化線程,并且裝到workerThreadMap 裡面
            // 是以設定工作線程數量的時候要量力而行.
            workerThreadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
        }
    }

    /**
     * 會判斷每一個線程狀态是否有結束
     *
     * @return 沒結束就傳回false, 結束了就傳回true
     */
    public boolean isComplete() {
        for (Map.Entry<String, Thread> entry : workerThreadMap.entrySet()) {
            // Thread.State.TERMINATED : 終止線程的線程狀态。線程已完成執行

            if (entry.getValue().getState() != Thread.State.TERMINATED) {
                // 沒結束就傳回false

                return false;
            }
        }
        // 結束了就傳回true
        return true;
    }

    //送出
    public void submit(Object job) {
        workQueue.add(job);
    }


    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    //發起執行
    public void execute() {
        for (Map.Entry<String, Thread> entry : workerThreadMap.entrySet()) {
            entry.getValue().start();
        }
    }
}      

Worker

package com.yrxy.thread.case5;

import java.util.Map;
import java.util.Queue;


public class Worker implements Runnable {
    /**
     * 工作隊列
     */
    protected Queue<Object> workQueue;
    /**
     * 存放計算結果的Map
     */
    protected Map<String, Object> resultMap;

    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * 計算相關的邏輯 ,這裡子類會實作
     */
    public Object handle(Object input) {
        return input;
    }

    /**
     * 從隊列裡面拉取内容,計算相關的邏輯,把計算的結果放到resultMap裡面去.
     */
    @Override
    public void run() {
        while (true) {
            //拉取内容
            Object input = workQueue.poll();
            if (input == null) {
                break;
            }
            //執行計算邏輯
            Object result = handle(input);

            // 計算的邏輯結果放到這個map裡面去.
            // key就是hashCode,value就是計算的結果
            resultMap.put(Integer.toString(input.hashCode()), result);
        }
    }
}      

ComputeWorker

package com.yrxy.thread.case5;

/**
 * Worker主要功能類
 */
public class ComputeWorker extends Worker {

    /**
     * 模拟校驗邏輯和計算邏輯
     *
     * @param input 計算的内容
     * @return 計算好的平方和
     */
    @Override
    public Object handle(Object input) {
        try {
            //模拟執行校驗邏輯
            System.out.println("我開始校驗了");
            Thread.sleep(5);  // 這裡模拟校驗耗時五毫秒
            System.out.println("我校驗完了");
        } catch (InterruptedException e) {

            e.printStackTrace();
        }
        Long i = (Long) input;
        return i * i; // 計算平方和
    }
}      

MultiThreadCompute

package com.yrxy.thread.case5;

import org.junit.Test;

import java.util.Map;
import java.util.Set;

public class MultiThreadCompute {

    @Test
    public void testCompute() {
        Long start = System.currentTimeMillis();
        // 初始化Master,初始化100個Worker

        int countWorker = 500;
        Master master = new Master(new ComputeWorker(), countWorker);
        //for循環送出任務,計算100萬條資料
        for (long i = 1; i < 1000001; i++) {
            master.submit(i);
        }
        master.execute();

        long re = 0;
        //擷取計算結果
        Map<String, Object> resultMap = master.getResultMap();

        //如不滿足這個while結果之後,說明計算完成了

        while (resultMap.size() > 0 || !master.isComplete()) {

            // 擷取存放任務的Map
            Set<String> keys = resultMap.keySet();

            String key = null;
            // 擷取第一個key然後跳出循環
            for (String k : keys) {
                key = k;
                break;
            }

            Long singleResult = null;
            if (key != null) {
                // 如果有key的話,就從resultMap裡面取出這個key對應的計算完的結果
                singleResult = (Long) resultMap.get(key);
            }
            // 如果計算結果不為空的話就進行累加
            if (singleResult != null) {
                re += singleResult;
            }
            // 如果這個key不是null的話,就從計算結果裡面删除這個key,原因是因為已經計算完了,留着也沒啥用了.
            if (key != null) {
                resultMap.remove(key);
            }
        }


        Long end = System.currentTimeMillis();

        System.out.println("目前工作線程有: " + countWorker + " 個  ,執行計算結果為:" + re + " 耗時: " + (end - start) + "毫秒");
    }


}      

代碼Git位址