天天看點

User Defined Hadoop DataType

User Defined Hadoop DataType

目錄

    • User Defined Hadoop DataType
    • 目錄
    • 需求
    • 實作
    • 運作

需求

有時候 Hadoop 内置的資料類型不能滿足我們的要求,這個時候就需要自定義類型了。

假設輸入檔案是很多電話号碼,每行一個:

13612345678
13051812535
13051812535
13912345677
13412345678
           

要求按照如下格式輸出(每個号碼一行):

13412345678 is subscribed from ***China Mobile***, appearing 1 times

其中的 China Mobile 和 1,都是算出來的。

實作

需要一個電話号碼類 TelNo,需要實作 WritableComparable 接口。

// TelNo.java
package com.stephen.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TelNo implements WritableComparable<TelNo>{
    private String no;
    private String operator;
    private Integer times;
    private transient final int BEGINPOS = ;
    private transient final int ENDPOS = ;

    public TelNo() {}

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(no);
        out.writeUTF(operator);
        out.writeInt(times);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        no = in.readUTF();
        operator = in.readUTF();
        times = in.readInt();
    }

    @Override
    public int compareTo(TelNo o) {
        return this.no.compareTo(o.getNo());
    }

    public boolean equals(Object o) {
        if( !(o instanceof TelNo)) {
            return false;
        }
        TelNo other = (TelNo) o;
        return this.no.compareTo(other.getNo()) == ; 
    }

    public int hashCode() {
        return no.hashCode();
    }

    public Integer getTimes() {
        return times;
    }

    public void setTimes(Integer times) {
        this.times = times;
    }

    public void setNo(String no) {
        this.no = no;
    }

    public String getNo() {
        return no;
    }

    public String getOperator() {
        String header = no.substring(BEGINPOS, ENDPOS);
        if (header.compareTo("130") >= ) {
            if (header.compareTo("135") <= ) {
                operator = "***China Mobile***";
            } else if (header.compareTo("137") <= ) {
                operator = "***China Unicom***";
            } else if (header.compareTo("139") <= ) {
                operator = "***China Telecom***";
            } else {
                operator = "***Invalid Operator***";
            }
        }
        return operator;
    }

    @Override
    public String toString() {
        return "is subscribed from " + getOperator() + ", appearing " + times + " times";
    }
}
           

MapReduce 實作如下(預設隻有一個 Reducer,此時自定義的 Partitioner 類不會起作用)

// TelNoCategorizerTool.java
package com.stephen.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

public class TelNoCategorizerTool extends Configured implements Tool {

    public static class TelNoMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        private Text telno = new Text();
        private final static LongWritable one = new LongWritable();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String newkey = value.toString();
            telno.set(newkey);
            context.write(telno, one);
        }
    }

    public static class TelNoReducer extends
            Reducer<Text, LongWritable, Text, TelNo> {

        private TelNo telNo = new TelNo();

        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = ;
            for (LongWritable val : values) {
                sum += val.get();
            }
            telNo.setNo(key.toString());
            telNo.setTimes(sum);
            context.write(key, telNo);
        }
    }

    public static class OperatorPartitioner<K, V> extends Partitioner<K, V> {
        private static final List<String> mobileNumList = new ArrayList<>();
        private static final List<String> unicomNumList = new ArrayList<>();
        private static final List<String> telecomNumList = new ArrayList<>();

        static {
            mobileNumList.add("130");
            mobileNumList.add("131");
            mobileNumList.add("132");
            mobileNumList.add("133");
            mobileNumList.add("134");
            mobileNumList.add("135");

            unicomNumList.add("136");
            unicomNumList.add("137");

            telecomNumList.add("138");
            telecomNumList.add("139");
        }

        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            String telNoHead = key.toString().substring(, );
            if (mobileNumList.contains(telNoHead)) {
                return ;
            } else if (unicomNumList.contains(telNoHead)) {
                return ;
            } else if (telecomNumList.contains(telNoHead)) {
                return ;
            } else {
                return ;
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = Job.getInstance(conf, "Telno Categorizer");
        job.setJarByClass(TelNoCategorizerTool.class);

        job.setMapperClass(TelNoMapper.class);
        job.setReducerClass(TelNoReducer.class);

        // 隻對 Mapper 生效
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        /**
         * 這兩個方法對 Mapper 和 Reducer 都生效
         * 是以要在上面單獨指定 Mapper 的Key 和 Value 的格式
         * 沒有 setReduceOutputKeyClass...方法
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TelNo.class);

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        job.setPartitionerClass(OperatorPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(args[]));
        FileOutputFormat.setOutputPath(job, new Path(args[]));

        return job.waitForCompletion(true) ?  : ;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new TelNoCategorizerTool(), args);
        System.exit(exitCode);
    }
}
           

運作

執行一下:

hadoop jar TelNoCategorizerTool.jar com.stephen.hadoop.TelNoCategorizerTool /user/stephen/input/ /user/stephen/output
           

檢視結果:

hadoop fs -cat /user/stephen/output/part-r-00000

#output
13051812535 is subscribed from ***China Mobile***, appearing 2 times
13412345678 is subscribed from ***China Mobile***, appearing 1 times
13612345678 is subscribed from ***China Unicom***, appearing 1 times
13912345677 is subscribed from ***China Telecom***, appearing 1 times
           

如果想要分區:

hadoop jar TelNoCategorizerTool.jar com.stephen.hadoop.TelNoCategorizerTool -D mapreduce.job.reduces=4 /user/stephen/input/ /user/stephen/output
           

能看到 3 個檔案(使用了 LazyOutputFormat,不會輸出空記錄),分别包含了分區後的記錄。

hadoop fs -ls /user/stephen/output/

#output
-rw-r--r--    root supergroup           -- : /user/stephen/output/_SUCCESS
-rw-r--r--    root supergroup         -- : /user/stephen/output/part-r-00000
-rw-r--r--    root supergroup          -- : /user/stephen/output/part-r-00001
-rw-r--r--    root supergroup          -- : /user/stephen/output/part-r-00002
           

3 個檔案的内容合并起來就是之前的 part-r-00000 的内容。

繼續閱讀