天天看点

自定义数据类型,处理手机上网数据

数据和代码来源:吴超的7天视频

数据格式如下所示(每行一条记录,原始数据中各字段以制表符分隔):

自定义数据类型,处理手机上网数据
1363157985066 1372623050300-FD-07-A4-72-B8:CMCC 120.196.100.82i02.c.aliimg.com 2427 248124681 200
1363157995052 138265441015C-0E-8B-C7-F1-E0:CMCC 120.197.40.44 0 264 0 200
1363157991076 1392643565620-10-7A-28-CC-0A:CMCC 120.196.100.992 4 132 1512 200
1363154400022 139262511065C-0E-8B-8B-B1-50:CMCC 120.197.40.44 0 240 0 200
1363157993044 1821157596194-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99iface.qiyi.com 视频网站15 12 1527 2106 200
1363157995074 841384135C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4122.72.52.12 2016 41161432 200
1363157993055 13560439658C4-17-FE-BA-DE-D9:CMCC 120.196.100.9918 15 1116 954 200
1363157995033 159201332575C-0E-8B-C7-BA-20:CMCC 120.197.40.4sug.so.360.cn 信息安全20 20 3156 2936 200
1363157983019 1371919941968-A1-B7-03-07-B1:CMCC-EASY 120.196.100.824 0 240 0 200
1363157984041 136605779915C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4s19.cnzz.com 站点统计24 9 6960 690 200
1363157973098 150136858585C-0E-8B-C7-F7-90:CMCC 120.197.40.4rank.ie.sogou.com 搜索引擎28 27 3659 3538 200
1363157986029 15989002119E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99www.umeng.com 站点统计3 3 1938 180 200
1363157992093 13560439658C4-17-FE-BA-DE-D9:CMCC 120.196.100.9915 9 918 4938 200
1363157986041 134802531045C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.43 3 180 180 200
1363157984040 136028465655C-0E-8B-8B-B6-00:CMCC 120.197.40.42052.flash2-http.qq.com 综合门户15 12 1938 2910 200
1363157995093 1392231446600-FD-07-A2-EC-BA:CMCC 120.196.100.82img.qfc.cn 1212 30083720 200
1363157982040 135024688235C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99y0.ifengimg.com 综合门户57 102 7335 110349 200
1363157986072 1832017338284-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99input.shouji.sogou.com 搜索引擎21 18 9531 2412 200
1363157990043 1392505741300-1F-64-E1-E6-9A:CMCC 120.196.100.55t3.baidu.com 搜索引擎69 63 11058 48243 200
1363157988072 1376077871000-FD-07-A4-7B-08:CMCC 120.196.100.822 2 120 120 200
1363157985079 1382307000120-7C-8F-70-68-1F:CMCC 120.196.100.996 3 360 180 200
1363157985069 1360021750200-1F-64-E2-E8-B1:CMCC 120.196.100.5518 138 1080 186852 200
           
package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * MapReduce driver that totals four traffic counters per phone number.
 *
 * <p>Every input line is treated as a tab-separated record. The mapper keys
 * each record by field 1 and emits fields 6-9 (up packets, down packets,
 * up payload, down payload) as a {@link KpiWritable}; the reducer adds those
 * four counters together for each key and writes one line per key.
 */
public class KpiApp {

	private static final String INPUT_PATH = "hdfs://hadoop:9000/wlan";
	private static final String OUT_PATH = "hdfs://hadoop:9000/out";

	/** Counter group under which skipped input records are reported. */
	private static final String COUNTER_GROUP = "KpiApp";

