天天看點

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

​​第1章 MapReduce概述​​​​1.1 MapReduce定義​​​​1.2 MapReduce優缺點​​​​1.2.1 優點​​​​1.2.2 缺點​​​​1.3 MapReduce核心思想​​​​1.4 MapReduce程序​​​​1.5 官方WordCount源碼​​​​1.6 常用資料序列化類型​​​​1.7 MapReduce程式設計規範​​​​1.8 WordCount案例實操​​​​第2章 Hadoop序列化​​​​2.1 序列化概述​​​​2.2 自定義bean對象實作序列化接口(Writable)​​​​2.3 序列化案例實操​​

第1章 MapReduce概述

1.1 MapReduce定義

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.2 MapReduce優缺點

1.2.1 優點

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化
大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.2.2 缺點

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.3 MapReduce核心思想

MapReduce核心程式設計思想,如下圖所示。

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

詳解如下:

  • 1)分布式的運算程式往往需要分成至少2個階段。
  • 2)第一個階段的MapTask并發執行個體,完全并行運作,互不相幹。
  • 3)第二個階段的ReduceTask并發執行個體互不相幹,但是他們的資料依賴于上一個階段的所有MapTask并發執行個體的輸出。
  • 4)MapReduce程式設計模型隻能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就隻能多個MapReduce程式,串行運作。

​總結​

​:分析WordCount資料流走向,深入了解MapReduce核心思想。

1.4 MapReduce程序

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.5 官方WordCount源碼

  采用反編譯工具【jd-gui.exe】反編譯源碼,發現WordCount案例有Map類、Reduce類和驅動類。且資料的類型是Hadoop自身封裝的序列化類型。

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.6 常用資料序列化類型

常用的資料類型對應的Hadoop資料序列化類型

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.7 MapReduce程式設計規範

使用者編寫的程式分成三個部分:Mapper、Reducer 和 Driver。

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化
大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

1.8 WordCount案例實操

1、需求

在給定的文本檔案中統計輸出每一個單詞出現的總次數

(1)輸入資料

hello.txt

atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop
      

(2)期望輸出資料

atguigu    2
banzhang    1
cls    2
hadoop    1
jiao    1
ss    2
xue    1
      

2、需求分析

按照MapReduce程式設計規範,分别編寫Mapper,Reducer,Driver,如下圖所示。

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

3、環境準備

(1)建立Maven工程

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

不使用骨架建立Maven工程

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

填寫資訊

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

(2)在pom.xml檔案中添加如下依賴

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
      

(3)在項目的src/main/resources目錄下,建立一個檔案,命名為“log4j.properties”,在檔案中填入。

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
      

4.編寫程式

(1)編寫Mapper類

package com.atguigu.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper階段
 * 
 * @author bruce
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1、擷取一行
        String line = value.toString();

        // 2、按照空格切割
        String[] words = line.split(" ");

        // 3、循環輸出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
      

(2)編寫Reducer類

package com.atguigu.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer階段
 * 
 * @author bruce
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1、加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2、輸出
        v.set(sum);
        context.write(key, v);
    }
}
      

(3)編寫Driver驅動類

package com.atguigu.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args)
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // 1、擷取配置資訊對象以及封裝任務
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2、設定jar的加載路徑
        job.setJarByClass(WordcountDriver.class);

        // 3、設定map和reduce類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4、設定map輸出的key和value類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5、設定最終輸出的key和value類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6、設定輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、送出job
        // job.submit();
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
      

5、本地測試

(1)如果電腦系統是win7的就将win7的hadoop jar包解壓到非中文路徑,并在Windows環境上配置​

​HADOOP_HOME​

​環境變量。如果是電腦win10作業系統,就解壓win10的hadoop jar包,并配置​

​HADOOP_HOME​

​環境變量。

​注意:​

​win8電腦和win10家庭版作業系統可能有問題,需要重新編譯源碼或者更改作業系統。

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

(2)在Eclipse/Idea上運作程式

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

控制台出現了如下相關異常:

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)
      

解決方案一:拷貝hadoop.dll檔案(檔案位置:D:\work\Hadoop\hadoop-2.7.2\bin)到Windows目錄C:\Windows\System32。個别同學電腦可能還需要修改Hadoop源碼。(方案一:親測有效)

解決方案二:建立如下包名,并将NativeIO.java拷貝到該包名下

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

(3)Debug調試

6、在叢集上測試

(0)用maven打jar包,需要添加打包插件依賴

​注意:​

​标記紅顔色的部分需要替換為自己工程主類。

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin </artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.mr.WordcountDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
      

​注意:​

​如果工程上顯示紅叉。在項目上右鍵->Maven->Update Project即可。

(1)将程式打成jar包,然後拷貝到Hadoop叢集中

  步驟詳情:右鍵->Run as->Maven install。等待編譯完成就會在項目的target檔案夾中生成jar包。如果看不到。在項目上右鍵->Refresh,即可看到。修改不帶依賴的jar包名稱為wc.jar,并拷貝該jar包到Hadoop叢集。

