天天看點

Shuffle階段:partition分區以及自定義使用注意事項

 一:partition分區(shuffle階段)

Mapreduce中會将map輸出的kv對,按

照相同key分組

然後分發給不同的reducetask(是以這也決定了為什麼最終的檔案個數,即分區個數跟reducetask數量一樣了。)

。預設分區是根據key的hashCode對reduceTasks個數取模得到的。使用者沒法控制哪個key存儲到哪個分區。預設系統的patitioner類,實作類時hashpatitioner.

預設的分發規則為:根據key的hashcode%reducetask數來分發

是以:如果要按照我們自己的需求進行分組,則需要改寫資料分發(分組)元件Partitioner

自定義一個CustomPartitioner繼承抽象類:Partitioner

然後在job對象中,設定自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)

1)預設partition分區:要從0開始,否則報錯

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */

  public int getPartition(K key, V value, int numReduceTasks) {

  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

//一般都是分區個數自動設定,通過上面的計算結果

  }

}

注意:

重寫的mypatition方法繼承了抽象類patitioner。然後在getPartition方法中重新定義分區的條件,用什麼進行分區。預設hashpatitioner傳入的分區個數是等于reducetask的個數的,是以傳入的分區的個數等于reducetask個數。  

      自定義的分區則是我們想怎麼定義怎麼定義,然後再根據我們的分區的條件産生的分區個數進行設定reducetask個數。而且要求設定reducetask最好要等于分區個數

 是以如果我們無論怎麼定義分區,而reducetASK的任務始終設定成為1個,則沒有任何意義,因為最終的分區檔案還是由reducetask的個數決定的。

2.自定義分區

1.自定義類繼承Partitioner,泛型聲明對誰排序。重寫getPartition()方法

public

class

 ProvincePartitioner 

extends

Partitioner

<Text, FlowBean> {

      //

key是要分區的字段,value分區要鍵入的值,KV也是MAP輸出的key,value.

     //return 傳回的結果值就是分區号

       @Override

int

 getPartition(Text key, FlowBean value, 

 numPartitions) {

//

這裡的

numpationns

就是分區的個數,也就是對應的

reduce

要開啟的

     Reducetask

的個數。

// 1 

擷取電話号碼的前三位

              String preNum = key.toString().substring(0, 3);

              //

手動設定分區個數

 partition = 4;

              // 2 

判斷是哪個省

if

 ("136".equals(preNum)) {

                     partition = 0;

              }

else

 ("137".equals(preNum)) {

                     partition = 1;

 ("138".equals(preNum)) {

                     partition = 2;

 ("139".equals(preNum)) {

                     partition = 3;

return

 partition;

       }

2

job

驅動中,設定自定義

partitioner

      job.setPartitionerClass(CustomPartitioner.class)

3)

自定義

partition

後,要根據自定義

的邏輯設定相應數量的

reduce task

     job.setNumReduceTasks(5);

如果

reduceTask

的數量

>= getPartition

的結果數,則會多産生幾個空的輸出檔案

part-r-000xx

1<reduceTask

<getPartition

的結果數,則有一部分分區資料無處安放,會

Exception

=1

,則不管

mapTask

端輸出多少個分區檔案,最終結果都交給這一個

,最終也就隻會産生一個結果檔案

 part-r-00000

;(預設也是hashpatitioner分區,隻是最終分區到同一檔案裡了,看不出來)

Patitionner

分區實際是通過設定

reducetask

的數量來産生檔案的數量。而分區

是通過該方法的傳回值确定的

例如:假設自定義分區數為

5

,則

1

job.setNumReduceTasks(1);

會正常運作,隻不過會産生一個輸出檔案

job.setNumReduceTasks(2);

會報錯

3

job.setNumReduceTasks(6);

大于

,程式會正常運作,會産生空檔案

定義的

patitions 

必須從

開始,是以如果定義了

patition = 4.

則,

NumReduceTaskSy=5

3.分區partition案例

把單詞按照首字母ASCII碼奇偶分區(Partitioner)

package com.robot

.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable>{

       public int getPartition(Text key, IntWritable value, int numPartitions) {

//注意 ,如果在定義的類中使用HashPartition進行分區,要重寫hashcode方法

              // 1 擷取單詞key

              String firWord = key.toString().substring(0, 1);

       //直接用下面這一句也可以,自動類型轉換

          //char  firWord = key.toString().charAt(0);

        //轉換成ASCII碼

              char[] charArray = firWord.toCharArray();

              int result = charArray[0];

              // 2 根據奇數偶數分區

              if (result % 2 == 0) {

                     return 0;

              }else {

                     return 1;

2)在驅動中配置加載分區,設定reducetask個數

              job.setPartitionerClass(WordCountPartitioner.class);

              job.setNumReduceTasks(2);

繼續閱讀