任務背景
大三上學期的課程設計任務。大概是要做一個計算加速的模型,通過将計算任務配置設定給兩台計算機來進行計算加速,在其中的每一台計算機上再通過并行運算指令集和多線程加速,盡可能的獲得高的加速比。
共同作者@Chelsea_n
老師給出的大作業要求
利用相關C++需要和加速(sse,多線程)手段,以及通訊技術(1.rpc,命名管道,2.http,socket)等實作函數(浮點數數組求和,求最大值,排序)。處理在兩台計算機協作執行,盡可能挖掘兩個計算機的潛在算力。
計算任務
1280萬浮點數進行求和、求最大值和排序運算,為了增加運算量使加速效果更加明顯,每次通過log(sqrt(raeFloatData[i]/4.0))方式通路資料。
給出的測試資料和計算内容如下:
#define MAX_THREADS 64
#define SUBDATANUM 2000000
#define DATANUM (SUBDATANUM * MAX_THREADS) /*這個數值是總資料量*/
//待測試資料定義為:
float rawFloatData[DATANUM];
//資料初始化
for (size_t i = 0; i < DATANUM; i++)
{
rawFloatData[i] =float((rand() + rand()+ rand() + rand()));//rand傳回short資料類型,增加随機性
}
/*為了模拟任務:每次通路資料時,用log(sqrt(rawFloatData[i]/ 4.0))進行通路!就是說比如計算加法 用
sum+=log(sqrt(rawFloatData[i]/ 4.0)),而不是sum+=rawFloatData[i] !!這裡計算結果和存儲精度之間有損失,
但你們機器的指令集限制,如果使用SSE中的double型的話,單指令隻能處理4個double,如果是float則可以8個。
是以用float加速比會更大。float計算有誤差,正常隻要笨算和加速計算結果一緻即可。
如果實在有精度控的人用double吧。*/
//需要提供的函數:(不加速版本,為同樣的資料量在兩台計算機上各自運作的時間。算法一緻,隻是不采用任何加速手段,比如SSE,多線程或者OPENMP)
float sum(const float data[],const int len); //data是原始資料,len為長度。結果通過函數傳回
float max((const float data[],const int len);//data是原始資料,len為長度。結果通過函數傳回
float sort((const float data[],const int len, float result[]);//data是原始資料,len為長度。排序結果在result中。
//雙機加速版本
float sumSpeedUp(const float data[],const int len); //data是原始資料,len為長度。結果通過函數傳回
float maxSpeedUp((const float data[],const int len);//data是原始資料,len為長度。結果通過函數傳回
float sortSpeedUp((const float data[],const int len, float result[]);//data是原始資料,len為長度。排序結果在result中。
//加速中如果使用SSE,特别注意SSE的指令和資料長度有關,單精度字尾ps,雙精度字尾pd。
測試速度的代碼架構為:
QueryPerformanceCounter(&start);//start
你的函數(…);//包括任務啟動,結果回傳,收集和綜合
QueryPerformanceCounter(&end);//end
std::cout << “Time Consumed:” << (end.QuadPart - start.QuadPart) << endl;
cout<<輸出求和結果<<endl;
cout<<輸出最大值<<endl;
cout<<輸出排序是否正确<<endl;
程式大緻架構
單機版本:(不加速,用最原始的辦法算)
1.初始化資料(1280萬浮點數)
2.求和(for循環即可)
3.求最大值(for循環即可)
4.排序(快排)
5.檢查排序結果(後項減前項,所有結果均大于等于0)
雙機版本(加速版)
兩台計算機分别初始化640萬資料
建立通訊連接配接(TCP)
1.求和(多線程+AVX)
2.求最大值(多線程)
3.排序(多個線程分别排序,再對有序資料塊歸并)
4.回傳資料(求和值+最大值+排序後的640萬資料)
5.對兩台計算機的計算結果進行綜合得到最終的結果
6.檢查排序
完整架構設計
如圖所示,本項目的整體架構可分為準備、計算和處理三個部分。
在準備階段,兩台計算機各自初始化640萬個單精度浮點數,并分别對socket進行初始化。然後,由用戶端向伺服器發起請求,伺服器接受用戶端請求後,配置設定新的socket套接字,完成連接配接并等待通信。
在計算階段,兩台計算機分别順序調用了sumSpeedUp,maxSpeedUp和sortSpeedUp三個函數。其中,sumSpeedUp和maxSpeedUp中分别建立了32個線程用于求和以及求最大值,且未使用其他算法。sortSpeedUp函數則建立了64個線程對全部資料分塊進行快速排序,然後再使用歸并算法将分塊有序的資料排列成整體有序的結果,歸并過程共建立2個線程。此外,除排序算法外,計算階段的所有代碼均使用AVX指令集加速。
在結束處理階段,伺服器将計算結果傳回用戶端。此時,根據兩台計算機各自的計算結果,完成最終的求和、求最大值,并再次調用歸并函數将兩個排序結果進行歸并。然後,檢查排序結果,輸出排序結果是否正确,求和結果以及資料最大值。
程式概要設計
無加速單機版
單機計算程式,主要用于為加速後的程式提供時間參考和結果驗證。在此版程式中,不采用任何加速手段,僅使用最為普通的計算方法:求和函數及求最大值函數均使用for循環實作,排序函數使用快速排序算法實作。其涉及的主要函數接口如下:
//求和
float sum(const float data[], const int len)
//求最大值
float MyMax(const float data[], const int len)
//快速排序
void MySort(const float data[], const int len, float result[])
//排序檢驗
bool checkSort(const float data[], const int len)
加速雙機版
雙機計算程式,主要采用了多線程及AVX指令集加速,并基于TCP設計了通信程式。在此版程式中,權衡運算量與線程開銷,最終決定求和函數及求最大值函數分别建立32個線程,而排序函數則建立64個線程,歸并過程建立2個線程。其涉及的主要函數接口如下:
//求和
float sum(const float data[], const int len)
//求最大值
float MyMax(const float data[], const int len)
//快速排序
void MySort(const float data[], const int len, float result[])
//排序檢驗
bool checkSort(const float data[], const int len)
程式詳細設計
宏定義及全局變量說明
#define MAX_THREADS 64
#define SUBDATANUM 100000
//資料總量
#define DATANUM (SUBDATANUM * MAX_THREADS)
定義三個功能函數各自的線程數,以及每個線程需要處理的資料量:
//sum & max
#define THREADS_NUM 32
#define THREADS_DATANUM SUBDATANUM * 2
//sort
#define THREADS_NUM_SORT 64
#define THREADS_DATANUM_SORT SUBDATANUM
//join
#define JOIN_NUM 2
其他多線程相關量定義如下。其中,排序函數并未定義ThreadID,詳情請見下文。
//信号量
HANDLE hSemaphores[THREADS_NUM];
HANDLE hSemaphores_sort[THREADS_NUM_SORT];
HANDLE hSemaphores_join[JOIN_NUM];
//線程ID max & sum
int ThreadID[THREADS_NUM];
int ThreadID_join[THREADS_NUM];
定義其他全局變量,其中yfsData_max的出現是為了避免在函數内部定義大數組。
//待測試資料
float rawFloatData[DATANUM];
//線程的中間結果 sum & max
float floatResults[THREADS_NUM], floatMaxs[THREADS_NUM];
//線程排序結果
float floatSorts[DATANUM];
//歸并輔助
float joinSorts[DATANUM];
//求最大值
__m256 yfsData_max[THREADS_NUM][THREADS_DATANUM/8];
sumSpeedUp函數詳述
首先建立32個線程,然後逐個開啟。線程函數的核心代碼如下:
1. DWORD WINAPI ThreadProc_sum(LPVOID lpParameter) {
2. ......
3. float a = 4.0;
4. __m256 Denominator = _mm256_set1_ps(a);
5. // AVX批量處理
6. for (size_t i = 0; i < cntBlock; ++i) {
7. yfsLoad = _mm256_load_ps(p);
8. yfsMid =_mm256_log_ps(_mm256_sqrt_ps(_mm256_div_ps(yfsLoad,Denominator)));
9. yfsSum = _mm256_add_ps(yfsSum, yfsMid);
10. p += nBlockWidth;
11. }
12. ......
13. return 0;
14. }
其中,變量yfsMid用于完成對原始資料的除法、開方和取對數操作,而變量yfsSum中存儲着最終計算結果。32個線程均運作結束後,得到一個存儲着32個浮點數的數組,對這32個中間結果求和時,同樣使用了AVX指令集加速,該段代碼與線程函數中的代碼相似。最後關閉線程,函數結束。
maxSpeedUp函數詳述
首先建立32個線程,然後逐個開啟。線程函數的核心代碼如下:
1. DWORD WINAPI ThreadProc_max(LPVOID lpParameter) {
2. ......
3. // 資料處理
4. __m256 yfsData[cntBlock]; // 轉換後的數組
5. ......
6. for (size_t i = 0; i < cntBlock; ++i) {
7. yfsLoad = _mm256_load_ps(pp);
8. yfsData[i] = _mm256_log_ps(_mm256_sqrt_ps(_mm256_div_ps(yfsLoad,Denominator)));
9. pp += nBlockWidth;
10. }
11. // 求最大值
12. __m256 maxVal = yfsData[0];
13. for (size_t i = 1; i < cntBlock; ++i) {
14. maxVal = _mm256_max_ps(maxVal, yfsData[i]);
15. }
16. ......
17. return 0;
18. }
首先對原始資料進行除法、開方和取對數處理,将處理結果存入yfsData數組中,該數組的每個變量類型均為__m256。然後使用_mm256_max_ps對處理後的數組進行for循環求最大值,結果存入變量maxVal中。最後對maxVal中的8個浮點數進行比較,得到一道線程的最大值求解結果。
32個線程均運作結束後,得到一個存儲着32個浮點數的數組,對這32個中間結果求最大值時,同樣使用了AVX指令集加速,該段代碼與線程函數中的代碼相似。最後關閉線程,函數結束
sortSpeedUp函數詳述
過程描述:
首先建立64個線程,然後逐個開啟。排序的線程函數與求最大值的線程函數一樣,需要先對資料進行處理,然後再調用快速排序函數。特别地,每個線程的排序結果都放到同一個數組floatSorts中。也就是說,當所有線程運作結束時,所有的資料都放在同一個數組中,且該數組分塊有序。
64個線程均運作結束後,調用歸并函數對分塊有序的資料進行最終的歸并排序。歸并時,采用兩兩歸并的方式,通過6次歸并将64個分塊有序的資料歸為一塊,完成排序的全部内容。共建立2個線程,每個線程完成一半資料的歸并,然後再進行最終的歸并。歸并控制核心代碼如下:
1. int who = *(int*)lpParameter;
2. float* pin1; //被合并數組
3. float* pin2; //合并後數組
4. int num = 32;
5. bool flag = 0;
6. int length; //被合并數組長度
7. for (size_t j = 1; j < 32; j = j * 2) {
8. if (flag == 0) {
9. pin1 = floatSorts + who * DATANUM / 2;
10. pin2 = joinSorts + who * DATANUM / 2;
11. }
12. else {
13. pin1 = joinSorts + who * DATANUM / 2;
14. pin2 = floatSorts + who * DATANUM / 2;
15. }
16. length = SUBDATANUM * j;
17. for (size_t i = 0; i < num; i = i + 2) {
18. joinsort((pin1 + i * length), (pin1 + (i + 1) * length), pin2 + i * length, length);
19. }
20. num = num / 2;
21. flag = !flag;
22. }
如上述代碼所示,由于每次歸并結束後,結果都會存入另一個數組(例如,64個線程結束後,數組floatSorts以10萬資料為一塊分塊有序。而第一次歸并結束,歸并結果存入joinSorts數組中,數組joinSorts以20萬資料為一塊分塊有序),是以通過變量flag控制floatSorts和joinSorts輪流作為被歸并數組,且歸并結果存入另一個數組。由于總歸并次數剛好為偶數,故最終結果剛好存在floatSorts數組中,符合要求。
在sortSpeedUp函數中對資料的操作流程可用下圖表示。
排序及歸并算法:
下面将簡要介紹程式中采用的快速排序算法以及分塊有序數組的歸并算法。
(1)快速排序算法
由于在排序過程中,需要對資料單個進行通路,無法使用AVX指令集加速,是以隻能通過在算法層面上進行改進來提高計算速度。如果采用較為基礎的冒泡排序等算法,其計算時間會顯著變長,是以我們選擇了快速排序算法,相對于冒泡排序O(n^2)的時間複雜度,快速排序算法平均時間複雜度隻有O(nlogn),其時間消耗在大資料量時變得尤為重要。
快速排序算法采用了分治的主要思想,通過基準數将數組不斷的劃分為不同範圍的兩部分,通過不斷重複劃分過程最終得到整個有序的數組。
在雙機加速時,我們雖然無法使用AVX指令集進行加速,快速排序分而治之的思想與多線程十分契合,我們将數組首先分為N個部分,由N個線程将其進行排序得到N個有序的數組,再通過歸并得到整個有序的數組。
(2)歸并
在得到N個有序數組後,我們需要将他們合并成一個有序的數組。我們采用了兩兩歸并的方式,從兩個數組的最小值開始比較,指針不斷後移,每次選擇兩個資料中較小的放入結果數組中,直到其中一個數組為空,将剩下的一個數組直接移到尾部完成一次歸并。
兩兩歸并的方式可以十分友善的應用于多線程中,但出于線程帶來的額外時間開銷的考慮,我們在此僅采用了兩個線程進行歸并,在将來更複雜資料的歸并中,可以嘗試使用更多的線程以提高加速比。
通信 TCP
在雙機協作版的程式中,由于通信規模較小且對通信可靠性有一定要求,是以我們選用了TCP 作為兩台計算機之間傳輸資料的方式。考慮到傳輸的資料量較大,且網絡傳輸中一幀的資料量約為1500位元組,發送和接受緩沖區的大小也有一定的限制,是以我們試圖将大量的資料拆成小包發送,以期能夠減少丢包和溢出,提高發送速率。基于以上想法我們提出了一下四種傳輸方式,并一一進行了實驗。
- 發送方整包發送,接收方整包接收;
- 發送方整包發送,接收方循環接收直至接收完全部資料;
- 發送方分包發送,接收方每次指定長度接收,直至收完為止;
-
發送方分包發送,接收方每次整包接收,直至收完為止。
其中發送和接收的核心代碼如下所示,通過修改SENDONCE 和RECEIVEONCE的值,可以實作上述四種方式的發送和接收。
發送方:
1. for (int i = 0; i < DATANUM / SENDONCE; i++)
2. {
3. sended=send(newConnection, (char*)&rawFloatData[i*SENDONCE], SENDONCE * sizeof(float), NULL);
4. //printf("%d:sended:%d\n",i,sended);
5.
6. }
7. recv(newConnection, (char*)&count, sizeof(count), NULL);
接收方:
1. while(1){
2. QueryPerformanceCounter(&time_start);//計時開始
3. receivesuccess = recv(Connection, &p[received], RECEIVEONCE * sizeof(float), NULL);
4. //printf("第%d次接收:接收資料量:%d\n", i, receivesuccess);
5. i = i + 1;
6. if (receivesuccess == -1)//列印錯誤
7. {
8. int erron=WSAGetLastError();
9. printf("erron=%d\n", erron);
10. }
11. else (receivesuccess != -1)
12. {
13. received = received + receivesuccess;
14. }
15. if (received >= DATANUM*4)
16. {
17. QueryPerformanceCounter(&time_over); //計時結束
18. run_time = 1000000 * (time_over.QuadPart - time_start.QuadPart) / dqFreq;
19. printf("\nrun_time:%fus\n", run_time);
20. break;
21. }
22. }
23. printf("receive end);
通過對上述四種方式進行測試,我們發現,TCP傳輸的效果并不好,雖然能夠保證資料傳輸正确,但其平均傳輸耗時約為13s,且無論上述四種哪種方式,總傳輸耗時相差不多。當發送方整包發送的時候,發送方能夠較快的執行完send函數(如下圖所示),但接收方在發送方發送完成後,仍舊需要較長的時間接收,且無論接收方采用何種接收方式,均無法進一步改善傳輸時間。當接收方采用較大長度接收時,執行單個recv函數耗時較長,但若減小接收長度則執行次數較多,對于改善傳輸時間,均沒有明顯作用。且在實驗過程中發現,若采用自主分包的方式,容易會在兩個資料包發送的交界處帶來資料傳輸錯誤。
通過查閱相關資料,我們發現send與recv函數實際并不執行具體的發送任務,而隻是實作與發送和接收緩沖區的互動,具體的資料發送實際是由TCP協定完成。即意味着,send函數傳回時,已經将全部資料送入待發送的資料緩沖區,而此時資料開始向接收緩沖區流動,但由于網絡傳輸中可能存在的某些原因,導緻資料在進入接收緩沖區的速度較慢,recv函數每次讀取時,緩沖區内的資料沒有達到預設的接收長度,是以發生了阻塞。
不僅如此,通過查閱上述資料我們發現,recv函數隻是負責從資料緩沖區中拷貝資料,即意為這,recv接收方并不知道發送方的發送方式,無論是s一次end全部資料還是分次放入發送緩沖區,均不影響recv的接收。
關于由于丢包導緻大量資料重傳進而影響傳輸速率的猜測,我們試圖通過調整單次發送資料的長度和接收資料的長度,以期能夠使發送和接收速率相似,進而減少資料重傳,但經過試驗發現并沒有達到預期的效果。
是以,最終我們選擇了最為簡單友善的方式,即send一次性整包發送,recv一次整包接收,由TCP底層協定對其進行拆包操作,并将TCP發送和接收使用的核心緩沖區均修改為資料總量大小。修改後,發送方發送較快,但接收方接收仍舊較慢,并無明顯改善,用較小的資料長度循環接收測試發現,緩沖區内資料流入速度低于recv的讀取速度,經常無法讀取完指定長度的資料。
示範、測試和效果評估
測試環境的搭建
1、使用Visual Studio 2019專業版/社群版,低版本可能不支援AVX指令集中的部分函數。
2、保證兩台計算機在同一區域網路下,且IP位址不可為100.xx.xx.xx或10.xx.xx.xx。
操作步驟
1、本項目為友善測試加速比,共有單機版程式、單機加速版程式與雙機加速版程式三個檔案,需要分别測試時間。
2、單機版程式、單機加速版程式可直接運作,無需任何修改。
3、雙機加速版程式在運作前需要做修改。在reciever的第458行,sender的461行,均需将IP位址改為reciever的IP位址。
注:排序檢查函數checkSort的傳回值為1代表排序正确,為0代表排序錯誤。
測試結果及評價
下面對單機版程式、單機加速版程式和雙機加速版程式進行測試,測試流程為執行5次任務,并求時間的平均值。
單機版程式測試:
1、計算機1測試結果
測試次數 | 運作時間 |
---|---|
1 | 7076944.900000us |
2 | 7416633.700000us |
3 | 8647607.400000us |
4 | 6606958.000000us |
5 | 8503165.200000us |
平均時間:t1=7650261.84us,資料和:6.17993*107,最大值:5.19648,排序結果正确。
2、計算機2測試結果
測試次數 | 運作時間 |
---|---|
1 | 8404677.000000us |
2 | 9434140.900000us |
3 | 9416527.800000us |
4 | 9642741.900000us |
5 | 9403498.000000us |
平均時間:t1=9260317us,資料和:6.17993*107,最大值:5.19648,排序結果正确。
加速版程式單機測試
本節中對加速版程式進行單機測試,資料量為雙機測試時的兩倍,也即與單機版程式測試資料量相同。
1、計算機1測試結果
測試次數 | 運作時間 |
---|---|
1 | 412523.900000us |
2 | 404471.100000us |
3 | 421559.100000us |
4 | 404154.000000us |
5 | 431603.000000us |
平均時間:t1=414862.22us,資料和:6.17993*107,最大值:5.19648,排序結果正确。
與單機相比,加速比約為18.44。
2、計算機2測試結果
測試次數 | 運作時間 |
---|---|
1 | 688923.300000us |
2 | 562931.800000us |
3 | 568764.300000us |
4 | 585099.000000us |
5 | 578702.500000us |
平均時間:t1=596884.2us,資料和:6.17993*107,最大值:5.19648,排序結果正确。
與單機相比,加速比約為15.52。
雙機版程式測試
雙機測試結果如下。其中,接收方運作時間即全部資料的計算時間,發送方運作時間為發送方資料運算與發送資料時間之和。
測試次數 | 接收方運作時間 | 發送方運作時間 |
---|---|---|
1 | 12893272.700000us | 205192.100000us |
2 | 13647811.700000us | 233872.800000us |
3 | 13608242.500000us | 229874.700000us |
4 | 13982430.600000us | 254460.700000us |
5 | 13314847.300000us | 252905.000000us |
平均時間:t1=13489320.96us,資料和:6.17993*107,最大值:5.19648,排序結果正确。
根據4.3.1中的單機測試結果,求得兩台計算機的單機計算平均時間為:8455289.42us,故雙機加速比為:0.6268。
總結:
程式設計中,主要實作了百萬級資料的求和、求最大值以及排序功能。其中,排序功能使用快速排序算法和歸并算法實作。共采用3項加速手段:其一,多線程。綜合考慮運算量與線程開銷,在求和及求最大值的過程中,分别建立了32個線程,而在排序過程中,建立了64個線程,歸并過程中建立了2個線程。其二,AVX指令集。使用AVX指令集對全部程式進行改寫。其三,TCP通信。使用TCP協定進行網絡通信,将全部資料分開在兩台計算機上進行運算。
測試時,分别測試了單機版程式、單機加速版程式以及雙機加速版程式。經過5次運算并求平均值,單機加速版程式的加速比在兩台計算機上分别約為18.44和15.52。但由于傳輸速度過慢,雙機加速版程式的加速比不盡理想,為:0.6268。