天天看點

MapReduce之分區器(Partitioner)

Partitactioner

 Partitioner 元件可以對 MapTask後的資料按Key進行分區,進而将不同分區的Key交由不同的Reduce處理。這個也是我們經常會用到的功能。

1.使用場景

 比如上個案例中我們統計出來了每個使用者的流量資料,那麼我們接下來想把統計的使用者資料根據不同的手機号輸出到不同的檔案中,那麼這時使用分區器就非常合适了。

2.HashPartitioner

 在一般的 MapReduce 過程中,我們知道可以通過 job.setNumReduceTasks(N) 來建立多個 ReducerTask 進行處理任務。可是這種情況下,系統會調用預設的Partitioner也就是 HashPartitioner來對Map的 key 進行分區。進入 Hadoop 的源碼,可以看到 HashPartitioner 的實作其實很簡單。如下:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    // key的hash值與integer的最大值取與然後對ReduceTask的個數取餘
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}      

 hash的好處是可以很key的分布更加随機,但是這樣會将一些不同的key放在同一個分區中,這并不是我們所期望的。

3.自定義Partitioner

 面對HashPartitioner所具有的局限,我們可以通過自定義Partitioner來解決,如下:

3.1 實作自定義分區器

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 自定義分區器
 * @author 波波烤鴨
 *
 */
public class CustomPartitioner extends Partitioner< Text, Flow>{
    
    private static Map<String, Integer> map = new HashMap<>();
    
    // 此處我們将資料寫死,實際開發中我們應該從對應的資料源中擷取資料然後存儲在緩存中(Redis)
    static{
        map.put("138", 0);
        map.put("139", 1);
        map.put("158", 2);
        map.put("159", 3);
    }

    /**
     * 根據key擷取對應的分區号
     * @param key 就是用的手機号碼
     * @param value 統計的使用者的資訊
     */
    @Override
    public int getPartition(Text key, Flow value, int numPartitions) {
        // 擷取手機号碼的前3位 138
        String prefix = key.toString().substring(0, 3);
        return map.containsKey(prefix)?map.get(prefix):4;
    }
}      

3.2 啟動類設定

MapReduce之分區器(Partitioner)
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        // 輸出到HDFS檔案系統中
        // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
        // 輸出到本地檔案系統
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(FlowTest.class);
        
        // 設定ReduceTask的個數
        job.setNumReduceTasks(5);
        // 設定自定義的分區器
        job.setPartitionerClass(CustomPartitioner.class);
        
        // 指定本job要使用的map/reduce的工具類
        job.setMapperClass(MyMapTask.class);
        job.setReducerClass(MyReduceTask.class);
        
        // 指定mapper輸出kv的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);
        
        // 指定job的原始檔案輸入目錄
        // 6.設定輸出輸出類
        FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/flow/input/"));
        FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/flow/output/"));
                
        //将job中配置的相關參數,以及job所用的jar包送出給yarn運作
        //job.submit();  waitForCompletion等待執行完成
        boolean flag = job.waitForCompletion(true);
        System.exit(flag?0:1);

    }      

 MapTask和ReduceTask的代碼内容不需要改變,可以參考上篇内容。

MapReduce之分區器(Partitioner)

Ok ~ partitioner的作用就是用來對Map之後的資料做分區處理操作!

繼續閱讀