
Getting Started with Apache Hadoop MapReduce: A WordCount Example

I. Introduction to MapReduce

MapReduce is one of Hadoop's three major components (divided by function), and its job is to provide distributed computation for a big-data platform. Although it is fairly heavyweight and only suited to offline (batch) processing, understanding it is a big help when learning the principles and architecture of frameworks such as Spark.

II. Writing the WordCount Example

為了測試友善,是以直接在windows10本地測試本案例

1. Preparation

1) Data preparation

Extract wordCountdemo.rar into a folder of your choice; in this example it is extracted to D:\mktest.

2) Jar dependencies (Maven configuration)
<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn</artifactId>
            <version>2.7.6</version>
        </dependency>
    </dependencies>
           
That is: hadoop-common, hadoop-hdfs, hadoop-mapreduce-client-core, hadoop-mapreduce-client-common, and hadoop-yarn, all at version 2.7.6. (When declared through Maven, the jars they depend on are downloaded automatically.)
3) Add a log4j.properties logging configuration file (under src)
###set log levels###
log4j.rootLogger=info, stdout
###output to the console###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy HH:mm:ss:SSS z}] %t %5p %c{2}: %m%n
           

2. WordCount Code

Structure: a custom Mapper, a custom Reducer, and a Driver.

1) Custom Mapper class

Create a WordCountMapper class that extends Hadoop's Mapper class.

All five input files are tab-delimited, so the mapper splits each line on the tab character.
package com.mycat.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    // Variable reused as the map output key
    Text sendKey=new Text();
    // Variable reused as the map output value
    IntWritable sendValue=new IntWritable();

    // The map method is invoked once per input line, so the logic below only has to handle a single line.
    /**
     * Parameter 1 (key): the byte offset of the start of the line, which depends on how many bytes the preceding lines occupy.
     * Parameter 2 (value): the actual content of the line.
     * Parameter 3 (context): the context object; it receives data from the framework above and emits output to the next stage below.
     * LongWritable and Text are serializable types (corresponding to Java's long and String respectively).
     * Serializable types must be passed because distributed computation has to transfer data over the network.
     * Why not use Java's default Serializable interface?
     *   Java's built-in serialization is highly compatible, but its serialization/deserialization performance is poor,
     *   so by default Hadoop uses the Writable interface for object serialization and deserialization.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the words of the line on the tab character
        String[] lines = value.toString().split("\t");
        // Tag each word with the value 1 and pass it down to the next stage via the context object
        for (String word : lines) {
            sendKey.set(word);
            sendValue.set(1);
            context.write(sendKey,sendValue);
        }
    }
}
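The Javadoc above notes that Hadoop relies on its own Writable interface rather than Java's Serializable for performance reasons. As an illustration only, here is a minimal sketch of what a custom Writable could look like; the WordPair type and its fields are hypothetical and are not used anywhere in this example, the point is just the write/readFields contract that Hadoop types follow:

package com.mycat.mapreduce.wordcount;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical example type: a word plus a count that can travel through the shuffle.
public class WordPair implements Writable {
    private String word;
    private int count;

    // A no-arg constructor is required so Hadoop can instantiate the object before calling readFields().
    public WordPair() {}

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the fields in a fixed order...
        out.writeUTF(word);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // ...and deserialize them in exactly the same order.
        word = in.readUTF();
        count = in.readInt();
    }
}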
           
2) Custom Reducer class

Create a WordCountReducer class that extends Hadoop's Reducer class.

Note: the Text type used here is the one from the org.apache.hadoop.io package, i.e. org.apache.hadoop.io.Text.

package com.mycat.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The reduce method is invoked once per group (one call per distinct key).
 * Parameter 1 (key): must match the type of the mapper's output key; its value is one of the keys emitted by the mapper.
 * Parameter 2 (values): an iterable over the mapper's values after they have been sorted, grouped, and shuffled.
 * Parameter 3 (context): the context object that hands the result to the output stage below.
 */
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    /**
     *
     * @param key     e.g. hello
     * @param values  1,1,1,1,1
     * @param context the context object
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Accumulator for summing the values of one group
        int sum=0;
        // Iterate over the values
        for (IntWritable value : values) {
            sum+=value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
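Since this reduce logic (summing integer counts) is associative and commutative, the same class could also be registered as a combiner so that map output is pre-aggregated locally before the shuffle. This is optional and not part of the original example; it would be one extra line in the Driver:

        // Optional: reuse WordCountReducer as a combiner to shrink the amount of data shuffled to the reducer.
        job.setCombinerClass(WordCountReducer.class);

With a combiner configured, the "Combine input records" / "Combine output records" counters in the console output below should no longer be 0.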
           
3) Create the Driver class
package com.mycat.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the job object from a Configuration
        Configuration conf = new Configuration();
        Job job=Job.getInstance(conf);

        // Specify the main class so Hadoop can locate the jar after packaging
        job.setJarByClass(Driver.class);

        // Specify the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Specify the output key/value types of the custom mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Specify the output key/value types of the custom reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("D:\\mktest\\wordCountdemo"));
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("D:\\mktest\\wordcount");
        // The output directory must not already exist, or the job will fail.
        // For convenient testing, delete it recursively here if it does exist.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        // Submit the job and wait for it to complete
        job.waitForCompletion(true);
    }
}
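The input and output paths above are hard-coded for local testing. As a sketch of a more reusable variant (the ArgsDriver class below is hypothetical and not part of the original code), the main method could take both paths from the command line and propagate the job result as the process exit code:

package com.mycat.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical variant of the Driver: paths come from the command line instead of being hard-coded.
public class ArgsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ArgsDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // args[0] = input directory, args[1] = output directory (must not exist yet)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit with 0 on success and 1 on failure so scripts can check the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}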
           

3. Results

1) Console output
[13/03/19 20:14:30:415 CST] main  INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local965530918_0001
.................................................
.................................................
[13/03/19 20:14:31:562 CST] main  INFO mapreduce.Job:  map 100% reduce 100%
[13/03/19 20:14:31:563 CST] main  INFO mapreduce.Job: Job job_local965530918_0001 completed successfully
[13/03/19 20:14:31:573 CST] main  INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=14818
		FILE: Number of bytes written=1758716
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=33
		Map output records=77
		Map output bytes=723
		Map output materialized bytes=907
		Input split bytes=500
		Combine input records=0
		Combine output records=0
		Reduce input groups=13
		Reduce shuffle bytes=907
		Reduce input records=77
		Reduce output records=13
		Spilled Records=154
		Shuffled Maps =5
		Failed Shuffles=0
		Merged Map outputs=5
		GC time elapsed (ms)=0
		Total committed heap usage (bytes)=3019898880
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=438
	File Output Format Counters 
		Bytes Written=117
           
2) Output directory
# Directory listing of D:\mktest\wordcount
2019/03/13  20:14                12 .part-r-00000.crc
2019/03/13  20:14                 8 ._SUCCESS.crc
2019/03/13  20:14               105 part-r-00000
2019/03/13  20:14                 0 _SUCCESS
           
3) Output files

.part-r-00000.crc: checksum file for the result file

._SUCCESS.crc: checksum file for the success marker

part-r-00000: the actual result file (there is only one output file because the job uses the default single reduce task)

_SUCCESS: marker file indicating that the job completed successfully
4) Viewing the result file

(The odd value on the first line is a formatting slip made while preparing the mock test data; it does not affect the test.)

Output format: each line is a word followed by its occurrence count, separated by a tab.

00:0c:29:16:90	1
fer	4
fhieu	4
fjeir	4
fjir	4
fre	8
hdf	8
hdfs	4
hds	4
hello	16
hfureh	4
word	4
world	12
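Because the result is plain tab-separated text, it can also be inspected from ordinary Java. A minimal sketch (the class name is hypothetical, and the hard-coded path assumes the output directory used above):

import java.nio.file.Files;
import java.nio.file.Paths;

public class PrintWordCountResult {
    public static void main(String[] args) throws Exception {
        // Print every "word<TAB>count" line of the single reduce output file.
        Files.lines(Paths.get("D:\\mktest\\wordcount\\part-r-00000"))
             .forEach(System.out::println);
    }
}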
           
