
Writing MapReduce Jobs on Hadoop

Introduction: This article shows how to write a MapReduce program in Java. A MapReduce job consists of two phases: a Map phase and a Reduce phase. Each phase takes key-value pairs as input and produces key-value pairs as output, and the programmer chooses the key and value types. The data flows through MapReduce like this:

Input==>Map==>Map Output==>sort and shuffle==>Reduce==>Final Output
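To make this concrete, suppose (purely as a hypothetical example) the input contains the two lines "Hello World" and "Hello Hadoop". For a word count job, the stages would produce roughly the following key-value pairs (the input keys are the byte offsets produced by TextInputFormat):

Input:          (0, "Hello World"), (12, "Hello Hadoop")
Map Output:     ("Hello", 1), ("World", 1), ("Hello", 1), ("Hadoop", 1)
Sort & Shuffle: ("Hadoop", [1]), ("Hello", [1, 1]), ("World", [1])
Reduce Output:  ("Hadoop", 1), ("Hello", 2), ("World", 1)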
           

Writing Hadoop MapReduce code in Java requires three elements: a Mapper, a Reducer, and the code that configures and runs the job (here we call it the Invoker).

1. Create a Mapper class (it can have any name) with a map function. The map function overrides the map method defined in the org.apache.hadoop.mapreduce.Mapper class.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
import java.io.IOException;
import java.util.StringTokenizer;
 
public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line into words and emit (word, 1) for every word.
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
           

Note: Mapper is a generic class with four type parameters (input key, input value, output key, output value). Here the input key is LongWritable (Hadoop's long type), the input value is Text (Hadoop's String type), the output key is Text (the word), and the output value is IntWritable (Hadoop's int type). All of these Hadoop data types closely resemble the corresponding Java types, except that they are optimized for serialization over the network.
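To illustrate that serialization point, here is a minimal sketch (not part of the original example): a Writable can be written to and read back from a plain binary stream using the same write/readFields mechanism Hadoop uses when moving data between nodes.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
 
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
 
public class WritableDemo {
    public static void main(String[] args) throws Exception {
        // Serialize a Text/IntWritable pair into a compact binary form.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new Text("hadoop").write(out);
        new IntWritable(1).write(out);
 
        // Deserialize them back into fresh Writable objects.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text word = new Text();
        IntWritable count = new IntWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + " -> " + count.get());   // prints: hadoop -> 1
    }
}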

2. Create a Reducer class (any name) with a reduce function. The reduce function overrides the reduce method defined in the org.apache.hadoop.mapreduce.Reducer class.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
import java.io.IOException;
 
public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the counts emitted for this word.
        int sum = 0;
        for (IntWritable intWritable : values) {
            sum += intWritable.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
           

Note: Reducer is also a generic class with four type parameters (input key, input value, output key, output value). The input key and value types must match the Mapper's output types; the output key is Text (the word) and the output value is IntWritable (the number of occurrences).
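Concretely, with the hypothetical example input from the introduction, reduce is called once per distinct word:

reduce("Hello", [1, 1], context)  ==>  context.write("Hello", 2)
reduce("World", [1], context)     ==>  context.write("World", 1)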

3. With the Map and Reduce implementations ready, we now need the invoker to configure the Hadoop job and launch the MapReduce program.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.default.name", "hdfs://localhost:10011");
        configuration.set("mapred.job.tracker", "localhost:10012");
 
        Job job = new Job(configuration, "Word Count");
 
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        // The output value is a count, so it must be IntWritable, not Text.
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
        //Submit the job to the cluster and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
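One setting not shown in the driver above: the number of reduce tasks determines how many part files appear in the output directory in step 9. As a hypothetical addition, three reducers could be requested explicitly before submitting the job:

// Hypothetical addition to the driver's main method above:
// run three reduce tasks, producing part-r-00000 through part-r-00002.
job.setNumReduceTasks(3);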
           

4. Compile and build the jar

mkdir WordCount
javac -classpath ${HADOOP_HOME}/hadoop-0.20.2+228-core.jar -d WordCount path/*.java
jar -cvf WordCount.jar -C WordCount/ .
           

For a Maven project, run mvn package to build the jar, WordCount.jar.

5. Create the input files on the local file system

mkdir -p ~/wordcount/input
cd ~/wordcount/input
gedit file01
gedit file02
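For example (the file contents here are arbitrary, chosen only to match the illustration in the introduction), the files could be filled like this:

echo "Hello World" > ~/wordcount/input/file01
echo "Hello Hadoop" > ~/wordcount/input/file02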
           

6. Copy the local input files to HDFS

$HADOOP_HOME/bin/hadoop fs -copyFromLocal ~/wordcount/input/file01 /home/user1/dfs/input/file01 
           
$HADOOP_HOME/bin/hadoop fs -copyFromLocal ~/wordcount/input/file02 /home/user1/dfs/input/file02
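If the target directory does not already exist in HDFS, it may need to be created first (on Hadoop 2.x and later, add -p to create parent directories):

$HADOOP_HOME/bin/hadoop fs -mkdir /home/user1/dfs/input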
           

7. Run the jar

$HADOOP_HOME/bin/hadoop jar WordCount.jar WordCount /home/user1/dfs/input /home/user1/dfs/output
           

8. After the job finishes, use the following command to list the reduce output files

$HADOOP_HOME/bin/hadoop fs -ls /home/user1/dfs/output/
           

9. Use the following commands to view the output files:

$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-r-00000 
           
$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-r-00001 
           
$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-r-00002
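With the hypothetical example input from step 5, the combined contents of the part files would look something like this (TextOutputFormat separates key and value with a tab):

Hadoop	1
Hello	2
World	1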
           
