
Hadoop: Distributed Offline Computing Framework Examples

Author: 臭豬比

WordCount Example

Main Program

package com.lzz.hadoop.mapreduce.wordcount;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class MRRunJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // NameNode entry point IP
        conf.set("fs.defaultFS", "hdfs://192.168.211.110:8020");
        // local temp directory for the Hadoop client
        conf.set("hadoop.tmp.dir", "D:\\tmp\\hadoop-70452");
        Job job = Job.getInstance(conf, "mywc");
        // class used to locate the job jar
        job.setJarByClass(MRRunJob.class);
        // Mapper class
        job.setMapperClass(WordCountMapper.class);
        // Reducer class
        job.setReducerClass(WordCountReducer.class);
        // key type of the map output
        job.setOutputKeyClass(Text.class);
        // value type of the map output
        job.setOutputValueClass(IntWritable.class);
        // input path to read from
        FileInputFormat.addInputPath(job, new Path("/wcinput/"));
        // output path; note the job fails if the output directory already exists
        FileOutputFormat.setOutputPath(job, new Path("/wcoutput/"));
        boolean f = job.waitForCompletion(true);
    }
}
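As the comment above notes, the job fails when /wcoutput/ already exists. The sorting driver later in this article deletes a stale output directory before submitting; the same guard could be added here (a sketch, assuming one extra import, org.apache.hadoop.fs.FileSystem):

FileSystem fs = FileSystem.get(conf);
Path out = new Path("/wcoutput/");
// remove a stale output directory so a rerun does not fail
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, out);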



Mapper

package com.lzz.hadoop.mapreduce.wordcount;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // tokenize the line on spaces
        String[] str = value.toString().split(" ");
        // emit (word, 1) for each token
        for (int i = 0; i < str.length; i++) {
            // wrap in Text / IntWritable so the pair is Hadoop-serializable
            context.write(new Text(str[i]), new IntWritable(1));
        }
    }
}
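Allocating a new Text and IntWritable per token works, but a common Hadoop idiom is to reuse Writable instances across map() calls, because the framework copies the data into its output buffer on every write. A variant along those lines (same package and imports as above):

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split(" ")) {
            word.set(token);
            context.write(word, ONE); // the pair is serialized immediately, so reuse is safe
        }
    }
}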

Reducer

package com.lzz.hadoop.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// the mapper emits pairs like a 1, b 1, a 1, c 1; the reducer merges them into a 2, b 1, c 1
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // sum the grouped counts and emit one total per word
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : counts) {
            sum = sum + i.get();
        }
        context.write(word, new IntWritable(sum));
    }
}
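Because adding counts is associative and commutative, the same class can also act as a combiner and pre-aggregate map output before the shuffle. This is optional and not part of the original driver; it would be a single extra line in MRRunJob:

// in MRRunJob, after job.setReducerClass(...):
job.setCombinerClass(WordCountReducer.class); // pre-aggregate (word, 1) pairs on the map side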

Sorting and Statistics Example

Main Program

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class RunJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // basic configuration: NameNode entry point IP
        conf.set("fs.defaultFS", "hdfs://192.168.211.110:8020");
        // job jar shipped to the cluster when the job is submitted remotely
        conf.set("mapred.jar", "/soft/hadoop-0.0.1-SNAPSHOT.jar");

        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf, "weather");

        // class used to locate the job jar
        job.setJarByClass(RunJob.class);

        // Mapper class
        job.setMapperClass(MyMapper.class);

        // InputFormat: splits each line at the first tab into a Text key and a Text value
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Reducer class
        job.setReducerClass(MyReducer.class);

        // Partitioner class
        job.setPartitionerClass(MyPartitioner.class);

        // sort comparator class
        job.setSortComparatorClass(MySort.class);

        // grouping comparator class
        job.setGroupingComparatorClass(MyGroup.class);

        // number of reduce tasks
        job.setNumReduceTasks(3);

        // key type of the map output
        job.setOutputKeyClass(MyKey.class);

        // value type of the map output
        job.setOutputValueClass(Text.class);

        // input path to read from
        FileInputFormat.addInputPath(job, new Path("/wcinput"));

        // output path; if the directory already exists it is deleted first
        Path path = new Path("/wcoutput");

        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        boolean f = job.waitForCompletion(true);

        System.out.println("f:" + f);

    }

}


Mapper

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// both the input key and value are Text; the mapper splits the date from the temperature,
// emits the wrapped MyKey as the output key, and keeps the original record as the Text value
public class MyMapper extends Mapper<Text, Text, MyKey, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        // split the date on '-' to extract the year and month
        String[] strArray = key.toString().split("-");
        // pack year, month and temperature into a MyKey
        MyKey myKey = new MyKey();
        myKey.setYear(Integer.parseInt(strArray[0]));
        myKey.setMonth(Integer.parseInt(strArray[1]));
        myKey.setAir(Double.parseDouble(value.toString()));
        // "\t" is the tab character
        context.write(myKey, new Text(key.toString() + "\t" + value));
    }
}
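KeyValueTextInputFormat splits each input line at the first tab: the text before the tab becomes the key passed to map() and the rest becomes the value. A quick trace of the sample record quoted in MyGroup below (a standalone sketch, not part of the job):

String line = "1949-10-29 23:29:12\t39"; // sample record from MyGroup's comment
String[] kv = line.split("\t", 2);        // what KeyValueTextInputFormat hands to map()
String[] date = kv[0].split("-");         // ["1949", "10", "29 23:29:12"]
int year = Integer.parseInt(date[0]);     // 1949
int month = Integer.parseInt(date[1]);    // 10
double air = Double.parseDouble(kv[1]);   // 39.0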


Reducer

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// MySort puts the hottest record first within each (year, month) group, and MyGroup sends the
// whole group to a single reduce() call, so taking the first three values yields the three
// hottest records of each month
public class MyReducer extends Reducer<MyKey, Text, NullWritable, Text> {
    @Override // keep only the first three records of each group
    protected void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0; // records written so far
        for (Text t : values) {
            count++;
            // stop once three records have been written
            if (count > 3) {
                break;
            } else {
                // write the record as-is
                context.write(NullWritable.get(), t);
            }
        }
    }
}

Group

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// sample input record: "1949-10-29 23:29:12\t39"
public class MyGroup extends WritableComparator {
    // extends WritableComparator; used as the grouping comparator, so all records with the
    // same year and month are grouped into one reduce() call
    public MyGroup() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey myKey1 = (MyKey) a;
        MyKey myKey2 = (MyKey) b;
        // compare by year first; if the years match, compare by month
        int r1 = Integer.compare(myKey1.getYear(), myKey2.getYear());
        // years are equal, fall back to the month
        if (r1 == 0) {
            return Integer.compare(myKey1.getMonth(), myKey2.getMonth());
        }
        return r1;
    }
}


Partitioner

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<MyKey, Text> {
    // the earliest year in the data is known to be 1949; numPartitions is the number of reduce tasks
    @Override
    public int getPartition(MyKey key, Text value, int numPartitions) {
        // e.g. for the three years 1949, 1950 and 1951:
        // 1949 - 1949 = 0 -> first partition
        // 1950 - 1949 = 1 -> second partition
        // 1951 - 1949 = 2 -> third partition
        return (key.getYear() - 1949) % numPartitions;
    }
}
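One caveat: in Java, (year - 1949) % numPartitions is negative for any year before 1949, and MapReduce rejects a negative partition number. If such records could ever appear in the input, Math.floorMod keeps the result in range (a defensive variant, not in the original code):

@Override
public int getPartition(MyKey key, Text value, int numPartitions) {
    // Math.floorMod always returns a value in [0, numPartitions)
    return Math.floorMod(key.getYear() - 1949, numPartitions);
}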


Sort

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySort extends WritableComparator {
    // invoke the superclass constructor; 'true' tells WritableComparator to create MyKey
    // instances for deserialization
    public MySort() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // order MyKey records for the shuffle sort
        MyKey myKey1 = (MyKey) a;
        MyKey myKey2 = (MyKey) b;
        // compare by year
        int r1 = Integer.compare(myKey1.getYear(), myKey2.getYear());
        // if the years are equal compare the months, otherwise return the year comparison
        if (r1 == 0) {
            // compare by month
            int r2 = Integer.compare(myKey1.getMonth(), myKey2.getMonth());
            // if the months are equal sort by temperature, otherwise return the month comparison
            if (r2 == 0) {
                // same month: the leading minus sign reverses the comparison,
                // so temperatures sort in descending order
                return -Double.compare(myKey1.getAir(), myKey2.getAir());
            }
            return r2;
        }
        return r1;
    }
}


Model

package com.lzz.hadoop.mapreduce.airsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyKey implements WritableComparable<MyKey> { // serializable and comparable
    private int year; // year
    private int month; // month
    private double air; // temperature

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public double getAir() {
        return air;
    }

    public void setAir(double air) {
        this.air = air;
    }

    @Override // serialization
    public void write(DataOutput out) throws IOException {
        // write the fields to the serialized data stream
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(air);
    }

    @Override // deserialization
    public void readFields(DataInput in) throws IOException {
        // read the fields back from the stream, in the same order they were written
        year = in.readInt();
        month = in.readInt();
        air = in.readDouble();
    }

    @Override
    // natural key ordering, kept consistent with MySort: year, then month, then temperature descending
    public int compareTo(MyKey o) {
        int r = Integer.compare(year, o.year);
        if (r != 0) {
            return r;
        }
        r = Integer.compare(month, o.month);
        if (r != 0) {
            return r;
        }
        return -Double.compare(air, o.air);
    }
}
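A quick round-trip check of the Writable serialization (a standalone sketch using plain java.io streams; MyKeyRoundTrip is a hypothetical test class, not part of the job):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MyKeyRoundTrip {
    public static void main(String[] args) throws IOException {
        MyKey k1 = new MyKey();
        k1.setYear(1949);
        k1.setMonth(10);
        k1.setAir(39.0);

        // serialize: 4 + 4 + 8 bytes for year, month and temperature
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        k1.write(new DataOutputStream(bos));

        // deserialize into a fresh instance and print it back
        MyKey k2 = new MyKey();
        k2.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(k2.getYear() + "-" + k2.getMonth() + " " + k2.getAir()); // 1949-10 39.0
    }
}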


Example

(screenshot of the sample run, image not reproduced)

Result

(screenshots of the job output, images not reproduced)