	/**
	 * Configures the job, submits it and waits for it to finish.
	 *
	 * @param args ignored; input and output locations are the class constants
	 * @throws IllegalStateException if the job completes unsuccessfully
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {
		Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
		// Tell Hadoop which jar holds the mapper/reducer so remote tasks can load them.
		job.setJarByClass(KpiApp.class);

		// 1.1 Input location and the format class used to read it.
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);

		// 1.2 Mapper class and the <k2,v2> types it emits.
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable.class);

		// 1.3 Partitioning: hash partitioner feeding a single reduce task.
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);

		// 1.4 Sorting and grouping: nothing configured here.

		// 1.5 Combining: no combiner configured.

		// 2.2 Reducer class and the <k3,v3> types it emits.
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);

		// 2.3 Output location and the format class used to write it.
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);

		// Submit the job and block until it ends; surface a failure to the
		// caller (and thus a non-zero exit status) instead of ignoring it.
		if (!job.waitForCompletion(true)) {
			throw new IllegalStateException(
					"Job " + KpiApp.class.getSimpleName() + " did not complete successfully");
		}
	}

	/**
	 * Turns one tab-separated input line into a (phone number, counters) pair.
	 * Lines that are too short or whose counter fields are not integers are
	 * counted and skipped rather than failing the task.
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {

		/** map() reads up to field index 9, so at least ten fields are required. */
		private static final int MIN_FIELDS = 10;

		/**
		 * @param key     input key supplied by the input format; not used
		 * @param value   one line of input text
		 * @param context sink for the emitted pair and for the skip counters
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			if (fields.length < MIN_FIELDS) {
				context.getCounter(COUNTER_GROUP, "TOO_FEW_FIELDS").increment(1);
				return;
			}

			KpiWritable v2;
			try {
				v2 = new KpiWritable(fields[6], fields[7], fields[8], fields[9]);
			} catch (NumberFormatException e) {
				// A counter field was not a valid long; drop just this record.
				context.getCounter(COUNTER_GROUP, "NON_NUMERIC_COUNTER").increment(1);
				return;
			}

			context.write(new Text(fields[1]), v2);
		}
	}

	/** Sums the four counters of every value that shares a key. */
	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {

		/**
		 * @param k2      the key shared by all values in {@code v2s}
		 * @param v2s     the counter bundles emitted for that key
		 * @param context sink for the single (key, totals) pair
		 */
		@Override
		protected void reduce(Text k2, Iterable<KpiWritable> v2s, Context context)
				throws IOException, InterruptedException {
			// Accumulate straight into the output value; no string round trip.
			KpiWritable v3 = new KpiWritable();
			for (KpiWritable kpiWritable : v2s) {
				v3.upPackNum += kpiWritable.upPackNum;
				v3.downPackNum += kpiWritable.downPackNum;
				v3.upPayLoad += kpiWritable.upPayLoad;
				v3.downPayLoad += kpiWritable.downPayLoad;
			}
			context.write(k2, v3);
		}
	}
}

/**
 * Value type holding four {@code long} traffic counters.
 *
 * <p>The wire format is the four fields written as consecutive longs in the
 * order up packets, down packets, up payload, down payload;
 * {@link #readFields(DataInput)} reads them back in that same order.
 */
class KpiWritable implements Writable {
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;

	/** Creates an instance with all counters at zero. */
	public KpiWritable() {
	}

	/**
	 * Creates an instance from the decimal text of each counter.
	 *
	 * @throws NumberFormatException if any argument is not a parsable long
	 */
	public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
		this.upPackNum = toLong(upPackNum);
		this.downPackNum = toLong(downPackNum);
		this.upPayLoad = toLong(upPayLoad);
		this.downPayLoad = toLong(downPayLoad);
	}

	/** Parses one counter field. */
	private static long toLong(String text) {
		return Long.parseLong(text);
	}

	/** Writes the four counters as longs, in declaration order. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	/** Overwrites the four counters with longs read in declaration order. */
	@Override
	public void readFields(DataInput in) throws IOException {
		upPackNum = in.readLong();
		downPackNum = in.readLong();
		upPayLoad = in.readLong();
		downPayLoad = in.readLong();
	}

	/** Returns the four counters separated by tab characters. */
	@Override
	public String toString() {
		StringBuilder line = new StringBuilder();
		line.append(upPackNum).append("\t");
		line.append(downPackNum).append("\t");
		line.append(upPayLoad).append("\t");
		line.append(downPayLoad);
		return line.toString();
	}

}


           

继续阅读