第6章 Map Reduce上手實踐
導讀學習一門架構程式設計技術,在了解架構整體功能特性和工作機制後,快速上手的方式就是利用這個架構來寫出屬于自己的第一個程式。
本章就以非常典型且能相當好地诠釋MAP REDUCE特性的詞頻統計為例,以詳盡的步驟,引導讀者成功開發并運作自己的第一個MAP REDUCE分布式資料處理程式。
6.1 Map Reduce入門程式設計案例
6.1.1 案例需求
假設有大量文本檔案,内容樣例如下:
hello jack
hello tom
hello jim
tom and jack are good friends
jim and tom are good friends too
需要統計出所有檔案中各單詞出現的總次數,即詞頻統計。
6.1.2 思路設計
回顧MAP REDUCE程式中,整個資料處理的流程分為map和reduce兩個階段,而這兩個階段中真正工作的程式是Map Task和Reduce Task,這兩個程式已在架構中提供,而使用者在編寫自己的MAP REDUCE程式時,則隻需要為Map Task和Reduce Task提供進行資料處理時的具體邏輯函數,即map()函數和reduce()函數。
初學階段開發mapreduce程式時,可以不用過多考慮mapreduce程式運作時的狀況,隻要抓住mapreduce程式設計模型中的要點即可,即考慮map階段讀取到原始資料後産生出什麼樣的key和value,以便能在reduce階段按key分組聚合運算時能得到需求的結果。
本例邏輯相對簡單,說明如下:
- map階段:
- Map Task會從HDFS的源資料檔案中逐行讀取資料,然後對每一行調用一次使用者自定義的map()方法;
- 使用者自定義的map()函數中:
- 将每一行資料切分出單詞;
- 為每一個單詞構造一個鍵值對(單詞,1);
- 将鍵值對傳回給Map Task。
- reduce階段:
- Reduce Task會擷取map階段輸出的單詞鍵值對;
- Reduce Task會将key相同(即單詞相同)的鍵值對彙聚成組;然後對每一組kv資料調用一次使用者自定義的reduce()方法來運算處理;
- 在使用者自定義的reduce()函數中:
- 周遊reduce Task所傳入的一組kv資料,将value值累加求和,即得到每一個單詞的總次數;
- 将計算後得到的(單詞,總次數)傳回給reduce task。
wordcount邏輯示意圖:

