Mapper類
我們自定義MyMapper類并繼承Mapper,
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* KEYIN:輸入資料的k類型,預設偏移量
* VALUEIN:該行内容,字元串,hadoop.io.Text下的Text
* KEYOUT:輸出資料的k類型,Text
* VALUEOUT:輸出資料的v類型,1
* */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
String[] wordsArr = value.toString().split(" ");//按照空格切割value,得到一個數組
//得到數組,開始周遊
for (int i = 0; i < wordsArr.length; i++) {
String word = wordsArr[i];//取出數組中的值
Text outKey = new Text(word);//字元串
IntWritable outValue = new IntWritable(1);//單詞出現一次,就是1
context.write(outKey, outValue);//向外輸出
}
}
}
map輸出資料要經過聚合,調用比較器。優先調用自定義比較器,若未自定義,就調用預設的比較器(按照ASCII值比較)。聚合後的資料value為1 。
分區器不用管,于是此刻資料就是“K-V-P”,之後放到記憶體緩沖區、溢寫。之後調用比較器,鍵K的比較器就是Text。
關于Text的比較,源碼Comparator提供了比較方法,比較ASCII值,使用字典序。
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int n1 = WritableUtils.decodeVIntSize(b1[s1]);
int n2 = WritableUtils.decodeVIntSize(b2[s2]);
return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
}
a和b的ASCII分别為65、66,是以a在前,b在後。
Reducer類
自定義MyReducer類繼承Reducer,參數四個類型。上一步Mapper的輸出作為Reducer的輸入。
傳進來的iter就是“假疊代器”。比較鍵(單詞,比較ASCII值)經過比較之後單詞都是一樣的,經過疊代累加後,輸出。
package com.husky.hadoop.wc;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> iter,
Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable i : iter){
sum+=i.get();
}
context.write(key, new IntWritable(sum));
}
}
用戶端
用戶端先和RM聯系,通過配置檔案,知道叢集内角色在哪。
package com.husky.hadoop.wc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MyWC {
public static void main(String[] args) throws Exception {
用戶端自動讀取配置檔案,并把配置資訊加載到conf對象中
Configuration conf = new Configuration(true);
//job
Job job = Job.getInstance(conf);
FileSystem fs = FileSystem.get(conf);
//必須要配置的,入口類
job.setJarByClass(MyWC.class);
//設定job name
job.setJobName("wc");
//設定Mapper和Reducer
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//設定輸出的K-V類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定reduce的數量,預設1
job.setNumReduceTasks(1);
//設定計算輸入資料,path就是hdfs上的檔案路徑
FileInputFormat.addInputPath(job, new Path("/user/root/test.txt"));
//設定計算輸出目錄,最後的計算結果要在這該目錄中
//該目錄必須不存在,否則計算容易出錯。submitter.submitJobInternal(Job.this, cluster)
//方法中調用了checkSpecs(job)方法來檢查該目錄是否存在。
Path outPath = new Path("/output/wc/");
if (fs.exists(outPath)) { //如果目錄存在就删除
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job, outPath);
//開始執行
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("MapReduce程式執行成功!");
}
}
}
點選檢視Client送出Job作業相關流程的源碼分析!
執行
把編寫的這幾個類,export成一個jar包,并上傳。執行以下指令:
#最後的參數為包名+類名
hadoop jar ./MyWC.jar com.husky.hadoop.wc.MyWC
在執行的過程中,jps會出現YarnChild和MRAppMaster程序。
執行結束,結果檔案就已經按照字典序排好順序了,且會列印出我們的輸出資訊:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLxkERNVzaq1kMNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1QzN5EzMyMTM0EjNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
根據為我們提供的插件檢視結果:
注意
筆者在進行以上操作時,遇見了以下問題:
這裡省略了一萬行報錯資訊...
Note: System times on machines may be out of sync. Check system time and time zones.
報錯資訊提示我的叢集時間并不同步,導緻我的task跑不起來。
解決方案:
安裝ntpdate工具
yum -y install ntp ntpdate
設定系統時間與網絡時間同步
ntpdate cn.pool.ntp.org
最後
WordCount案例是最簡單的入門案例,我們需要和MR整個流程對應起來,這能幫助我們了解掌握Shrffle機制原理。是以,這個小案例,最好能做到手寫并口述流程。