(2)啟動Hadoop叢集

[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh 
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh 
      

(3)執行WordCount程式

[atguigu@hadoop102 hadoop-2.7.2]$ hadoop jar wc.jar com.atguigu.mr.wordcount.WordcountDriver /user/atguigu/input/ /user/atguigu/output1/
      

第2章 Hadoop序列化

2.1 序列化概述

大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化
大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

2.2 自定義bean對象實作序列化接口(Writable)

  在企業開發中往往常用的基本序列化類型不能滿足所有需求,比如在Hadoop架構内部傳遞一個bean對象,那麼該對象就需要實作序列化接口。

具體實作bean對象序列化步驟如下7步。

(1)必須實作Writable接口。

(2)反序列化時,需要反射調用空參構造函數,是以必須有空參構造。

public FlowBean() {
    super();
}
      

(3)重寫序列化方法。

@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}
      

(4)重寫反序列化方法。

@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}
      

(5)​

​注意反序列化的順序和序列化的順序完全一緻。​

(6)要想把結果顯示在檔案中,需要重寫toString(),可用"\t"分開,友善後續用。

(7)如果需要将自定義的bean放在key中傳輸,則還需要實作Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。詳見後面排序案例。

@Override
public int compareTo(FlowBean o) {
    // 倒序排列,從大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
      

2.3 序列化案例實操

  統計每一個手機号耗費的總上行流量、下行流量、總流量。

phone_data.txt

1    13736230513 192.196.100.1   www.atguigu.com 2481    24681   200
2    13846544121 192.196.100.2                   264 0   200
3     13956435636 192.196.100.3                   132     1512    200
4     13966251146 192.168.100.1                   240 0   404
5     18271575951 192.168.100.2   www.atguigu.com 1527    2106    200
6     84188413    192.168.100.3   www.atguigu.com 4116    1432    200
7     13590439668 192.168.100.4                   1116    954     200
8     15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9     13729199489 192.168.100.6                   240 0   200
10     13630577991 192.168.100.7   www.shouhu.com  6960    690     200
11     15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12     15959002129 192.168.100.9   www.atguigu.com 1938    180     500
13     13560439638 192.168.100.10                  918     4938    200
14     13470253144 192.168.100.11                  180     180     200
15     13682846555 192.168.100.12  www.qq.com      1938    2910    200
16     13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17     13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18     18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19     13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20     13768778790 192.168.100.17                  120     120     200
21     13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22     13568436656 192.168.100.19                  1116    954     200
      

(2)輸入資料格式

7     13560436666 120.196.100.99      1116         954            200
id    手機号碼        網絡ip          上行流量     下行流量     網絡狀态碼
      

(3)期望輸出資料格式

13560436666         1116                954             2070
手機号碼            上行流量            下行流量         總流量
      
大資料技術之_05_Hadoop學習_01_MapReduce_MapReduce概述+Hadoop序列化

3、編寫MapReduce程式

(1)編寫流量統計的Bean對象

package com.atguigu.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1、實作writable接口
public class FlowBean implements Writable {

    private long upFlow; // 上行流量
    private long downFlow;  // 下行流量
    private long sumFlow; // 總流量

    // 2 、反序列化時,需要反射調用空參構造函數,是以必須有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // 3、 寫序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 4、反序列化方法
    // 5、反序列化方法讀順序必須和寫序列化方法的寫順序必須一緻
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow  = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6、重寫toString方法,友善後續列印到文本
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
      

(2)編寫Mapper類

package com.atguigu.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1、擷取一行:7 13560436666     120.196.100.99      1116    954     200
        String line = value.toString();

        // 2、切隔字段
        String[] fieids = line.split("\t");

        // 3、封裝對象
        // 取出手機号碼
        String phoneNum = fieids[1]; // 封裝手機号
        k.set(phoneNum);

        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fieids[fieids.length - 3]);
        long downFlow = Long.parseLong(fieids[fieids.length - 2]);

        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // v.set(upFlow, downFlow);

        // 4、寫出
        context.write(k, v);
    }
}
      

(3)編寫Reducer類

package com.atguigu.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 周遊所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 封裝對象
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        // 3 寫出
        context.write(key, resultBean);
    }
}
      

(4)編寫Driver驅動類

package com.atguigu.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
        args = new String[] { "d:/temp/atguigu/0529/input/inputflow", "d:/temp/atguigu/0529/output2" };

        // 1、擷取配置資訊,或者擷取job對象執行個體
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2、指定本程式的jar包所在的本地路徑
        job.setJarByClass(FlowsumDriver.class);

        // 3、指定本業務job要使用的Mapper/Reducer業務類
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4、指定Mapper輸出的資料的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5、指定最終輸出的資料的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6、指定job的輸入輸出原始檔案所在的目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、将job中配置的相關參數,以及job所用的java類所在的jar包,送出給yarn去運作
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
      

Copyright ©2018-2019