一、MapReduce 概述
1.1 MapReduce定義
MapReduce 是一個分布式運算程式中的程式設計架構, 是使用者開發“基于 Hadoop 的資料分析應用”的核心架構.
MapReduce 核心功能是将使用者編寫的業務邏輯代碼和自帶預設元件整合成一個完整的分布式0運算程式,并發運作在一個 Hadoop 叢集上。
1.2 MapReduce 優缺點
優點
- 易于程式設計
它簡單的實作一些借口,就可以完成一個分布式程式,這個分布式程式可以分不到大量廉價的 PC 機器上運作。也就是說你寫一個分布式程式嗎,跟寫 一個簡單的串行程式是一樣的,就是因為這個特點使用 MapReduce 程式設計很流行。
- 良好的擴充性
當你的計算資源不能滿足的時候,可以通過簡單的增加機器來擴充它的計算能力。
- 高容錯性
MapReduce 設計的初衷就是使程式能夠部署在廉價的 PC 機器上,這就要求它具有很高的容錯性。比如一台機器挂了,它可以把上面的計算任務轉移到另外一個節點上運作,不至于這個任務運作失敗,而且這個過程不需要人工的參與,而完全是由 Hadoop 内部完成的。
- 适合PB級以上海量資料的離線處理
可以實作千台伺服器叢集開發工作,提供資料處理能力。
二、WordCount 案例
2.1 hello.txt
hello world
hello OK
are you ok
I am ok
hello OK
2.2 代碼實作
package com.kangna.mapreducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/********************************
* @Author: kangna
* @Date: 2020/1/25 11:14
* @Version: 1.0
* @Desc:
********************************/
public class WordCountMain {
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text word = new Text();
private IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 取到一行個資料
String line = value.toString();
// 按照空格切分
String[] words = line.split(" ");
// 周遊資料
for (String word : words) {
this.word.set(word);
context.write(this.word, this.one);
}
}
}
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable > {
private IntWritable total = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 作累加
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// 包裝 結構并輸出
total.set(sum);
context.write(key, total);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 擷取一個 Job 執行個體
Job job = Job.getInstance(new Configuration());
// 2. 設定 類的路徑
job.setJarByClass(WordCountMain.class);
// 3. 設定 Mapper 和 Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. 設定 Mapper 和 Reducer 的輸出類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 5. 設定輸入輸出資料
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6. 送出Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
參數的設定
檢視輸出的檔案結果
打包在叢集中也是可以運作的。