天天看點

MapReduce的shuffle機制MapReduce的Shuffle階段

https://blog.csdn.net/weixin_43823423/article/details/85986538

1、MapReduce的shuffle機制

1.1、概述

 MapReduce中,mapper階段處理的資料如何傳遞給reduce階段,是MapReduce架構中最關鍵的一個流程,這個流程就叫shuffle.

Shuffle:資料混洗---------(核心機制:資料分區,排序,局部聚合,緩存,拉取,再合并排序)

具體來說,就是将MapTask輸出的處理資料結果,按照Partitioner元件制定的規則分發ReduceTask,并在分發的過程中,對資料按key進行分區和排序

1.2、主要流程

Shuffle是MapReduce處理流程中的一個核心,它的每一個處理步驟是分散在各個Maptask和reducetask節點上完成的,整體來看,分為3個操作:

1、分區partition(如果reduceTask隻有一個或者沒有,那麼partition将不起作用。設定沒設定相當于沒有)

2、Sort根據key排序(MapReduce程式設計中sort是一定會做的,并且隻能按照key排序,當然如果沒有reduce階段,那麼就不會對key排序)

3、Combiner進行局部value的合并(Combiner是可選的元件,作用是為了提高任務的執行效率)

1.3、詳細流程

1、mapTask收集我們map()方法輸出的kv對,放在記憶體緩沖區kvbuffer(環形緩沖區:記憶體中的一種首尾相連的資料結構,kvbuffer包含資料區和索引區)中,在存資料的時候,會調用partitioner進行分區編号的計算,并存入中繼資料中

2、當記憶體緩沖區的資料達到100*0.8時,就會開始溢寫到本地磁盤檔案file.out,可能會溢出多次,則會有多個檔案,相應的緩沖區中的索引區資料溢出為磁盤索引檔案file.out.index

3、在溢寫前,會先根據分區編号排序,相同的分區的資料,排在一起,再根據map的key排序(快排)

4、多個溢寫檔案會被合并成大的溢出檔案(歸并排序)

5、在資料量大的時.候,可以對maptask結果啟用壓縮,将mapreduce.map.ouput.compress設為true,并使用

mapreduce.map.output.compress.codec設定使用的壓縮算法,可以提高資料傳輸到reduce端的效率

6、reduceTask根據自己的分區号,去各個mapTask機器上取相應的結果分區資料

7、reduceTask會取到同一個分區的來自不同mapTask的結果檔案,reduceTask會将這些檔案再進行合并(歸并排序)

8、合并成r大檔案後,shuffle的過程也就結束了,後面進入reduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,調用使用者自定義的reduce()方法)

2、自定義Shuffle過程中的元件

1、自定義輸入

    預設輸入類:TextInputFormat

自定義:   

模仿   org.apache.hadoop.mapreduce.lib.input.LineRecordReader  和org.apache.hadoop.mapreduce.lib.input.TextInputFormat

1、自定義類繼承FileInputFormat

public class MyFileInputFormat extends FileInputFormat<Text, LongWritable>{

    @Override                                            

    public RecordReader<Text, LongWritable> createRecordReader(InputSplit split, TaskAttemptContext context)

            throws IOException, InterruptedException {

        //執行個體化一個

        MyAllFileRecodReader reader = new MyAllFileRecodReader();

        //split參數和context都是架構自動傳入的,把這兩個參數傳給reader進行處理,以便擷取相關資訊

        reader.initialize(split, context);

        return reader;

    }

    @Override

    protected boolean isSplitable(JobContext context, Path filename) {

        return false;

    }

}

2、自定義類實作RecordReader

public class MyFileRecodReader extends RecordReader<Text, LongWritable>{

    //用于存儲檔案系統輸入流

    private FSDataInputStream open = null;

    //儲存檔案長度

    private int fileSplitLength = 0;

    private Text key = new Text();

    private LongWritable value = new LongWritable();

    @Override

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        //通過InputSplit對象擷取檔案路徑

        FileSplit fileSplit = (FileSplit)split;

        Path path = fileSplit.getPath();

        //擷取檔案長度

        fileSplitLength = (int)fileSplit.getLength();

        //通過context對象擷取到配置檔案資訊,通過配置檔案擷取到一個目前檔案系統

        Configuration configuration = context.getConfiguration();

        FileSystem fs  = FileSystem.get(configuration);

        //擷取檔案系統的一個輸入流

        open = fs.open(path);

    }

    private boolean isRead = false;

    @Override

    public boolean nextKeyValue() throws IOException, InterruptedException {

        //如果沒有讀取過檔案就進入

        if(!isRead){

            //準備一個位元組數組長度為檔案的長度

            byte[] buffer = new byte[fileSplitLength];

            //一次性把真個檔案讀入位元組數組中

            IOUtils.readFully(open, buffer);

            //把讀取到的檔案傳給key

            key.set(buffer, 0, fileSplitLength);

            //設定已讀标記為true

            isRead = true;

            //傳回讀取一個檔案成功标記

            return true;

        }else{

            return false;

        }

    }

    //擷取key的方法

    @Override

    public Text getCurrentKey() throws IOException, InterruptedException {

        return key;

    }

    //擷取目前value值

    @Override

    public LongWritable getCurrentValue() throws IOException, InterruptedException {

        return value;

    }

    @Override

    public float getProgress() throws IOException, InterruptedException {

        //已讀為真傳回1.0,沒有讀傳回0

        return  isRead ? 1.0F : 0F;

    }

    @Override

    public void close() throws IOException {

        //關閉輸入流

        IOUtils.closeQuietly(open);

    }

