天天看點

hadoop臨時檔案 jar包_實戰案例玩轉Hadoop --Map Reduce上手實踐

第6章 Map Reduce上手實踐

導讀

學習一門架構程式設計技術,在了解架構整體功能特性和工作機制後,快速上手的方式就是利用這個架構來寫出屬于自己的第一個程式。

本章就以非常典型且能相當好地诠釋MAP REDUCE特性的詞頻統計為例,以詳盡的步驟,引導讀者成功開發并運作自己的第一個MAP REDUCE分布式資料處理程式。

6.1 Map Reduce入門程式設計案例

6.1.1 案例需求

假設有大量文本檔案,内容樣例如下:

hello jack
hello tom
hello jim
tom and jack are good friends
jim and tom are good friends too
           

需要統計出所有檔案中各單詞出現的總次數,即詞頻統計。

6.1.2 思路設計

回顧

MAP REDUCE程式中,整個資料處理的流程分為map和reduce兩個階段,而這兩個階段中真正工作的程式是Map Task和Reduce Task,這兩個程式已在架構中提供,而使用者在編寫自己的MAP REDUCE程式時,則隻需要為Map Task和Reduce Task提供進行資料處理時的具體邏輯函數,即map()函數和reduce()函數。

初學階段開發mapreduce程式時,可以不用過多考慮mapreduce程式運作時的狀況,隻要抓住mapreduce程式設計模型中的要點即可,即考慮map階段讀取到原始資料後産生出什麼樣的key和value,以便能在reduce階段按key分組聚合運算時能得到需求的結果。

本例邏輯相對簡單,說明如下:

  • map階段:
  1. Map Task會從HDFS的源資料檔案中逐行讀取資料,然後對每一行調用一次使用者自定義的map()方法;
  2. 使用者自定義的map()函數中:
  3. 将每一行資料切分出單詞;
  4. 為每一個單詞構造一個鍵值對(單詞,1);
  5. 将鍵值對傳回給Map Task。
  • reduce階段:
  1. Reduce Task會擷取map階段輸出的單詞鍵值對;
  2. Reduce Task會将key相同(即單詞相同)的鍵值對彙聚成組;然後對每一組kv資料調用一次使用者自定義的reduce()方法來運算處理;
  3. 在使用者自定義的reduce()函數中:
  4. 周遊reduce Task所傳入的一組kv資料,将value值累加求和,即得到每一個單詞的總次數;
  5. 将計算後得到的(單詞,總次數)傳回給reduce task。

wordcount邏輯示意圖:

hadoop臨時檔案 jar包_實戰案例玩轉Hadoop --Map Reduce上手實踐

圖6.1 WordCount資料流轉流程

6.1.3 關鍵技術點解析

我們在開發自定義的Mapper類時,需要定義輸入資料的泛型類型(KEY IN,VALUE IN)。而這些泛型的确定,依賴于輸入資料的類型。

輸入資料的類型決定機制描述如下:

Map Task預設情況下是調用TextInputFormat擷取一個LineRecordReader來讀取檔案,其機制為逐行讀取,每讀到一行資料,就将這一行在檔案中的起始偏移量(類型為long)作為key資料,行的文本内容(類型為String)作為value資料;

map()函數處理後産生的資料也需要組織成一個key和一個value,也需要為此聲明輸出資料的泛型類型(KEY OUT, VALUE OUT);這兩個泛型的聲明,則取決于我們自定義的map()方法将要生成的結果key-value資料的類型;

而mapreduce中,資料經常需要持久化到磁盤檔案,或經網絡連接配接進行傳輸,而不管持久化到檔案還是通過網絡傳輸,key-value資料都需要序列化和反序列化操作。為了降低資料的體積,提高效率,Hadoop開發了一種自己的系列化架構。上述的key-value資料類型也就不能直接采用JDK中提供的原生類型,而是需要使用實作了Hadoop序列化接口的類型,如下:

  1. KEY IN : 行的起始偏移量,原生類型long,對應的Hadoop序列化類型:LongWritable
  2. VALUE IN:行的文本内容,原生類型String,對應的Hadoop序列化類型:Text
  3. KEY OUT: 一個單詞,原生類型String,對應的Hadoop序列化類型:Text
  4. VALUE OUT: 單詞次數1,原生類型int,對應的Hadoop序列化類型:IntWritable
提示

所謂的序列化,就是将記憶體中的java對象結構資料“壓扁”成一個線性的二進制資料序列,反序列化則反之。由jdk中的序列化機制所生成的二進制序列中,不僅包含對象中的資料,還包含很多附加資訊(如類名、繼承關系等),換句話說就是jdk的序列化機制所産生的序列化結果非常臃腫和備援。

在資料量很大且需要頻繁進行磁盤序列化和網絡傳輸的情況下,如果用jdk的原生序列化機制,會導緻資料傳輸和存儲的負擔太大。是以Hadoop開發了一套自己的序列化架構,對象序列化後包含的資訊僅限于對象中的資料内容,進而大大降低磁盤存儲和網絡傳輸的負擔,提高效率。

由于本書側重于快速上手和實戰,限于篇幅所限,對于細節性原理的闡述不便面面俱到,如有需要更詳細更透徹地了解,可掃描二維碼來擷取更多文檔資料和視訊講解進行學習。

6.1.4 工程搭建

