
Hadoop MapReduce case study: joining two tables

Order data table t_order:

id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1002 20150710 P0002 3

Represented as a flat file:

1001,20150710,P0001,2

1002,20150710,P0001,3

1003,20150710,P0002,3

1002,20150710,P0003,3

1002,20150710,P0002,4

Product information table t_product:

id pname category_id price
P0001 小米5 1000 2
P0002 錘子T1 1000 3

Represented as a flat file:

p0001,小米,1000,2

p0002,魅族,1001,3

p0003,oppo,1002,3

1. Requirement:

Suppose the data volume is huge and both tables are stored as files on HDFS. A MapReduce program is needed to implement the following SQL query:

select a.id, a.date, b.pname, b.category_id, b.price from t_order a join t_product b on a.pid = b.id

2. Implementation approach:

The join condition (the product id) is used as the map output key. Every record that can take part in the join is wrapped in a bean that also carries a flag identifying which file it came from, so all records with the same key are sent to the same reduce task, and the reducer stitches the order and product data together there.
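
To make the shuffle concrete, here is a rough sketch of the intermediate data for one key, based on the sample files above (field values are abbreviated, and it assumes the pid values in the two files match exactly, e.g. both written as P0001):

Map output (key = pid, value = InfoBean carrying a flag):
    P0001 -> {order_id=1001, date=20150710, amount=2, flag="0"}
    P0001 -> {order_id=1002, date=20150710, amount=3, flag="0"}
    P0001 -> {pname=小米, category_id=1000, price=2, flag="1"}

One reduce call after the shuffle (the order of the values is not guaranteed):
    key   = P0001
    beans = [ {pname=小米, flag="1"}, {order_id=1001, flag="0"}, {order_id=1002, flag="0"} ]
    output: the two order beans, each completed with pname=小米, category_id=1000 and price=2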

3. Code implementation:

Custom bean: its fields match the layout of the joined record.

public class InfoBean implements Writable{

	private int order_id;
	private String dateString;
	private String p_id;
	private int amount;
	private String pname;
	private int category_id;
	private float price;
	private String flag; // extra field: "0" marks an order record, "1" marks a product record
		
	public InfoBean() {}
	public void set(int order_id, String dateString, String p_id, int amount,
			String pname, int category_id, float price,String flag) {
		this.order_id = order_id;
		this.dateString = dateString;
		this.p_id = p_id;
		this.amount = amount;
		this.pname = pname;
		this.category_id = category_id;
		this.price = price;
		this.flag=flag;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(order_id);
		out.writeUTF(dateString);
		out.writeUTF(p_id);
		out.writeInt(amount);
		out.writeUTF(pname);
		out.writeInt(category_id);
		out.writeFloat(price);
		out.writeUTF(flag);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.order_id=in.readInt();
		this.dateString=in.readUTF();
		this.p_id=in.readUTF();
		this.amount=in.readInt();
		this.pname=in.readUTF();
		this.category_id=in.readInt();
		this.price=in.readFloat();
		this.flag=in.readUTF();
	}

	@Override
	public String toString() {
		return "order_id=" + order_id + ", dateString=" + dateString
				+ ", p_id=" + p_id + ", amount=" + amount + ", pname=" + pname
				+ ", category_id=" + category_id + ", price=" + price
				+ ", flag=" + flag;
	}
	// getters and setters omitted; BeanUtils.copyProperties in the reducer relies on them
}

MapReduce code:

package join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestJoin {

	static class JoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
		InfoBean bean=new InfoBean();
		Text k=new Text();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// get the input split for this record
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			// use the file name to tell which table the record belongs to
			String name = inputSplit.getPath().getName();
			String p_id = null;
			if (name.startsWith("order")) {
				String[] fields = line.split(",");
				p_id = fields[2];
				// the last four fields do not exist in the order table, so fill them with defaults
				bean.set(Integer.parseInt(fields[0]), fields[1], p_id, Integer.parseInt(fields[3]), "", 0, 0, "0");
			} else if (name.startsWith("product")) {
				String[] fields = line.split(",");
				p_id = fields[0];
				// the fields the product table lacks get defaults
				bean.set(0, "", p_id, 0, fields[1], Integer.parseInt(fields[2]), Float.parseFloat(fields[3]), "1");
			}
			// for every input line, emit p_id as the key and the bean as the value
			if (p_id != null) {
				k.set(p_id);
				context.write(k, bean);
			}
		}
	}
	static class JoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{

		// each call receives <pid, {pbean, obean1, obean2, ...}>: one product bean plus any number of order beans
		@Override
		protected void reduce(Text pid, Iterable<InfoBean> beans, Context context)
				throws IOException, InterruptedException {
			InfoBean pbean = new InfoBean();            // holds the product record
			List<InfoBean> obeans = new ArrayList<>();  // holds the order records
			for (InfoBean bean : beans) {
				// flag "1" marks a product record
				if ("1".equals(bean.getFlag())) {
					try {
						BeanUtils.copyProperties(pbean, bean);
					} catch (Exception e) {
						e.printStackTrace();
					}
				} else {
					// Hadoop reuses the value object, so copy each order into a fresh bean
					InfoBean obean = new InfoBean();
					try {
						BeanUtils.copyProperties(obean, bean);
					} catch (Exception e) {
						e.printStackTrace();
					}
					obeans.add(obean);
				}
			}
			// after all beans for this pid are read, complete each order with the product fields and emit it
			for (InfoBean b : obeans) {
				b.setPname(pbean.getPname());
				b.setCategory_id(pbean.getCategory_id());
				b.setPrice(pbean.getPrice());
				context.write(b, NullWritable.get());
			}
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(TestJoin.class);
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(InfoBean.class);
		
		job.setOutputKeyClass(InfoBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);	
	}
}
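
To run the example, one possible invocation is sketched below: package the classes into a jar, place the order and product files in a single HDFS input directory, and pass an output directory that does not yet exist. The jar name and the paths are placeholders, not taken from the original article.

hadoop jar join-example.jar join.TestJoin /data/join/input /data/join/output
hdfs dfs -cat /data/join/output/part-r-00000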

Source: https://www.2cto.com/net/201710/689450.html
