天天看點

MapReduce 中的 Partitioner

package flow.partitioner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import flow.pojo.Flow;

public class FlowPartitionerMR {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowPartitionerMR.class);
		
		job.setMapperClass(FlowPartitionerMRMapper.class);
		job.setMapOutputKeyClass(Flow.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setReducerClass(FlowPartitionerMRReducer.class);
		job.setOutputKeyClass(Flow.class);
		job.setOutputValueClass(NullWritable.class);
		
		/**
		 * 在MapReduce程式設計模型中,Partitioner的預設實作是HashPartitioner.如果
		 * 不能滿足我們的需求,按照HashPartitioner的實作方式自定義一個Partitioner元件即可
		 */
		job.setPartitionerClass(ProvincePartitioner.class);
	
//		job.setPartitionerClass(HashPartitioner.class); 
		
		
		job.setNumReduceTasks(9);
		
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);
		FileInputFormat.setInputPaths(job, inputPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job,outputPath);
		
		boolean isDone = job.waitForCompletion(true);
		System.exit(isDone ? 0 : 1);
		
	}
	
	/**
	 * Mapper階段的業務邏輯
	 *
	 */
	private static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{
		Flow flow = new Flow();
		/**
		 * 13480253104	2494800	2494800	4989600
		 * 
		 */

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			String phone = split[0];
			long upFlow = Long.parseLong(split[1]);
			long downFlow = Long.parseLong(split[2]);
			long sumFlow = Long.parseLong(split[3]);
			
			/**
			 * map方法每調用一次,那麼就給flow對象中的對應屬性重新設定值,達到減少JVM垃圾回收的目的
			 */
			flow.setPhone(phone);
			flow.setUpFlow(upFlow);
			flow.setDownFlow(downFlow);
			flow.setSumFlow(sumFlow);
		
			/**
			 * 目前這個做法,是讓整個MapTask公用一個Flow對象。為什麼可以這麼用?
			 * 
			 * 序列化的工作機制
			 * 每次通過context.write(key,value)其實是把key和value當中的屬性值已經序列化了到其他的比如流或者記憶體或者磁盤檔案
			 * 
			 */
			context.write(flow, NullWritable.get());
			
		}
		
	}
	
	/**
	 * Reducer階段的業務邏輯
	 */
	private static class FlowPartitionerMRReducer extends Reducer<Flow, NullWritable, Flow, NullWritable>{

		@Override
		protected void reduce(Flow key, Iterable<NullWritable> values, Context context) 
				throws IOException, InterruptedException {

			/**
			 * 如果ReducerTask什麼邏輯都不用做,僅僅隻是作原樣輸出:
			 * 
			 * 兩鐘實作方式:
			 * 
			 * 1、在目前的reduce方法中,直接周遊原樣輸出
			 */
			for(NullWritable value : values){
				context.write(key, value);
			}
			
			/**
			 * 2、直接不用定義Reducer,直接使用預設實作
			 */

		}
		
		
		
	}
	
	

}
           

按照省分區:

package flow.partitioner;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import flow.pojo.Flow;
/**
 * Partitioner的key-value的類型就是 Mapper元件的輸出的key-value的類型
 * 
 * Combiner的輸入key-value的類型也是Mapper元件的輸出key-value的類型
 * 
 * 假如Combiner的執行在Partitioner之前。
 * Partitioner的輸入的key-vlaue的類型就應該是Combiner的輸出key-value類型
 * 
 * Partitoner的執行時在Combiner之前。
 *
 */

public class ProvincePartitioner  extends Partitioner<Flow, NullWritable> {

	/**
	 * 作用:就是用來決定輸入的參數key-value到底應該進入到哪個ReducerTask
	 * 
	 * 目前自定義的Partitioner的邏輯:
	 * 	按照手機歸屬地的不同,把所有使用者的流量彙總資訊輸出到不同的結果檔案中
	 * 	如果要實作這個業務,必須依賴一張使用者的手機号和歸屬地的字典表
	 * 
	 * 	這種類似的字典表: 存儲在MySQL當中
	 * 	
	 * 	咱們做模拟實作
	 * 	假設: 134開頭的所有使用者都是北京的
	 * 		 135開頭的所有使用者都是上海的
	 * 		 ...
	 *
	 */
	@Override
	public int getPartition(Flow key, NullWritable value, int numPartitions) {
		
		return getProvinceNumber(key);
	}
	
	private static int getProvinceNumber(Flow key){
		String phone = key.getPhone();
		String phonePrefix = phone.substring(0, 3);
		
		if(phonePrefix.equals("134")){
			return 0;
		}else if (phonePrefix.equals("135")){
			return 1;
		}else if(phonePrefix.equals("136")){
			return 2;
		}else if(phonePrefix.equals("137")){
			return 3;
		}else if(phonePrefix.equals("138")){
			return 4;
		}else{
			return 5;
		}
	}
	
	

}
           

繼續閱讀