一:partition分區(shuffle階段)
Mapreduce中會将map輸出的kv對,按
照相同key分組
,
然後分發給不同的reducetask(是以這也決定了為什麼最終的檔案個數,即分區個數跟reducetask數量一樣了。)
。預設分區是根據key的hashCode對reduceTasks個數取模得到的。使用者沒法控制哪個key存儲到哪個分區。預設系統的patitioner類,實作類時hashpatitioner.
預設的分發規則為:根據key的hashcode%reducetask數來分發
是以:如果要按照我們自己的需求進行分組,則需要改寫資料分發(分組)元件Partitioner
自定義一個CustomPartitioner繼承抽象類:Partitioner
然後在job對象中,設定自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)
1)預設partition分區:要從0開始,否則報錯
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//一般都是分區個數自動設定,通過上面的計算結果
}
}
注意:
重寫的mypatition方法繼承了抽象類patitioner。然後在getPartition方法中重新定義分區的條件,用什麼進行分區。預設hashpatitioner傳入的分區個數是等于reducetask的個數的,是以傳入的分區的個數等于reducetask個數。
自定義的分區則是我們想怎麼定義怎麼定義,然後再根據我們的分區的條件産生的分區個數進行設定reducetask個數。而且要求設定reducetask最好要等于分區個數
。
是以如果我們無論怎麼定義分區,而reducetASK的任務始終設定成為1個,則沒有任何意義,因為最終的分區檔案還是由reducetask的個數決定的。
2.自定義分區
1.自定義類繼承Partitioner,泛型聲明對誰排序。重寫getPartition()方法
public
class
ProvincePartitioner
extends
Partitioner
<Text, FlowBean> {
//
key是要分區的字段,value分區要鍵入的值,KV也是MAP輸出的key,value.
//return 傳回的結果值就是分區号
@Override
int
getPartition(Text key, FlowBean value,
numPartitions) {
//
這裡的
numpationns
就是分區的個數,也就是對應的
reduce
要開啟的
Reducetask
的個數。
// 1
擷取電話号碼的前三位
String preNum = key.toString().substring(0, 3);
//
手動設定分區個數
partition = 4;
// 2
判斷是哪個省
if
("136".equals(preNum)) {
partition = 0;
}
else
("137".equals(preNum)) {
partition = 1;
("138".equals(preNum)) {
partition = 2;
("139".equals(preNum)) {
partition = 3;
return
partition;
}
2
)
在
job
驅動中,設定自定義
partitioner
:
job.setPartitionerClass(CustomPartitioner.class)
3)
自定義
partition
後,要根據自定義
的邏輯設定相應數量的
reduce task
job.setNumReduceTasks(5);
如果
reduceTask
的數量
>= getPartition
的結果數,則會多産生幾個空的輸出檔案
part-r-000xx
;
1<reduceTask
<getPartition
的結果數,則有一部分分區資料無處安放,會
Exception
=1
,則不管
mapTask
端輸出多少個分區檔案,最終結果都交給這一個
,最終也就隻會産生一個結果檔案
part-r-00000
;(預設也是hashpatitioner分區,隻是最終分區到同一檔案裡了,看不出來)
Patitionner
分區實際是通過設定
reducetask
的數量來産生檔案的數量。而分區
是通過該方法的傳回值确定的
例如:假設自定義分區數為
5
,則
(
1
job.setNumReduceTasks(1);
會正常運作,隻不過會産生一個輸出檔案
job.setNumReduceTasks(2);
會報錯
3
job.setNumReduceTasks(6);
大于
,程式會正常運作,會産生空檔案
定義的
patitions
必須從
開始,是以如果定義了
patition = 4.
則,
NumReduceTaskSy=5
3.分區partition案例
把單詞按照首字母ASCII碼奇偶分區(Partitioner)
package com.robot
.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
public int getPartition(Text key, IntWritable value, int numPartitions) {
//注意 ,如果在定義的類中使用HashPartition進行分區,要重寫hashcode方法
// 1 擷取單詞key
String firWord = key.toString().substring(0, 1);
//直接用下面這一句也可以,自動類型轉換
//char firWord = key.toString().charAt(0);
//轉換成ASCII碼
char[] charArray = firWord.toCharArray();
int result = charArray[0];
// 2 根據奇數偶數分區
if (result % 2 == 0) {
return 0;
}else {
return 1;
2)在驅動中配置加載分區,設定reducetask個數
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);