在eclipse中建立一個工程,并引入用于開發MapReduce程式的jar包;

方式1:使用maven的方式引入jar包

使用maven來引入依賴,可以在工程的pom.xml中,添加如下依賴配置:

<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>2.8.1</version>
	</dependency>
           

方式2:直接使用本地jar包

從hadoop的安裝目錄中,拷貝所有jar包到項目工程中,并添加到buildpath即可,如圖:

hadoop臨時檔案 jar包_實戰案例玩轉Hadoop --Map Reduce上手實踐
圖6.2 工程結構

6.1.5 Mapper開發

map函數的邏輯:

  1. 将map task所讀取到的一行資料按照單詞分隔符切分得到一個字元串數組;
  2. 周遊字元串數組;
  3. 對周遊到的每一個單詞,通過map task所提供的context來傳回一個<單詞,1>的key-value資料對;

在工程中建立一個類WordCountMapper并繼承Mapper,具體請看示例代碼及代碼注釋:

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 
	/**
	 * maptask程式在自己的任務範圍資料中(資料切片)
	 * 每讀取一行資料,就調一次本map()方法,并将讀到的資料以key-value形式作為參數傳給本map()方法
	 * map()方法按照所需的資料處理邏輯對傳入的資料進行處理,然後通過context傳回給map task程式
	 */
	@Override
 	protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
		// key在預設情況下是maptask所讀取到的一行文本的起始偏移量
		// value在預設情況下是maptask所讀取到的一行文本的字元内容
 
		// 将文本按空格符号切分成單詞數組
		String[] words = value.toString().split(" ");
		// 周遊數組中的每一個單詞,将每個單詞變成  (單詞,1)這樣的kv對資料,通過context傳回給maptask
 		for (String word : words) {
			context.write(new Text(word), new IntWritable(1));
		}
	}
}
           

6.1.6 Reducer類開發

reduce函數的邏輯:

  1. Reduce Task會将自己所收到的key-value資料按照key進行排序和分組,相同key的資料會連續排放在一起,key“較小”的資料排在前面,然後,将key相同的key-value資料作為一組來進行聚合處理;
  2. Reduce Task對每一組資料處理時,都是調用使用者提供的reduce方法,并傳入一個key對象和一個疊代器對象給reduce方法來擷取這一組資料中的每一個key-value資料;
  3. 在reduce函數中,我們隻需要使用架構所提供的疊代器來疊代reduce task提供的一組key-value資料(具體來說,就是循環反複調用疊代器的next()方法來逐一擷取這組資料中的每一對key-value資料);
  4. 将每次疊代到的value值進行累加,疊代完成之後,累加值即為某一個key(即某個單詞)的總詞頻;
  5. 将統計出的這個單詞及其總詞頻,通過reduce task所提供的context進行傳回。

在工程中建立一個類WordCountReducer并繼承Reducer,具體請看示例代碼及注釋:

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
 
	/**
	 * Reduce Task收集到一組相同key的資料,調一次本方法,比如(a,1)(a,1)(a,1)......(a,1)
	 */
	@Override
 	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
 
 		int count  = 0;
 
		// 每疊代一次,疊代器就會取一對key-value資料:
		// 取到的key資料重新設定給方法中的參數key
		// 取到的value資料則會設定給以下for循環中的臨時變量value
 		for (IntWritable value : values) {
			// 将本次疊代到的value值累加到count變量
			count += value.get();
		}
 
		// 将這一組資料的聚合結果通過context傳回給reduce task
		context.write(key, new IntWritable(count));
	}

}
           

6.1.7 job送出用戶端開發

自定義Mapper類和自定義Reducer類開發好後,還需要開發一個程式,用于設定關于這個mapreduce job的相關參數,并将這些參數及整個程式jar包送出到Yarn上去啟動執行。

在工程中建立一個類WordCountDriver類,并定義一個main方法,具體請看示例代碼及注釋:

public class WordcountDriver {
 
	public static void main(String[] args) throws Exception {
		// 需要将大量的跟jar包程式運作相關的參數進行描述
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
 
		//job.setJar("/root/wc.jar");
		job.setJarByClass(WordcountDriver.class);
 
		// 指定本次job所要用的mapper類和reducer類
		job.setMapperClass(WordCountMapper.class);
		job.setCombinerClass(WordCountCombiner.class); 
		job.setReducerClass(WordCountReducer.class);
 
		// 指定本次job的mapper類和reducer類輸出結果的資料類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
 
		// 指定本次job讀取源資料時所需要用的輸入元件:源檔案在hdfs的文本檔案中,用TextInputFormat
		job.setInputFormatClass(TextInputFormat.class);
		// 指定本次job輸出資料時所需要用的輸出元件:輸出到hdfs文本檔案中,用TextOutputFormat
		job.setOutputFormatClass(TextOutputFormat.class);
 
		// 指定reduce task運作執行個體的個數
		job.setNumReduceTasks(3);
 
		// 指定源資料檔案所在的路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// 指定輸出結果資料檔案所在的路徑
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
		// 核心代碼: 送出jar包給yarn
		// job.submit();  // 送出完任務,用戶端就退出了
		// 送出完任務,用戶端列印叢集中的運作進度資訊,并等待最終運作狀态
 		boolean res = job.waitForCompletion(true);  		
		System.exit(res?0:1); 
 
	}
}
           

繼續閱讀