圖6.1 WordCount資料流轉流程
6.1.3 關鍵技術點解析
我們在開發自定義的Mapper類時,需要定義輸入資料的泛型類型(KEY IN,VALUE IN)。而這些泛型的确定,依賴于輸入資料的類型。
輸入資料的類型決定機制描述如下:
Map Task預設情況下是調用TextInputFormat擷取一個LineRecordReader來讀取檔案,其機制為逐行讀取,每讀到一行資料,就将這一行在檔案中的起始偏移量(類型為long)作為key資料,行的文本内容(類型為String)作為value資料;
map()函數處理後産生的資料也需要組織成一個key和一個value,也需要為此聲明輸出資料的泛型類型(KEY OUT, VALUE OUT);這兩個泛型的聲明,則取決于我們自定義的map()方法将要生成的結果key-value資料的類型;
而mapreduce中,資料經常需要持久化到磁盤檔案,或經網絡連接配接進行傳輸,而不管持久化到檔案還是通過網絡傳輸,key-value資料都需要序列化和反序列化操作。為了降低資料的體積,提高效率,Hadoop開發了一種自己的系列化架構。上述的key-value資料類型也就不能直接采用JDK中提供的原生類型,而是需要使用實作了Hadoop序列化接口的類型,如下:
- KEY IN : 行的起始偏移量,原生類型long,對應的Hadoop序列化類型:LongWritable
- VALUE IN:行的文本内容,原生類型String,對應的Hadoop序列化類型:Text
- KEY OUT: 一個單詞,原生類型String,對應的Hadoop序列化類型:Text
- VALUE OUT: 單詞次數1,原生類型int,對應的Hadoop序列化類型:IntWritable
所謂的序列化,就是将記憶體中的java對象結構資料“壓扁”成一個線性的二進制資料序列,反序列化則反之。由jdk中的序列化機制所生成的二進制序列中,不僅包含對象中的資料,還包含很多附加資訊(如類名、繼承關系等),換句話說就是jdk的序列化機制所産生的序列化結果非常臃腫和備援。
在資料量很大且需要頻繁進行磁盤序列化和網絡傳輸的情況下,如果用jdk的原生序列化機制,會導緻資料傳輸和存儲的負擔太大。是以Hadoop開發了一套自己的序列化架構,對象序列化後包含的資訊僅限于對象中的資料内容,進而大大降低磁盤存儲和網絡傳輸的負擔,提高效率。
由于本書側重于快速上手和實戰,限于篇幅所限,對于細節性原理的闡述不便面面俱到,如有需要更詳細更透徹地了解,可掃描二維碼來擷取更多文檔資料和視訊講解進行學習。
6.1.4 工程搭建
在eclipse中建立一個工程,并引入用于開發MapReduce程式的jar包;
方式1:使用maven的方式引入jar包
使用maven來引入依賴,可以在工程的pom.xml中,添加如下依賴配置:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.1</version>
</dependency>
方式2:直接使用本地jar包
從hadoop的安裝目錄中,拷貝所有jar包到項目工程中,并添加到buildpath即可,如圖:
6.1.5 Mapper開發
map函數的邏輯:
- 将map task所讀取到的一行資料按照單詞分隔符切分得到一個字元串數組;
- 周遊字元串數組;
- 對周遊到的每一個單詞,通過map task所提供的context來傳回一個<單詞,1>的key-value資料對;
在工程中建立一個類WordCountMapper并繼承Mapper,具體請看示例代碼及代碼注釋:
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/**
* maptask程式在自己的任務範圍資料中(資料切片)
* 每讀取一行資料,就調一次本map()方法,并将讀到的資料以key-value形式作為參數傳給本map()方法
* map()方法按照所需的資料處理邏輯對傳入的資料進行處理,然後通過context傳回給map task程式
*/
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
// key在預設情況下是maptask所讀取到的一行文本的起始偏移量
// value在預設情況下是maptask所讀取到的一行文本的字元内容
// 将文本按空格符号切分成單詞數組
String[] words = value.toString().split(" ");
// 周遊數組中的每一個單詞,将每個單詞變成 (單詞,1)這樣的kv對資料,通過context傳回給maptask
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
6.1.6 Reducer類開發
reduce函數的邏輯:
- Reduce Task會将自己所收到的key-value資料按照key進行排序和分組,相同key的資料會連續排放在一起,key“較小”的資料排在前面,然後,将key相同的key-value資料作為一組來進行聚合處理;
- Reduce Task對每一組資料處理時,都是調用使用者提供的reduce方法,并傳入一個key對象和一個疊代器對象給reduce方法來擷取這一組資料中的每一個key-value資料;
- 在reduce函數中,我們隻需要使用架構所提供的疊代器來疊代reduce task提供的一組key-value資料(具體來說,就是循環反複調用疊代器的next()方法來逐一擷取這組資料中的每一對key-value資料);
- 将每次疊代到的value值進行累加,疊代完成之後,累加值即為某一個key(即某個單詞)的總詞頻;
- 将統計出的這個單詞及其總詞頻,通過reduce task所提供的context進行傳回。
在工程中建立一個類WordCountReducer并繼承Reducer,具體請看示例代碼及注釋:
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* Reduce Task收集到一組相同key的資料,調一次本方法,比如(a,1)(a,1)(a,1)......(a,1)
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count = 0;
// 每疊代一次,疊代器就會取一對key-value資料:
// 取到的key資料重新設定給方法中的參數key
// 取到的value資料則會設定給以下for循環中的臨時變量value
for (IntWritable value : values) {
// 将本次疊代到的value值累加到count變量
count += value.get();
}
// 将這一組資料的聚合結果通過context傳回給reduce task
context.write(key, new IntWritable(count));
}
}
6.1.7 job送出用戶端開發
自定義Mapper類和自定義Reducer類開發好後,還需要開發一個程式,用于設定關于這個mapreduce job的相關參數,并将這些參數及整個程式jar包送出到Yarn上去啟動執行。
在工程中建立一個類WordCountDriver類,并定義一個main方法,具體請看示例代碼及注釋:
public class WordcountDriver {
public static void main(String[] args) throws Exception {
// 需要将大量的跟jar包程式運作相關的參數進行描述
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//job.setJar("/root/wc.jar");
job.setJarByClass(WordcountDriver.class);
// 指定本次job所要用的mapper類和reducer類
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountCombiner.class);
job.setReducerClass(WordCountReducer.class);
// 指定本次job的mapper類和reducer類輸出結果的資料類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 指定本次job讀取源資料時所需要用的輸入元件:源檔案在hdfs的文本檔案中,用TextInputFormat
job.setInputFormatClass(TextInputFormat.class);
// 指定本次job輸出資料時所需要用的輸出元件:輸出到hdfs文本檔案中,用TextOutputFormat
job.setOutputFormatClass(TextOutputFormat.class);
// 指定reduce task運作執行個體的個數
job.setNumReduceTasks(3);
// 指定源資料檔案所在的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定輸出結果資料檔案所在的路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 核心代碼: 送出jar包給yarn
// job.submit(); // 送出完任務,用戶端就退出了
// 送出完任務,用戶端列印叢集中的運作進度資訊,并等待最終運作狀态
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}