天天看點

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

一、MapReduce概述

Hadoop MapReduce 是一個分布式計算架構,用于編寫批處理應用程式。編寫好的程式可以送出到 Hadoop 叢集上用于并行處理大規模的資料集。

MapReduce 作業通過将輸入的資料集拆分為獨立的塊,這些塊由

map

以并行的方式處理,架構對

map

的輸出進行排序,然後輸入到

reduce

中。MapReduce 架構專門用于

<key,value>

鍵值對處理,它将作業的輸入視為一組

<key,value>

對,并生成一組

<key,value>

對作為輸出。輸出和輸出的

key

value

都必須實作Writable 接口。

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
           

二、MapReduce程式設計模型簡述

這裡以詞頻統計為例進行說明,MapReduce 處理的流程如下:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce
  1. input : 讀取文本檔案;
  2. splitting : 将檔案按照行進行拆分,此時得到的

    K1

    行數,

    V1

    表示對應行的文本内容;
  3. mapping : 并行将每一行按照空格進行拆分,拆分得到的

    List(K2,V2)

    ,其中

    K2

    代表每一個單詞,由于是做詞頻統計,是以

    V2

    的值為 1,代表出現 1 次;
  4. shuffling :由于

    Mapping

    操作可能是在不同的機器上并行處理的,是以需要通過

    shuffling

    将相同

    key

    值的資料分發到同一個節點上去合并,這樣才能統計出最終的結果,此時得到

    K2

    為每一個單詞,

    List(V2)

    為可疊代集合,

    V2

    就是 Mapping 中的 V2;
  5. Reducing : 這裡的案例是統計單詞出現的總次數,是以

    Reducing

    List(V2)

    進行歸約求和操作,最終輸出。

MapReduce 程式設計模型中

splitting

shuffing

操作都是由架構實作的,需要我們自己程式設計實作的隻有

mapping

reducing

,這也就是 MapReduce 這個稱呼的來源。

三、combiner & partitioner

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

3.1 InputFormat & RecordReaders

InputFormat

将輸出檔案拆分為多個

InputSplit

,并由

RecordReaders

InputSplit

轉換為标準的鍵值對,作為 map 的輸出。這一步的意義在于隻有先進行邏輯拆分并轉為标準的鍵值對格式後,才能為多個

map

提供輸入,以便進行并行處理。

3.2 Combiner

combiner

map

運算後的可選操作,它實際上是一個本地化的

reduce

操作,它主要是在

map

計算出中間檔案後做一個簡單的合并重複

key

值的操作。這裡以詞頻統計為例:

map

在遇到一個 hadoop 的單詞時就會記錄為 1,但是這篇文章裡 hadoop 可能會出現 n 多次,那麼

map

輸出檔案備援就會很多,是以在

reduce

計算前對相同的 key 做一個合并操作,那麼需要傳輸的資料量就會減少,傳輸效率就可以得到提升。

但并非所有場景都适合使用

combiner

,使用它的原則是

combiner

的輸出不會影響到

reduce

計算的最終輸入,例如:求總數,最大值,最小值時都可以使用

combiner

,但是做平均值計算則不能使用

combiner

不使用 combiner 的情況:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

使用 combiner 的情況:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

可以看到使用 combiner 的時候,需要傳輸到 reducer 中的資料由 12keys,降低到 10keys。降低的幅度取決于你 keys 的重複率,下文詞頻統計案例會示範用 combiner 降低數百倍的傳輸量。

3.3 Partitioner

partitioner

可以了解成分類器,将

map

的輸出按照 key 值的不同分别分給對應的

reducer

,支援自定義實作,下文案例會給出示範。

四、MapReduce詞頻統計案例

4.1 項目簡介

這裡給出一個經典的詞頻統計的案例:統計如下樣本資料中每個單詞出現的次數。

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
           

為友善大家開發,我在項目源碼中放置了一個工具類

WordCountDataUtils

,用于模拟産生詞頻統計的樣本,生成的檔案支援輸出到本地或者直接寫到 HDFS 上。

項目完整源碼下載下傳位址:hadoop-word-count

4.2 項目依賴

想要進行 MapReduce 程式設計,需要導入

hadoop-client

依賴:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
           

4.3 WordCountMapper

将每行資料按照指定分隔符進行拆分。這裡需要注意在 MapReduce 中必須使用 Hadoop 定義的類型,因為 Hadoop 預定義的類型都是可序列化,可比較的,所有類型均實作了

WritableComparable

接口。

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, 
                                                                      InterruptedException {
        String[] words = value.toString().split("t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }

}
           

WordCountMapper

對應下圖的 Mapping 操作:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

WordCountMapper

繼承自

Mappe

類,這是一個泛型類,定義如下:

WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   ......
}
           
  • KEYIN :

    mapping

    輸入 key 的類型,即每行的偏移量 (每行第一個字元在整個文本中的位置),

    Long

    類型,對應 Hadoop 中的

    LongWritable

    類型;
  • VALUEIN :

    mapping

    輸入 value 的類型,即每行資料;

    String

    類型,對應 Hadoop 中

    Text

    類型;
  • KEYOUT

    mapping

    輸出的 key 的類型,即每個單詞;

    String

    類型,對應 Hadoop 中

    Text

    類型;
  • VALUEOUT

    mapping

    輸出 value 的類型,即每個單詞出現的次數;這裡用

    int

    類型,對應

    IntWritable

    類型。