2、自定義分區

需要:  1、繼承 partitioner

             2、重寫getpartition()方法

             3、在main方法中指定分區類  job.setPartitionclass()

package homework;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class Mypartition extends Partitioner<Student, Text> {

    @Override

    public int getPartition(Student key, Text arg1, int arg2) {

        if(key.getType().equals("math")){

            return 0;

        }

        if(key.getType().equals("english")){

            return 1;

        }

        if(key.getType().equals("computer")){

            return 2;

        }else{

            return 3;

        }

    }

}

3、自定義排序

  需要  :    1、實作writableComparable

                   2、重新write()、readFields()、compareTo()方法

package homework;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Student implements WritableComparable<Student> {

    private String type;

    private String name;

    private Double avg;

    public Student() {

        super();

    }

    public Student(String type, String name, Double avg) {

        super();

        this.type = type;

        this.name = name;

        this.avg = avg;

    }

    public String getType() {

        return type;

    }

    public void setType(String type) {

        this.type = type;

    }

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

    public Double getAvg() {

        return avg;

    }

    public void setAvg(Double avg) {

        this.avg = avg;

    }

    @Override

    public String toString() {

        return   type + "\t" + name + "\t" + avg ;

    }

    @Override

    public void readFields(DataInput in) throws IOException {

        this.type=in.readUTF();

        this.name=in.readUTF();

        this.avg=in.readDouble();    

    }

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeUTF(type);

        out.writeUTF(name);

        out.writeDouble(avg);    

    }

    @Override

    public int compareTo(Student o) {

        int temp=o.getType().compareTo(this.getType());

        if(temp==0){

            if(o.getAvg()>this.getAvg()){

                return 1;

            }else if(o.getAvg()<this.getAvg()){

                return -1;

            }else{

                return 0;

            }

        }

        return temp;

    }

}

4、自定義分組

需要  :     1、繼承writableComparable

                  2、重寫compare()方法

                  3、指定分組類  job.setGroupingComparatorClass(MyGroup.class);

                  4、既有分區又有排序的時候,分組字段一定在排序字段中

package homework;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator {

    public MyGroup() {

        super(Student.class,true);

    }

    @Override

    public int compare(WritableComparable a, WritableComparable b) {

        Student aa=(Student)a;

        Student bb=(Student)b;

        return aa.getType().compareTo(bb.getType());

    } 

}

5、自定義輸出

1)模仿 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

public class MyMultipePathOutputFormat extends FileOutputFormat<Text, NullWritable>{

    @Override

    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

        //獲得目前的檔案系統傳給自定義的RecordWriter元件

        Configuration configuration = job.getConfiguration();

        FileSystem fs = FileSystem.get(configuration);

        try {

            //傳回一個RecordWriter正在處理輸出資料的元件

            return new MyMutiplePathRecordWriter(fs);

        } catch (Exception e) {

            e.printStackTrace();

        }

        return null;

    }

}

2)繼承RecordWriter 并實作write()方法

public class MyMutiplePathRecordWriter extends RecordWriter<Text, NullWritable>{

    //聲明要輸出的兩個路徑

    private DataOutputStream out_jige;

    private DataOutputStream out_bujige;

    public MyMutiplePathRecordWriter(FileSystem fs) throws Exception {

        //建立系統輸出流

        out_jige = fs.create(new Path("E:\\bigdata\\cs\\jige\\my_output_jige.txt"));

        out_bujige = fs.create(new Path("E:\\bigdata\\cs\\bujige\\my_output_bujige.txt"));

    }

    @Override

    public void write(Text key, NullWritable value) throws IOException, InterruptedException {

        //接受到的key格式為:course + "\t" + name + "\t" + avgScore

        String keyStr = key.toString();

        String[] split = keyStr.split("\t");

        //擷取到平均分字段

        double score = Double.parseDouble(split[2]);

        //沒一行資料加入個換行符

        byte[] bytes = (keyStr + "\n").getBytes();

        //如果平均分大于60就用DataOutputStream寫出到jige目錄

        if(score >= 60){

            out_jige.write(bytes, 0, bytes.length);

        }else{//小于60分的寫道bujige目錄

            out_bujige.write(bytes, 0, bytes.length);

        }

    }

    @Override

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {

        IOUtils.closeQuietly(out_jige);

        IOUtils.closeQuietly(out_bujige);

    }

}

————————————————

版權聲明:本文為CSDN部落客「逆水行舟如何」的原創文章,遵循 CC 4.0 BY-SA 版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_43823423/article/details/85986538

MapReduce的Shuffle階段

https://wenku.baidu.com/view/db239bbd6aec0975f46527d3240c844769eaa0db.html

繼續閱讀