天天看點

Hadoop架構設計、運作原理詳解1、Map-Reduce的邏輯過程3、Map-Reduce資料流(data flow)

假設我們需要處理一批有關天氣的資料,其格式如下:

按照ascii碼存儲,每行一條記錄

每一行字元從0開始計數,第15個到第18個字元為年

第25個到第29個字元為溫度,其中第25位是符号+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

現在需要統計出每年的最高溫度。

map-reduce主要包括兩個步驟:map和reduce

每一步都有key-value對作為輸入和輸出:

map階段的key-value對的格式是由輸入的格式所決定的,如果是預設的textinputformat,則每行作為一個記錄程序處理,其中key為此行的開頭相對于檔案的起始位置,value就是此行的字元文本

map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對于上面的例子,在map過程,輸入的key-value對如下:

(0,0067011990999991950051507+0000+)

(33,0043011990999991950051512+0022+)

(66,0043011990999991950051518-0011+)

(99,0043012650999991949032412+0111+)

(132,0043012650999991949032418+0078+)

(165,0067011990999991937051507+0001+)

(198,0043011990999991937051512-0002+)

(231,0043011990999991945051518+0001+)

(264,0043012650999991945032412+0002+)

(297,0043012650999991945032418+0078+)

在map過程中,通過對每一行字元串的解析,得到年-溫度的key-value對作為輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce過程,将map過程中的輸出,按照相同的key将value放到同一個清單中作為reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce過程中,在清單中選擇出最大的溫度,将年-最大溫度的key-value作為輸出:

其邏輯過程可用如下圖表示:

下圖大概描述了map-reduce的job運作的基本原理:

下面我們讨論jobconf,其有很多的項可以進行配置:

setinputformat:設定map的輸入格式,預設為textinputformat,key為longwritable,value為text

setnummaptasks:設定map任務的個數,此設定通常不起作用,map任務的個數取決于輸入的資料所能分成的inputsplit的個數

setmapperclass:設定mapper,預設為identitymapper

setmaprunnerclass:設定maprunner, maptask是由maprunner運作的,預設為maprunnable,其功能為讀取inputsplit的一個個record,依次調用mapper的map函數

setmapoutputkeyclass和setmapoutputvalueclass:設定mapper的輸出的key-value對的格式

setoutputkeyclass和setoutputvalueclass:設定reducer的輸出的key-value對的格式

setpartitionerclass和setnumreducetasks:設定partitioner,預設為hashpartitioner,其根據key的hash值來決定進入哪個partition,每個partition被一個reduce task處理,是以partition的個數等于reducetask的個數

setreducerclass:設定reducer,預設為identityreducer

setoutputformat:設定任務的輸出格式,預設為textoutputformat

fileinputformat.addinputpath:設定輸入檔案的路徑,可以使一個檔案,一個路徑,一個通配符。可以被調用多次添加多個路徑

fileoutputformat.setoutputpath:設定輸出檔案的路徑,在job運作前此路徑不應該存在

當然不用所有的都設定,由上面的例子,可以編寫map-reduce程式如下:

public class maxtemperature {

    publicstatic void main(string[] args) throws ioexception {

       if (args.length != 2) {

           system.err.println("usage: maxtemperature <inputpath> <outputpath>");

           system.exit(-1);

       }

       jobconf conf = new jobconf(maxtemperature.class);

       conf.setjobname("max temperature");

       fileinputformat.addinputpath(conf, new path(args[0]));

       fileoutputformat.setoutputpath(conf, new path(args[1]));

       conf.setmapperclass(maxtemperaturemapper.class);

       conf.setreducerclass(maxtemperaturereducer.class);

       conf.setoutputkeyclass(text.class);

       conf.setoutputvalueclass(intwritable.class);

       jobclient.runjob(conf);

    }

}

map-reduce的處理過程主要涉及以下四個部分:

用戶端client:用于送出map-reduce任務job