4.4 WordCountReducer

在 Reduce 中進行單詞出現次數的統計:

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 
                                                                                  InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
           

如下圖,

shuffling

的輸出是 reduce 的輸入。這裡的 key 是每個單詞,values 是一個可疊代的資料類型,類似

(1,1,1,...)

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

4.4 WordCountApp

組裝 MapReduce 作業,并送出到伺服器運作,代碼如下:

/**
 * 組裝作業 并送出到叢集運作
 */
public class WordCountApp {


    // 這裡為了直覺顯示參數 使用了寫死,實際開發中可以通過外部傳參
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        //  檔案輸入路徑和輸出路徑由外部傳參指定
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // 需要指明 hadoop 使用者名,否則在 HDFS 上建立目錄時可能會抛出權限不足的異常
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // 指明 HDFS 的位址
        configuration.set("fs.defaultFS", HDFS_URL);

        // 建立一個 Job
        Job job = Job.getInstance(configuration);

        // 設定運作的主類
        job.setJarByClass(WordCountApp.class);

        // 設定 Mapper 和 Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 設定 Mapper 輸出 key 和 value 的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設定 Reducer 輸出 key 和 value 的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 如果輸出目錄已經存在,則必須先删除,否則重複運作程式時會抛出異常
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // 設定作業輸入檔案和輸出檔案的路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // 将作業送出到群集并等待它完成,參數設定為 true 代表列印顯示對應的進度
        boolean result = job.waitForCompletion(true);

        // 關閉之前建立的 fileSystem
        fileSystem.close();

        // 根據作業結果,終止目前運作的 Java 虛拟機,退出程式
        System.exit(result ? 0 : -1);

    }
}
           

需要注意的是:如果不設定

Mapper

操作的輸出類型,則程式預設它和

Reducer

操作輸出的類型相同。

4.5 送出到伺服器運作

在實際開發中,可以在本機配置 hadoop 開發環境,直接在 IDE 中啟動進行測試。這裡主要介紹一下打包送出到伺服器運作。由于本項目沒有使用除 Hadoop 外的第三方依賴,直接打包即可:

# mvn clean package
           

使用以下指令送出作業:

hadoop jar /usr/appjar/hadoop-word-count-1.0.jar 
com.heibaiying.WordCountApp 
/wordcount/input.txt /wordcount/output/WordCountApp
           

作業完成後檢視 HDFS 上生成目錄:

# 檢視目錄
hadoop fs -ls /wordcount/output/WordCountApp

# 檢視統計結果
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
           
hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

五、詞頻統計案例進階之Combiner

5.1 代碼實作

想要使用

combiner

功能隻要在組裝作業時,添加下面一行代碼即可:

// 設定 Combiner
job.setCombinerClass(WordCountReducer.class);
           

5.2 執行結果

加入

combiner

後統計結果是不會有變化的,但是可以從列印的日志看出

combiner

的效果:

沒有加入

combiner

的列印日志:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

加入

combiner

後的列印日志如下:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

這裡我們隻有一個輸入檔案并且小于 128M,是以隻有一個 Map 進行處理。可以看到經過 combiner 後,records 由

3519

降低為

6

(樣本中單詞種類就隻有 6 種),在這個用例中 combiner 就能極大地降低需要傳輸的資料量。

六、詞頻統計案例進階之Partitioner

6.1 預設的Partitioner

這裡假設有個需求:将不同單詞的統計結果輸出到不同檔案。這種需求實際上比較常見,比如統計産品的銷量時,需要将結果按照産品種類進行拆分。要實作這個功能,就需要用到自定義

Partitioner

這裡先介紹下 MapReduce 預設的分類規則:在建構 job 時候,如果不指定,預設的使用的是

HashPartitioner

:對 key 值進行哈希散列并對

numReduceTasks

取餘。其實作如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
           

6.2 自定義Partitioner

這裡我們繼承

Partitioner

自定義分類規則,這裡按照單詞進行分類:

public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    }
}
           

在建構

job

時候指定使用我們自己的分類規則,并設定

reduce

的個數:

// 設定自定義分區規則
job.setPartitionerClass(CustomPartitioner.class);
// 設定 reduce 個數
job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());
           

6.3 執行結果

執行結果如下,分别生成 6 個檔案,每個檔案中為對應單詞的統計結果:

hadoop應用開發技術..._Hadoop 系列(三)—— 分布式計算架構 MapReduce

參考資料

  1. 分布式計算架構 MapReduce
  2. Apache Hadoop 2.9.2 > MapReduce Tutorial
  3. MapReduce - Combiners

http://weixin.qq.com/r/ZENheb7EnuPhrfoa9xZ2 (二維碼自動識别)

源碼GitHub位址:https://github.com/heibaiying/spring-samples-for-all

繼續閱讀