天天看點

MapReduce—第一個WordCount程式

Mapper類

我們自定義MyMapper類并繼承Mapper,

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * KEYIN:輸入資料的k類型,預設偏移量
 * VALUEIN:該行内容,字元串,hadoop.io.Text下的Text
 * KEYOUT:輸出資料的k類型,Text
 * VALUEOUT:輸出資料的v類型,1
 * */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	
	protected void map(LongWritable key, Text value, 
			Context context)
			throws IOException, InterruptedException {
		String[] wordsArr = value.toString().split(" ");//按照空格切割value,得到一個數組
		
		//得到數組,開始周遊
		for (int i = 0; i < wordsArr.length; i++) {
			String word = wordsArr[i];//取出數組中的值
			Text outKey = new Text(word);//字元串
			IntWritable outValue = new IntWritable(1);//單詞出現一次,就是1
			context.write(outKey, outValue);//向外輸出
		}
	}

}
           

map輸出資料要經過聚合,調用比較器。優先調用自定義比較器,若未自定義,就調用預設的比較器(按照ASCII值比較)。聚合後的資料value為1 。

分區器不用管,于是此刻資料就是“K-V-P”,之後放到記憶體緩沖區、溢寫。之後調用比較器,鍵K的比較器就是Text。

關于Text的比較,源碼Comparator提供了比較方法,比較ASCII值,使用字典序。

@Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
    }
           

a和b的ASCII分别為65、66,是以a在前,b在後。

Reducer類

自定義MyReducer類繼承Reducer,參數四個類型。上一步Mapper的輸出作為Reducer的輸入。

傳進來的iter就是“假疊代器”。比較鍵(單詞,比較ASCII值)經過比較之後單詞都是一樣的,經過疊代累加後,輸出。

package com.husky.hadoop.wc;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> iter,
			Context context) throws IOException, InterruptedException {
		int sum = 0;
		for(IntWritable i : iter){
			sum+=i.get();
		}
		context.write(key, new IntWritable(sum));
	}
}
           

用戶端

用戶端先和RM聯系,通過配置檔案,知道叢集内角色在哪。

package com.husky.hadoop.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyWC {
	public static void main(String[] args) throws Exception {
		
		用戶端自動讀取配置檔案,并把配置資訊加載到conf對象中
		Configuration conf = new Configuration(true);
		
		//job
		Job job = Job.getInstance(conf);
		FileSystem fs = FileSystem.get(conf);
		
		//必須要配置的,入口類
		job.setJarByClass(MyWC.class);
		//設定job name
		job.setJobName("wc");
		
		//設定Mapper和Reducer
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		
		//設定輸出的K-V類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//設定reduce的數量,預設1
		job.setNumReduceTasks(1);
		
		//設定計算輸入資料,path就是hdfs上的檔案路徑
		FileInputFormat.addInputPath(job, new Path("/user/root/test.txt"));
		//設定計算輸出目錄,最後的計算結果要在這該目錄中
		//該目錄必須不存在,否則計算容易出錯。submitter.submitJobInternal(Job.this, cluster)
		//方法中調用了checkSpecs(job)方法來檢查該目錄是否存在。
		Path outPath = new Path("/output/wc/");
		if (fs.exists(outPath)) { //如果目錄存在就删除
			fs.delete(outPath,true);
		}
		
		FileOutputFormat.setOutputPath(job, outPath);
		
		//開始執行
		boolean f = job.waitForCompletion(true);
		if (f) {
			System.out.println("MapReduce程式執行成功!");
		}

	}

}
           

點選檢視Client送出Job作業相關流程的源碼分析!

執行

把編寫的這幾個類,export成一個jar包,并上傳。執行以下指令:

#最後的參數為包名+類名
hadoop jar ./MyWC.jar com.husky.hadoop.wc.MyWC
           

在執行的過程中,jps會出現YarnChild和MRAppMaster程序。

執行結束,結果檔案就已經按照字典序排好順序了,且會列印出我們的輸出資訊:

MapReduce—第一個WordCount程式

根據為我們提供的插件檢視結果:

MapReduce—第一個WordCount程式

注意

筆者在進行以上操作時,遇見了以下問題:

這裡省略了一萬行報錯資訊...
Note: System times on machines may be out of sync. Check system time and time zones.
           

報錯資訊提示我的叢集時間并不同步,導緻我的task跑不起來。

解決方案:

安裝ntpdate工具

yum -y install ntp ntpdate
           

設定系統時間與網絡時間同步

ntpdate cn.pool.ntp.org
           

最後

WordCount案例是最簡單的入門案例,我們需要和MR整個流程對應起來,這能幫助我們了解掌握Shrffle機制原理。是以,這個小案例,最好能做到手寫并口述流程。

繼續閱讀