jobtracker:協調整個job的運作,其為一個java程序,其main class為jobtracker

tasktracker:運作此job的task,處理input split,其為一個java程序,其mainclass為tasktracker

hdfs:hadoop分布式檔案系統,用于在各個程序間共享job相關的檔案

jobclient.runjob()建立一個新的jobclient執行個體,調用其submitjob()函數。

向jobtracker請求一個新的job id

檢測此job的output配置

計算此job的input splits

将job運作所需的資源拷貝到jobtracker的檔案系統中的檔案夾中,包括jobjar檔案,job.xml配置檔案,input splits

通知jobtracker此job已經可以運作了

送出任務後,runjob每隔一秒鐘輪詢一次job的進度,将進度傳回到指令行,直到任務運作完畢。

當jobtracker收到submitjob調用的時候,将此任務放到一個隊列中,job排程器将從隊列中擷取任務并初始化任務。

初始化首先建立一個對象來封裝job運作的tasks, status以及progress。

在建立task之前,job排程器首先從共享檔案系統中獲得jobclient計算出的input splits。

其為每個input split建立一個map task。

每個task被配置設定一個id。

tasktracker周期性的向jobtracker發送heartbeat。

在heartbeat中,tasktracker告知jobtracker其已經準備運作一個新的task,jobtracker将配置設定給其一個task。

在jobtracker為tasktracker選擇一個task之前,jobtracker必須首先按照優先級選擇一個job,在最高優先級的job中選擇一個task。

tasktracker有固定數量的位置來運作map task或者reduce task。

預設的排程器對待map task優先于reduce task

當選擇reduce task的時候,jobtracker并不在多個task之間進行選擇,而是直接取下一個,因為reducetask沒有資料本地化的概念。

tasktracker被配置設定了一個task,下面便要運作此task。

首先,tasktracker将此job的jar從共享檔案系統中拷貝到tasktracker的檔案系統中。

tasktracker從distributed cache中将job運作所需要的檔案拷貝到本地磁盤。

其次,其為每個task建立一個本地的工作目錄,将jar解壓縮到檔案目錄中。

其三,其建立一個taskrunner來運作task。

taskrunner建立一個新的jvm來運作task。

被建立的child jvm和tasktracker通信來報告運作進度。

maprunnable從inputsplit中讀取一個個的record,然後依次調用mapper的map函數,将結果輸出。

map的輸出并不是直接寫入硬碟,而是将其寫入緩存memory buffer。

當buffer中資料的到達一定的大小,一個背景線程将資料開始寫入硬碟。

在寫入硬碟之前,記憶體中的資料通過partitioner分成多個partition。

在同一個partition中,背景線程會将資料按照key在記憶體中排序。

每次從記憶體向硬碟flush資料,都生成一個新的spill檔案。

當此task結束之前,所有的spill檔案被合并為一個整的被partition的而且排好序的檔案。

reducer可以通過http協定請求map的輸出檔案,tracker.http.threads可以設定http服務線程數。

當map task結束後,其通知tasktracker,tasktracker通知jobtracker。

對于一個job,jobtracker知道tasktracer和map輸出的對應關系。

reducer中一個線程周期性的向jobtracker請求map輸出的位置,直到其取得了所有的map輸出。

reduce task需要其對應的partition的所有的map輸出。

reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的maptask完成時間不同。

reduce task中有多個copy線程,可以并行拷貝map輸出。

當很多map輸出拷貝到reduce task後,一個背景線程将其合并為一個大的排好序的檔案。

當所有的map輸出都拷貝到reduce task後,進入sort過程,将所有的map輸出合并為大的排好序的檔案。

最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每個key,最後的結果寫入hdfs。

當jobtracker獲得最後一個task的運作成功的報告後,将job得狀态改為成功。

當jobclient從jobtracker輪詢的時候,發現此job已經成功結束,則向使用者列印消息,從runjob函數中傳回。

如有不懂,歡迎撥打10010或10086,轉何哲江。

繼續閱讀