天天看点

Join中数据倾斜问题解决

Join中数据倾斜问题解决

问题描述:

就是在一个reducer任务中累加的数量过大,而在另一个reducer任务累加的数量较少,这样就造成了数据倾斜

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-F2hcOloM-1632316508589)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215036327.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GEvmJbm6-1632316508591)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215625320.png)]

如上图,一个产品对应多个订单,但是假设iPhoneX卖得非常的好,而iPhone8P销量寥寥,那么我们在使用 mapreduce做数据分析的时候,我们的某个reducetask就会压力山大,而某些reducetask就很闲。 (这是把两个表放在一起进行处理)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Uroipon6-1632316508592)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215734214.png)]

解决方案:

1、我们之所以产生数据倾斜,是因为我们使用reducetask这一个阶段来拼接pid相同的product和order,所以我们在reducetask才会产生数据倾斜

2、如果我们在maptask就能将product和order都join起 来,那么不需要reducetask就不会产生倾斜了

3、所以我们如果可以在map阶段就获取到产品的全表,那么读取到order表 就能够直接进行join了

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Um2dtTT-1632316508593)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918221055288.png)]

分析:

遇到的问题

1、我们如何将product放在maptask?

2、我们并不确定maptask在哪

3、我们存放文件应该存放在maptask的job目录下,但是job目录是maptask启动之后才创建的。

4、把product拷贝到maptask下是一个难题

解决方案:

使用分布式缓存(Distributed Cache)在存储数据,然后maptask都从分布式缓存中读取,这样子就没有maptask不知道在哪里的问题以及redis的问题了。(也就是把两个表分开处理)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cbCdPvzI-1632316508594)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918224052864.png)]

通过查看源码了解到,map最终是运行在run方法中的,而run方法其实是线程运行的方法,再继续观察发现,在调用map方法之前会调用一次setup方法
    
           

*****简单的说处理这个问题就是:

(这里的主要目的就是把两张表提前进行连接,这样久不会产生数据倾斜问题)

1、建立一个集合存放商品信息表,(当然了在创建之前需要创建对象)

然后把相应的信息加入到集合中,如果没有值久设为空

2、在通过map进行

再次把信息加入到集合中,然后在输出,不用reduce阶段的聚合

这里主要是对集合的使用

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class BeanInfo implements WritableComparable<BeanInfo> {
    //正常是实现Writable接口的,但是Writable是用作value的
    //WritableComparable是把这个对象看做key来用

    private int orderId;//订单id
    private String date;//订单时间
    private String pid;//商品id
    private int amount;//购买数量
    private String pname;//商品名称
    private  int category_id;//商品分类id
    private double price;//商品价格

    //创建构造方法

    public BeanInfo() {
    }

    public void set(int orderId, String date, String pid, int amount, String pname, int category_id, double price) {
        this.orderId = orderId;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
    }

    //创建get、set方法
    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public int getCategory_id() {
        return category_id;
    }

    public void setCategory_id(int category_id) {
        this.category_id = category_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    //实现这个接口,需要重写compareTo放方法
    //在这里的这个主要作用是判断两个表的id号是否相同
    //如果相同则返回0
    @Override
    public int compareTo(BeanInfo o) {
        return this.category_id = o.category_id;
    }

    //这个是实现这个接口的序列化
    //序列化是将对象的字段信息写入输出流
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.orderId);
        //如果是字符串,要实现序列化需要使用writeUTF
        out.writeUTF(this.date);
        out.writeUTF(this.pid);
        out.writeInt(this.amount);
        out.writeUTF(this.pname);
        out.writeInt(this.category_id);
        out.writeDouble(this.price);
    }
    //这个实现这个接口的反序列化
    //从输入流中读取各字段的信息
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readInt();
        this.date = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.category_id = in.readInt();
        this.price = in.readDouble();
    }
    //实现toString()方法
    @Override
    public String toString() {
        return  orderId +" "+ date +" "+ pid +" "+ amount +" "+ pname +" "+ category_id +" "+ price;
    }
}
           
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class JoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    //创建一个集合,把订单表信息先行加入到集合中,如果没有该值则设为空
    List<BeanInfo> beanInfoList = new ArrayList<>();
    private Text k = new Text();
    //读取product表中的数据
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
       //1、控制读取数据
        //这里是把数据放到idea中,通过反射进行读取
        BufferedReader br = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream("product.txt")));
        //2、遍历这个文件中的数据
        String line = null;
        while ((line = br.readLine())!= null){
            //实例化BeanInfo对象
            BeanInfo beanInfo = new BeanInfo();
            String[] words = line.split(",");
            //对对象进行赋值
            beanInfo.set(0,"",words[0],0,words[1],Integer.parseInt(words[2]),Double.parseDouble(words[3]));
            beanInfoList.add(beanInfo);
        }
        //3、释放
        br.close();
    }

    //读取order信息,读取订单表信息
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(",");
        int orderId = Integer.parseInt(words[0]);
        String date = words[1];
        String pid = words[2];
        int amount = Integer.parseInt(words[3]);
        //迭代product集合,用于比较集合中pid和order中的pid
        Iterator<BeanInfo> iterator = beanInfoList.iterator();
        while (iterator.hasNext()){
            BeanInfo next = iterator.next();
            //获取pid,通过pid可以把数据加到相应的集合中数据里
            String pid1 = next.getPid();
            if (pid1.equals(pid)){
                next.setOrderId(orderId);
                next.setDate(date);
                next.setAmount(amount);
                k.set(next.toString());
                context.write(k,NullWritable.get());
            }

        }
    }
}