https://blog.csdn.net/weixin_43823423/article/details/85986538
1、MapReduce的shuffle機制
1.1、概述
MapReduce中,mapper階段處理的資料如何傳遞給reduce階段,是MapReduce架構中最關鍵的一個流程,這個流程就叫shuffle.
Shuffle:資料混洗---------(核心機制:資料分區,排序,局部聚合,緩存,拉取,再合并排序)
具體來說,就是将MapTask輸出的處理資料結果,按照Partitioner元件制定的規則分發ReduceTask,并在分發的過程中,對資料按key進行分區和排序
1.2、主要流程
Shuffle是MapReduce處理流程中的一個核心,它的每一個處理步驟是分散在各個Maptask和reducetask節點上完成的,整體來看,分為3個操作:
1、分區partition(如果reduceTask隻有一個或者沒有,那麼partition将不起作用。設定沒設定相當于沒有)
2、Sort根據key排序(MapReduce程式設計中sort是一定會做的,并且隻能按照key排序,當然如果沒有reduce階段,那麼就不會對key排序)
3、Combiner進行局部value的合并(Combiner是可選的元件,作用是為了提高任務的執行效率)
1.3、詳細流程
1、mapTask收集我們map()方法輸出的kv對,放在記憶體緩沖區kvbuffer(環形緩沖區:記憶體中的一種首尾相連的資料結構,kvbuffer包含資料區和索引區)中,在存資料的時候,會調用partitioner進行分區編号的計算,并存入中繼資料中
2、當記憶體緩沖區的資料達到100*0.8時,就會開始溢寫到本地磁盤檔案file.out,可能會溢出多次,則會有多個檔案,相應的緩沖區中的索引區資料溢出為磁盤索引檔案file.out.index
3、在溢寫前,會先根據分區編号排序,相同的分區的資料,排在一起,再根據map的key排序(快排)
4、多個溢寫檔案會被合并成大的溢出檔案(歸并排序)
5、在資料量大的時.候,可以對maptask結果啟用壓縮,将mapreduce.map.ouput.compress設為true,并使用
mapreduce.map.output.compress.codec設定使用的壓縮算法,可以提高資料傳輸到reduce端的效率
6、reduceTask根據自己的分區号,去各個mapTask機器上取相應的結果分區資料
7、reduceTask會取到同一個分區的來自不同mapTask的結果檔案,reduceTask會将這些檔案再進行合并(歸并排序)
8、合并成r大檔案後,shuffle的過程也就結束了,後面進入reduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,調用使用者自定義的reduce()方法)
2、自定義Shuffle過程中的元件
1、自定義輸入
預設輸入類:TextInputFormat
自定義:
模仿 org.apache.hadoop.mapreduce.lib.input.LineRecordReader 和org.apache.hadoop.mapreduce.lib.input.TextInputFormat
1、自定義類繼承FileInputFormat
public class MyFileInputFormat extends FileInputFormat<Text, LongWritable>{
@Override
public RecordReader<Text, LongWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
//執行個體化一個
MyAllFileRecodReader reader = new MyAllFileRecodReader();
//split參數和context都是架構自動傳入的,把這兩個參數傳給reader進行處理,以便擷取相關資訊
reader.initialize(split, context);
return reader;
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
2、自定義類實作RecordReader
public class MyFileRecodReader extends RecordReader<Text, LongWritable>{
//用于存儲檔案系統輸入流
private FSDataInputStream open = null;
//儲存檔案長度
private int fileSplitLength = 0;
private Text key = new Text();
private LongWritable value = new LongWritable();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//通過InputSplit對象擷取檔案路徑
FileSplit fileSplit = (FileSplit)split;
Path path = fileSplit.getPath();
//擷取檔案長度
fileSplitLength = (int)fileSplit.getLength();
//通過context對象擷取到配置檔案資訊,通過配置檔案擷取到一個目前檔案系統
Configuration configuration = context.getConfiguration();
FileSystem fs = FileSystem.get(configuration);
//擷取檔案系統的一個輸入流
open = fs.open(path);
}
private boolean isRead = false;
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//如果沒有讀取過檔案就進入
if(!isRead){
//準備一個位元組數組長度為檔案的長度
byte[] buffer = new byte[fileSplitLength];
//一次性把真個檔案讀入位元組數組中
IOUtils.readFully(open, buffer);
//把讀取到的檔案傳給key
key.set(buffer, 0, fileSplitLength);
//設定已讀标記為true
isRead = true;
//傳回讀取一個檔案成功标記
return true;
}else{
return false;
}
}
//擷取key的方法
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
//擷取目前value值
@Override
public LongWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
//已讀為真傳回1.0,沒有讀傳回0
return isRead ? 1.0F : 0F;
}
@Override
public void close() throws IOException {
//關閉輸入流
IOUtils.closeQuietly(open);
}
2、自定義分區
需要: 1、繼承 partitioner
2、重寫getpartition()方法
3、在main方法中指定分區類 job.setPartitionclass()
package homework;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class Mypartition extends Partitioner<Student, Text> {
@Override
public int getPartition(Student key, Text arg1, int arg2) {
if(key.getType().equals("math")){
return 0;
}
if(key.getType().equals("english")){
return 1;
}
if(key.getType().equals("computer")){
return 2;
}else{
return 3;
}
}
}
3、自定義排序
需要 : 1、實作writableComparable
2、重新write()、readFields()、compareTo()方法
package homework;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Student implements WritableComparable<Student> {
private String type;
private String name;
private Double avg;
public Student() {
super();
}
public Student(String type, String name, Double avg) {
super();
this.type = type;
this.name = name;
this.avg = avg;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getAvg() {
return avg;
}
public void setAvg(Double avg) {
this.avg = avg;
}
@Override
public String toString() {
return type + "\t" + name + "\t" + avg ;
}
@Override
public void readFields(DataInput in) throws IOException {
this.type=in.readUTF();
this.name=in.readUTF();
this.avg=in.readDouble();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(type);
out.writeUTF(name);
out.writeDouble(avg);
}
@Override
public int compareTo(Student o) {
int temp=o.getType().compareTo(this.getType());
if(temp==0){
if(o.getAvg()>this.getAvg()){
return 1;
}else if(o.getAvg()<this.getAvg()){
return -1;
}else{
return 0;
}
}
return temp;
}
}
4、自定義分組
需要 : 1、繼承writableComparable
2、重寫compare()方法
3、指定分組類 job.setGroupingComparatorClass(MyGroup.class);
4、既有分區又有排序的時候,分組字段一定在排序字段中
package homework;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroup extends WritableComparator {
public MyGroup() {
super(Student.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Student aa=(Student)a;
Student bb=(Student)b;
return aa.getType().compareTo(bb.getType());
}
}
5、自定義輸出
1)模仿 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
public class MyMultipePathOutputFormat extends FileOutputFormat<Text, NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
//獲得目前的檔案系統傳給自定義的RecordWriter元件
Configuration configuration = job.getConfiguration();
FileSystem fs = FileSystem.get(configuration);
try {
//傳回一個RecordWriter正在處理輸出資料的元件
return new MyMutiplePathRecordWriter(fs);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2)繼承RecordWriter 并實作write()方法
public class MyMutiplePathRecordWriter extends RecordWriter<Text, NullWritable>{
//聲明要輸出的兩個路徑
private DataOutputStream out_jige;
private DataOutputStream out_bujige;
public MyMutiplePathRecordWriter(FileSystem fs) throws Exception {
//建立系統輸出流
out_jige = fs.create(new Path("E:\\bigdata\\cs\\jige\\my_output_jige.txt"));
out_bujige = fs.create(new Path("E:\\bigdata\\cs\\bujige\\my_output_bujige.txt"));
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
//接受到的key格式為:course + "\t" + name + "\t" + avgScore
String keyStr = key.toString();
String[] split = keyStr.split("\t");
//擷取到平均分字段
double score = Double.parseDouble(split[2]);
//沒一行資料加入個換行符
byte[] bytes = (keyStr + "\n").getBytes();
//如果平均分大于60就用DataOutputStream寫出到jige目錄
if(score >= 60){
out_jige.write(bytes, 0, bytes.length);
}else{//小于60分的寫道bujige目錄
out_bujige.write(bytes, 0, bytes.length);
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeQuietly(out_jige);
IOUtils.closeQuietly(out_bujige);
}
}
————————————————
版權聲明:本文為CSDN部落客「逆水行舟如何」的原創文章,遵循 CC 4.0 BY-SA 版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_43823423/article/details/85986538
MapReduce的Shuffle階段
https://wenku.baidu.com/view/db239bbd6aec0975f46527d3240c844769eaa0db.html