天天看點

hadoop之MapReduce---Hadoop序列化序列化概述自定義bean對象實作序列化接口(Writable)序列化案例實操

序列化概述

  1. 什麼是序列化

    序列化就是把記憶體中的對象,轉換成位元組序列(或其他資料傳輸協定)以便于存儲到磁盤(持久化)和網絡傳輸

    反序列化就是将收到位元組序列(或其他資料傳輸協定)或者是磁盤的持久化資料,轉換成記憶體中的對象

  2. 為什麼要序列化

    一般來說,“活的”對象隻生存在記憶體裡,關機斷電就沒有了。而且“活的”對象隻能由本地的程序使用,不能被發送到網絡上的另外一台計算機。 然而序列化可以存儲“活的”對象,可以将“活的”對象發送到遠端計算機。

  3. 為什麼不用Java的序列化

    Java的序列化是一個重量級序列化架構(Serializable),一個對象被序列化後,會附帶很多額外的資訊(各種校驗資訊,Header,繼承體系等),不便于在網絡中高效傳輸。是以,Hadoop自己開發了一套序列化機制(Writable)

  4. Hadoop序列化特點

    1)緊湊 :高效使用存儲空間。

    2)快速:讀寫資料的額外開銷小

    3)可擴充:随着通信協定的更新而可更新

    4)互操作:支援多語言的互動

自定義bean對象實作序列化接口(Writable)

在企業開發中往往常用的基本序列化類型不能滿足所有需求,比如在Hadoop架構内部傳遞一個bean對象,那麼該對象就需要實作序列化接口。

具體實作bean對象序列化步驟如下7步

  1. 必須實作Writable接口
  2. 反序列化時,需要反射調用空參構造函數,是以必須有空參構造
public FlowBean() {
	super();
}
           
  1. 重寫序列化方法
@Override
public void write(DataOutput out) throws IOException {
	out.writeLong(upFlow);
	out.writeLong(downFlow);
	out.writeLong(sumFlow);
}
           
  1. 重寫反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
	upFlow = in.readLong();
	downFlow = in.readLong();
	sumFlow = in.readLong();
}
           
  1. 注意反序列化的順序和序列化的順序完全一緻
  2. 要想把結果顯示在檔案中,需要重寫toString(),可用”\t”分開,友善後續用
  3. 如果需要将自定義的bean放在key中傳輸,則還需要實作Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。詳見後面排序案例
@Override
public int compareTo(FlowBean o) {
	// 倒序排列,從大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
           

序列化案例實操

  1. 需求

    統計每一個手機号耗費的總上行流量、下行流量、總流量

    1)輸入資料

1	13736230513	192.196.100.1	www.liujh.com	2481	24681	200
2	13846544121	192.196.100.2			264	0	200
3 	13956435636	192.196.100.3			132	1512	200
4 	13966251146	192.168.100.1			240	0	404
5 	18271575951	192.168.100.2	www.liujh.com	1527	2106	200
6 	84188413	192.168.100.3	www.liujh.com	4116	1432	200
7 	13590439668	192.168.100.4			1116	954	200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			240	0	200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.liujh.com	1938	180	500
13 	13560439638	192.168.100.10			918	4938	200
14 	13470253144	192.168.100.11			180	180	200
15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			120	120	200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			1116	954	200
           

2)輸入資料格式:

id 手機号碼 網絡ip 上行流量 下行流量 網絡狀态碼
7 13560436666 120.196.100.99 1116 954 200

3)期望輸出資料格式

手機号碼 上行流量 下行流量 總流量
13560436666 1116 954 2070
  1. 需求分析
    hadoop之MapReduce---Hadoop序列化序列化概述自定義bean對象實作序列化接口(Writable)序列化案例實操
  2. 編寫MapReduce程式

    1)編寫流量統計的Bean對象

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
// 1 實作writable接口
public class FlowBean implements Writable{
	private long upFlow;
	private long downFlow;
	private long sumFlow;
	//2  反序列化時,需要反射調用空參構造函數,是以必須有
	public FlowBean() {
		super();
	}
	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}
	//3  寫序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}
	//4 反序列化方法
	//5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一緻
	@Override
	public void readFields(DataInput in) throws IOException {
		this.upFlow  = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();
	}
	// 6 編寫toString方法,友善後續列印到文本
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}
	public long getUpFlow() {
		return upFlow;
	}
	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}
	public long getDownFlow() {
		return downFlow;
	}
	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}
	public long getSumFlow() {
		return sumFlow;
	}
	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
}
           

2)編寫Mapper類

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
	FlowBean v = new FlowBean();
	Text k = new Text();
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		// 1 擷取一行
		String line = value.toString();
		// 2 切割字段
		String[] fields = line.split("\t");
		// 3 封裝對象
		// 取出手機号碼
		String phoneNum = fields[1];
		// 取出上行流量和下行流量
		long upFlow = Long.parseLong(fields[fields.length - 3]);
		long downFlow = Long.parseLong(fields[fields.length - 2]);
		k.set(phoneNum);
		v.set(downFlow, upFlow);
		// 4 寫出
		context.write(k, v);
	}
}
           

3)編寫Reducer類

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {
		long sum_upFlow = 0;
		long sum_downFlow = 0;
		// 1 周遊所用bean,将其中的上行流量,下行流量分别累加
		for (FlowBean flowBean : values) {
			sum_upFlow += flowBean.getUpFlow();
			sum_downFlow += flowBean.getDownFlow();
		}
		// 2 封裝對象
		FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
		// 3 寫出
		context.write(key, resultBean);
	}
}
           

4)編寫Driver驅動類

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowsumDriver {
	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[] { "e:/input/inputflow", "e:/output1" };
		// 1 擷取配置資訊,或者job對象執行個體
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		// 6 指定本程式的jar包所在的本地路徑
		job.setJarByClass(FlowsumDriver.class);
		// 2 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);
		// 3 指定mapper輸出資料的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		// 4 指定最終輸出的資料的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		// 5 指定job的輸入原始檔案所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// 7 将job中配置的相關參數,以及job所用的java類所在的jar包, 送出給yarn去運作
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
           
hadoop之MapReduce---Hadoop序列化序列化概述自定義bean對象實作序列化接口(Writable)序列化案例實操

簡書:https://www.jianshu.com/u/0278602aea1d

CSDN:https://blog.csdn.net/u012387141