
Hadoop Series: Using HDFS and MapReduce

1. Overview

1.1 The Concept of Big Data

Big data refers to information assets so massive, fast-growing, and diverse that new processing models are required to gain stronger decision-making power, insight discovery, and process optimization from them.

1.2 Problems Big Data Faces

Storage: a single machine has limited storage, so a cluster (multiple machines) is needed to store the data; the hardware must provide enough capacity, and the software must provide a corresponding fault-tolerance mechanism.

Analysis: a single machine also has limited computing power, so a cluster is needed for computation as well (the data has to be turned into something valuable within a reasonable amount of time).

1.3 Characteristics of Big Data

The 4 Vs: Volume (huge data volume), Velocity (timeliness), Variety (diversity), Value (high value)

1) Huge data volume

B-KB-MB-GB-TB-PB-EB-ZB…

Various personal cloud-storage services: Baidu Netdisk, Tencent Weiyun, 115, Lanzou, Chengtong, OneDrive, Google Drive, etc.

Big data emerged in the Internet era of the 21st century; ever-advancing technology and ever-growing material and cultural demands have led to an explosion of data.

Apps such as Taobao, Alipay, WeChat, QQ, and Douyin are among the top traffic drivers in China; their user bases are enormous, and they generate a huge amount of data every day.

2) Data timeliness

Double 11, 618 (shopping festivals)

Big data is produced rapidly over a short period of time (it is highly time-sensitive), so the timeliness of the analysis has to be decided per scenario: valuable results must be produced within a reasonable amount of time.

3) Data diversity

(1) Diversity of storage types

Structured data: tables, text, SQL, etc.

Unstructured data: video, audio, images

(2) Diversity of analysis types

Geographic location: from Beijing, China, Shanghai

Device information: from PCs, phones, tablets, watches, wristbands, glasses

Personal preferences: beauty, face masks, ctrl, digital gadgets, basketball, football

Social networks: if A may know B and C, then B may also know C

Phone numbers: 110, 11086

Online identity: device MAC + phone + IP + region

4) Data value

Police: only care about whether and where a violation happened

AI research: crime prediction, game playing, autonomous driving

So picking out the useful data from the massive data set is the key first step of analysis, i.e., denoising the data (data cleaning / data preprocessing).

1.4 Application Scenarios

1) Personalized recommendation

Recommend relevant resources based on user preferences

From one face for a thousand users, to a thousand faces for a thousand users, to a thousand faces for one user

2) Risk control

Real-time stream processing of big data, backed by user-behavior models, to judge whether a given behavior is normal

3) Cost prediction

4) Climate prediction

5) Artificial intelligence

1.5 Career Directions

1. Business domains: e-commerce recommendation, intelligent advertising, expert systems, intelligent transportation, intelligent healthcare
2. Job roles: big data development engineer (real-time computing, batch processing, ETL, data mining), big data operations engineer

1.6 Distributed Systems

To solve big data storage and computation, a certain number of machines are needed and the hardware must be sufficient. What about the software?

How do we use software to solve the storage and analysis problems?

2. Hadoop

Hadoop was formally introduced by the Apache Software Foundation in the fall of 2005 as part of Nutch, a subproject of Lucene. It was inspired by MapReduce and the Google File System (GFS), both first developed at Google Labs.

In March 2006, MapReduce and the Nutch Distributed File System (NDFS) were folded into a project named Hadoop.

Hadoop is best known as a tool for classifying content on the Internet by search keyword, but it can also solve many problems that demand extreme scalability. For example, what happens if you need to grep a huge 10 TB file? On a traditional system this would take a very long time, but Hadoop was designed with such problems in mind and executes in parallel, which greatly improves efficiency.

2.1 The Hadoop Ecosystem

HDFS: the Hadoop Distributed File System, the storage solution of the Hadoop ecosystem

MapReduce: Hadoop's distributed computing framework (a distributed application can be built by implementing only a small amount of code) for parallel analysis and computation over massive data

HBase: a column-oriented NoSQL database built on top of HDFS

Hive: a SQL interpretation engine that translates SQL statements into MapReduce code

Flume: a distributed log-collection system that gathers massive log data and stores it in HDFS

Kafka: a message queue that decouples distributed applications and buffers data

ZooKeeper: a distributed coordination service, used as a registry, a configuration center, and for cluster leader election, status detection, and distributed locks

2.2 Big Data Analysis Options

MapReduce: offline batch processing of big data (disk-based; latency of 30+ minutes)

Spark: offline batch processing of big data (memory-based; much faster than MR)

Storm / Spark Streaming / Kafka Streams / Flink: real-time stream-processing frameworks that achieve millisecond-level processing of individual records

3. HDFS

3.1 Installation (Pseudo-Distributed)

1) Prepare a virtual machine

Change the IP address
Delete the persisted MAC address rule
Change the hostname:   vi /etc/sysconfig/network

2) Install JDK 8

3) Configure the Java environment variables

export JAVA_HOME=/home/java/jdk1.8.0_181
export PATH=$PATH:$JAVA_HOME/bin
           

4) Map the hostname to the IP address

[root@HadoopNode00 ~]#  vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=HadoopNode00

[root@HadoopNode00 ~]# vi /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.11.20 HadoopNode00
           

5) Disable the firewall

[root@HadoopNode00 ~]# service iptables stop   # stop the firewall
[root@HadoopNode00 ~]# chkconfig iptables off  # do not start the firewall on boot
           

6) Passwordless SSH login

SSH (Secure Shell) is a security protocol built on top of the application layer; it provides secure support for remote login sessions and other network services.

Password-based authentication: based on a username and password, e.g. root / 123456

Key-based authentication: relies on a key pair

[root@HadoopNode00 ~]# ssh-keygen -t rsa   # generate a key pair
[root@HadoopNode00 ~]# ssh-copy-id HadoopNode00
           

7) Extract Hadoop

# Extract Hadoop into the target directory
[root@HadoopNode00 ~]# mkdir /home/hadoop/
[root@HadoopNode00 ~]# tar -zxvf /home/hadoop/hadoop-2.6.0.tar.gz  -C /home/hadoop
           

8) Configure the Hadoop environment variables

export HADOOP_HOME=/home/hadoop/hadoop-2.6.0
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
           

The HADOOP_HOME environment variable is relied on by third-party components: when HBase, Hive, and Flume integrate with Hadoop, they locate the Hadoop installation through HADOOP_HOME.

9) Configure etc/hadoop/core-site.xml

<property>
<name>fs.defaultFS</name>
<value>hdfs://HadoopNode00:9000</value>
</property>

<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/hadoop-2.6.0/hadoop-${user.name}</value>
</property>

           

10) Configure etc/hadoop/hdfs-site.xml

<property>
<name>dfs.replication</name>
<value>1</value>
</property>
           

11) Format the NameNode

# The NameNode must be formatted before HDFS is started for the first time
[root@HadoopNode00 ~]# hdfs namenode -format
[root@HadoopNode00 ~]# tree /home/hadoop/hadoop-2.6.0/hadoop-root
/home/hadoop/hadoop-2.6.0/hadoop-root
└── dfs
    └── name
        └── current
            ├── fsimage_0000000000000000000
            ├── fsimage_0000000000000000000.md5
            ├── seen_txid
            └── VERSION

3 directories, 4 files
           

12) Start HDFS

start-dfs.sh   # start HDFS
stop-dfs.sh    # stop HDFS
           

Open the web UI:

http://hostname:50070

On Windows, map the hostname to the IP address in C:\Windows\System32\drivers\etc\hosts

3.2 HDFS Shell Operations

1) hdfs shell

[root@HadoopNode00 ~]# hadoop fs
Usage: hadoop fs [generic options]
        [-appendToFile <localsrc> ... <dst>]
        [-cat [-ignoreCrc] <src> ...]
        [-checksum <src> ...]
        [-chgrp [-R] GROUP PATH...]
        [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
        [-chown [-R] [OWNER][:[GROUP]] PATH...]
        [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
        [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-count [-q] [-h] <path> ...]
        [-cp [-f] [-p | -p[topax]] <src> ... <dst>]
        [-createSnapshot <snapshotDir> [<snapshotName>]]
        [-deleteSnapshot <snapshotDir> <snapshotName>]
        [-df [-h] [<path> ...]]
        [-du [-s] [-h] <path> ...]
        [-expunge]
        [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-getfacl [-R] <path>]
        [-getfattr [-R] {-n name | -d} [-e en] <path>]
        [-getmerge [-nl] <src> <localdst>]
        [-help [cmd ...]]
        [ ..]
        [-usage [cmd ...]]

Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]


           

2) Upload a file

# Upload /root/install.log to the HDFS root directory as 1.txt
[root@HadoopNode00 ~]# hadoop fs -put  /root/install.log  /1.txt

           

3) List files

# The file uploaded above is there, named 1.txt
[root@HadoopNode00 ~]# hadoop fs -ls /
Found 1 items
-rw-r--r--   1 root supergroup       8901 2019-09-17 23:28 /1.txt
           

4) Download a file
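The file can be copied back to the local file system with -get (listed in the usage output above); the local destination path below is just an example:

[root@HadoopNode00 ~]# hadoop fs -get /1.txt /root/1-copy.txt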

5) Delete a file

[root@HadoopNode00 ~]# hadoop fs -rm /2.txt
19/09/17 23:36:05 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /2.txt
           

6) View a file

[root@HadoopNode00 ~]# hadoop fs -cat /1.txt
Installing libgcc-4.4.7-23.el6.x86_64
warning: libgcc-4.4.7-23.el6.x86_64: Header V3 RSA/SHA1 Signature, key ID c105b9de: NOKEY
Installing setup-2.8.14-23.el6.noarch

           

7) Create a directory

[root@HadoopNode00 ~]# hadoop fs -mkdir /yyh
[root@HadoopNode00 ~]# hadoop fs -ls /
Found 2 items
-rw-r--r--   1 root supergroup       8901 2019-09-17 23:28 /1.txt
drwxr-xr-x   - root supergroup          0 2019-09-17 23:37 /yyh

           

8) Copy a file

[root@HadoopNode00 ~]# hadoop fs -cp /1.txt /yyh/
[root@HadoopNode00 ~]# hadoop fs -ls /
Found 2 items
-rw-r--r--   1 root supergroup       8901 2019-09-17 23:28 /1.txt
drwxr-xr-x   - root supergroup          0 2019-09-17 23:38 /yyh
[root@HadoopNode00 ~]# hadoop fs -ls /yyh
Found 1 items
-rw-r--r--   1 root supergroup       8901 2019-09-17 23:38 /yyh/1.txt
           

9) Enable the trash (recycle bin) mechanism

core-site.xml

<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
           
This sets a one-minute retention period (fs.trash.interval is specified in minutes).
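With the trash enabled, a delete no longer removes the file immediately; it is moved into the current user's trash directory in HDFS and only purged after the interval expires. A quick check looks roughly like this (the exact output may differ slightly by version):

[root@HadoopNode00 ~]# hadoop fs -rm /1.txt
Moved: 'hdfs://HadoopNode00:9000/1.txt' to trash at: hdfs://HadoopNode00:9000/user/root/.Trash/Current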

3.3 Operating HDFS with the Java API

(1) Dependencies

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>

           

(2) Configure the Hadoop environment on Windows

  • Extract Hadoop to a directory of your choice
  • Copy hadoop.dll and winutils.exe into hadoop/bin
  • Configure the Hadoop environment variables
  • Map the hostname to the IP address

(3) Resolving insufficient-permission errors

org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/yyh":root:supergroup:drwxr-xr-x
           

1) Option 1: configure hdfs-site.xml

<!-- disable permission checking -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
           

2) Option 2: pass the user as a JVM option

-DHADOOP_USER_NAME=root
           

3) Option 3
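A third option, and the one the App class in the next section actually uses, is to set the HDFS user programmatically before the FileSystem client is created:

System.setProperty("HADOOP_USER_NAME", "root");   // must run before FileSystem.newInstance(...)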

(4) Example operations

package com.yyh.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class App {

    private Configuration configuration;
    private FileSystem fileSystem;

    @Before
    public void getClient() throws Exception {

        System.setProperty("HADOOP_USER_NAME", "root");
        /*
         * Prepare the configuration object
         * */
        configuration = new Configuration();
        /*
         * Add the corresponding configuration files */
        configuration.addResource("core-site.xml");
        configuration.addResource("hdfs-site.xml");

        /*
         * Obtain the client object via FileSystem.newInstance */
        fileSystem = FileSystem.newInstance(configuration);
    }

    @Test
    public void testUpload01() throws Exception {

        /*
         * source file | destination file, both given as Path objects
         * */
        fileSystem.copyFromLocalFile(new Path("G:\\A.docx"), new Path("/yyh/2.docx"));

    }

    @Test
    public void testUpload02() throws Exception {

        /*
         * Prepare a local input stream
         * */
        FileInputStream inputStream = new FileInputStream("G:\\A.docx");

        /*
         * Prepare an HDFS output stream
         * */
        FSDataOutputStream outputStream = fileSystem.create(new Path("/yyh/3.docx"));

        /*
         * Copy using the IOUtils utility class
         * */
        IOUtils.copyBytes(inputStream, outputStream, 1024, true);
    }

    @Test
    public void testDownload01() throws Exception {

        fileSystem.copyToLocalFile(false, new Path("/1.txt"), new Path("G:\\3.txt"), true);

    }

    @Test
    public void testDownload02() throws Exception {

        FileOutputStream outputStream = new FileOutputStream("G:\\4.txt");

        FSDataInputStream inputStream = fileSystem.open(new Path("/1.txt"));
        IOUtils.copyBytes(inputStream, outputStream, 1024, true);

    }

    @Test
    public void test011() throws IOException {


        RemoteIterator<LocatedFileStatus> list = fileSystem.listFiles(new Path("/"), true);


        while (list.hasNext()) {

            LocatedFileStatus locatedFileStatus = list.next();
            Path path = locatedFileStatus.getPath();
            System.out.println(path.toString());

        }


    }

    @Test
    public void test02() throws Exception{

        fileSystem.delete(new Path("/yyh"),false);
    }
    @Test
    public void test03() throws Exception{

        boolean exists = fileSystem.exists(new Path("/1.txt"));
        if (exists){
            System.out.println("file exists");
        }else {
            System.out.println("file does not exist");
        }
    }

    @Test
    public void testy04() throws Exception{

        fileSystem.mkdirs(new Path("/yyh1243"));
    }
}

           

3.4 HDFS Architecture

HDFS has a master/slave architecture. A single master NameNode manages the file-system namespace and client access to files, while the DataNodes coordinate with the NameNode and are responsible for storing the data. When a file is stored it is split into one or more blocks; the NameNode keeps metadata (the mapping from blocks to DataNodes), and the DataNodes create, delete, and replicate blocks according to the NameNode's instructions.

NameNode: stores the metadata (data that describes the data) and manages the DataNodes

DataNode: stores data blocks, serves client read and write requests for blocks, and reports its block information to the NameNode

Block: a data block, the smallest unit into which HDFS splits a file; the default split size is 128 MB, and each block has three replicas by default

Rack: a physical grouping of storage nodes described by a rack-topology configuration, used to optimize storage and computation

1) What is a block?

<property>
  <name>dfs.blocksize</name>
  <value>134217728</value>
  <description>
      The default block size for new files, in bytes.
      You can use the following suffix (case insensitive):
      k(kilo), m(mega), g(giga), t(tera), p(peta), e(exa) to specify the size (such as 128k, 512m, 1g, etc.),
      Or provide complete size in bytes (such as 134217728 for 128 MB).
  </description>
</property>
           
(1) Why is the block size 128 MB?

In Hadoop 1.x the default block size was 64 MB; in Hadoop 2.x it is 128 MB.

Hardware constraint: a mechanical hard disk typically reads at around 100 MB/s.

Software optimization: the rule of thumb is that seek time should be about 1/100 of the transfer time.
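A rough back-of-the-envelope check of that rule (assuming a seek time of about 10 ms and the ~100 MB/s disk speed above): if the seek time should be 1% of the transfer time, the transfer should take about 1 s, which at 100 MB/s corresponds to roughly 100 MB of data, and 128 MB is the nearest convenient power-of-two block size.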

(2) Can the block size be set arbitrarily?

No. If the block size is too large it can waste storage space and make access times too long; if it is too small, seek time dominates, which is just as inefficient.

(3) Why HDFS is not suited to storing small files

File                   | NameNode memory used     | DataNode disk used
one 128 MB file        | metadata for 1 block     | 128 MB
128 files of 1 MB each | metadata for 128 blocks  | 128 MB

Many small files therefore put the NameNode's memory under severe pressure.

2) Rack Awareness

For the common case, when the replication factor is 3, HDFS's placement policy puts one replica on a node in the local rack, a second on a different node in the same local rack, and the last on a node in a different rack. This policy cuts inter-rack write traffic, which generally improves write performance. The chance of a rack failure is far smaller than that of a node failure, so the policy does not affect data reliability and availability guarantees. It does, however, reduce the aggregate network bandwidth used when reading data, because a block is placed in only two unique racks rather than three. With this policy the replicas of a file are not evenly distributed across the racks: one third of the replicas are on one node, two thirds of the replicas are on one rack, and the remaining third is evenly distributed over the remaining racks. This policy improves write performance without compromising data reliability or read performance.

View the default rack topology:

[root@HadoopNode00 ~]# hdfs dfsadmin -printTopology
Rack: /default-rack
   192.168.11.20:50010 (HadoopNode00)
           

3) The relationship between the NameNode and the SecondaryNameNode (important)

fsimage file: a snapshot (backup) of the metadata, loaded into memory at startup

edits file: records add and update operations, which improves efficiency

On startup the NameNode loads the fsimage and edits files; this is why the NameNode must be formatted before the first start.

When a user uploads a file or performs any other operation, a record of the operation is written to the edits file, so the edits file plus the fsimage file always describe the latest metadata.

If users keep performing operations, the edits file grows larger and larger, which makes the next startup very slow.

To solve this, the SecondaryNameNode was introduced: it copies the NameNode's current edits and fsimage files to its own node and merges them; when the merge finishes, it transfers the new fsimage back to the NameNode, which then loads the latest fsimage.

A new problem: while the SecondaryNameNode is copying, client read/write requests may still arrive and their operation records must be appended to the edits file, but that file is being copied, and changing it would corrupt the data. The solution: a new file called edits-inprogress is created and new operations are written into it; once the SecondaryNameNode finishes merging, edits-inprogress is renamed to become the current edits file.

4) Checkpoints

The NameNode keeps its metadata in the fsimage and edits files. The SecondaryNameNode (2NN) periodically downloads the active NameNode's fsimage and edits files and merges them locally.
The moment at which this merge happens is called a checkpoint.
A checkpoint is triggered in two ways:
(1) by default, once every hour
(2) when the number of operations reaches 1,000,000
           
<property>
  <name>dfs.namenode.checkpoint.period</name>
  <value>3600s</value>
  <description>
    The number of seconds between two periodic checkpoints.
    Support multiple time unit suffix(case insensitive), as described
    in dfs.heartbeat.interval.
  </description>
</property>
           
<property>
  <name>dfs.namenode.checkpoint.txns</name>
  <value>1000000</value>
  <description>The Secondary NameNode or CheckpointNode will create a checkpoint
  of the namespace every 'dfs.namenode.checkpoint.txns' transactions, regardless
  of whether 'dfs.namenode.checkpoint.period' has expired.
  </description>
</property>
           

5) Safemode

At startup the NameNode loads the fsimage and edits files and waits for the DataNodes to report their block information, until most blocks are available. During this period the cluster is in safe mode. The NameNode's safe mode is essentially a read-only mode for the HDFS cluster: it does not allow any modification to the file system or to blocks.
Normally the NameNode leaves safe mode automatically once the DataNodes have reported that most file-system blocks are available.
Safe mode can also be entered and left manually:

[root@HadoopNode00 ~]# hdfs dfsadmin -safemode enter
Safe mode is ON
[root@HadoopNode00 ~]# hadoop fs -put /root/1.txt  /
put: Cannot create file/1.txt._COPYING_. Name node is in safe mode.
[root@HadoopNode00 ~]# hdfs dfsadmin -safemode leave
Safe mode is OFF
[root@HadoopNode00 ~]# hadoop fs -put /root/1.txt  /
           

6) How DataNodes work

A DataNode registers itself with the NameNode when it starts.
It periodically reports its block information to the NameNode; the NameNode responds with instructions, and the DataNode carries them out.
If the heartbeat is lost for more than 10 minutes, the DataNode is considered dead and must be restarted to come back online.
           

4. MapReduce

4.1 Overview

MapReduce is a programming model for parallel computation over large data sets (larger than 1 TB). Its core concepts, "Map" and "Reduce", are borrowed from functional programming languages, together with features borrowed from vector programming languages. It makes it far easier for programmers who know nothing about distributed parallel programming to run their programs on a distributed system. In the current software implementation, you specify a Map function that turns a set of key/value pairs into a new set of key/value pairs, and a concurrent Reduce function that guarantees that all mapped key/value pairs sharing the same key are processed together.

MapReduce is Hadoop's parallel computing framework: it splits a computation into two phases, Map and Reduce.

The MapReduce framework makes full use of the compute resources of the physical hosts where the storage nodes (DataNodes) live to run computations in parallel.

By default a NodeManager abstracts the compute resources of the node it runs on into 8 compute units, each called a Container; all NodeManagers are scheduled by the ResourceManager, which is responsible for the overall allocation of compute resources.

In short: it is a software framework, it processes in parallel, it is reliable and fault tolerant, it computes at large scale, and it handles massive data.

MapReduce excels at big data processing; its core idea is divide and conquer.

  • Map is responsible for "dividing": it breaks a huge, complex task into several "simple tasks". "Simple" means three things:
    • the data or compute model of each task is much smaller than that of the original task
    • data locality: a task is assigned to the node that stores the data it needs
    • there is more than one small task, they run in parallel, and they have no dependencies on each other
  • Reduce is responsible for aggregating the results of the Map phase

4.2 Why Use MapReduce?

package com.yyh.hdfs;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

public class CleanApp {

    public static void main(String[] args) throws Exception {

        File file = new File("G:\\Note\\Day02-Hadoop\\資料檔案\\access.tmp2019-05-19-10-28.log");

        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

        FileWriter fileWriter = new FileWriter("G:\\Note\\Day02-Hadoop\\資料檔案\\clean.log");


        while (true) {

            String line = bufferedReader.readLine();

            if (line == null) {
                bufferedReader.close();
                fileWriter.close();
                return;
            }

            boolean contains = line.contains("thisisshortvideoproject'slog");
            if (contains) {

                String s = line.split("thisisshortvideoproject'slog")[0];
                fileWriter.write(s.trim() + "\n");
                fileWriter.flush();

            }

        }

    }
}

           

上述代碼是對日志進行簡單的清晰,在資料量少的時候一點問題都沒有,但是資料量一旦增加,就可能無法勝任需求,因為無法在合理的時間内完成計算,此時單機性能已經成為計算的瓶頸,但是手寫分布式應用程式難度太大,有現成的架構可以使用,那就是MR!

4.3 YARN 環境搭建

(1)什麼是 YARN ?

YARN is a resource-scheduling platform. A global manager called the ResourceManager plans the cluster's overall computation and resources, and per-node managers called NodeManagers report their nodes' resource usage to the ResourceManager. Inside a NodeManager, an MRAppMaster manages the MR application currently running there; its job is to negotiate resources from the ResourceManager and to work with the NodeManagers to execute and monitor the tasks.

ResourceManager: plans the cluster's overall computation and resource allocation

NodeManager: manages the compute resources on its host and reports its own status to the RM

MRAppMaster: the master of a compute job; requests compute resources and coordinates the job's tasks

YARN Child: performs the actual computation

Container: the abstract unit of compute resources

(2) Configure YARN

etc/hadoop/yarn-site.xml

<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
 <property>
    <description>The hostname of the RM.</description>
    <name>yarn.resourcemanager.hostname</name>
    <value>HadoopNode00</value>
  </property> 
           

etc/hadoop/mapred-site.xml

# etc/hadoop/ does not ship this file, but it does contain a template version (ending in .template); rename or copy that file to mapred-site.xml
<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>
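A minimal way to create the file, assuming the standard Hadoop 2.6.0 layout used in this setup:

[root@HadoopNode00 ~]# cp /home/hadoop/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /home/hadoop/hadoop-2.6.0/etc/hadoop/mapred-site.xml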
           

(3) Start YARN

[root@HadoopNode00 ~]# start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-2.6.0/logs/yarn-root-resourcemanager-HadoopNode00.out
localhost: starting nodemanager, logging to /home/hadoop/hadoop-2.6.0/logs/yarn-root-nodemanager-HadoopNode00.out
[root@HadoopNode00 ~]# jps
60192 Jps
60046 ResourceManager
60142 NodeManager
           
Web UI: http://hostname:8088

4.4 A First MapReduce Program (Word Count)

Requirement: count how many times each name appears. Input:
wangkai gjf zkf suns gzy
wangkai zkf suns gzy
zkf suns gzy hxz leijun

wangkai 2
gjf 1
zkf 3 
suns 3
gzy 3
hxz 1
leijun 1
           

(1) Dependencies

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
           

(2) Mapper logic

package com.yyh.mr.test01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
* keyIn    LongWritable (Long)  byte offset of the line within the input file
* valueIn  Text (String)        the text of the input line
* keyOut   Text (String)        a word
* valueOut IntWritable (Int)    the count 1 for that word
* */

public class WCMapper  extends Mapper<LongWritable, Text,Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] names = value.toString().split(" ");

        for (String name : names) {
            context.write(new Text(name),new IntWritable(1));
        }


    }
}

           

(3) Reducer logic

package com.yyh.mr.test01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/*
 * keyIn    Text         matches the Mapper's keyOut type
 * valueIn  IntWritable  matches the Mapper's valueOut type
 * keyOut   Text
 * valueOut IntWritable
 * */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

           

(4) Job setup

package com.yyh.mr.test01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {

        /*
         * Obtain the configuration object
         * */

        Configuration conf = new Configuration();

        /*
         * Obtain the Job object
         * */
        Job job = Job.getInstance(conf);


        /*
         * Set the input and output format components
         * */
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);


        /*
         * Set the input and output paths
         * */

        TextInputFormat.setInputPaths(job, new Path("/wordcount.txt"));
        /*
         * Note: the output path must not already exist
         * */
        TextOutputFormat.setOutputPath(job, new Path("/yyh/out1"));


        /*
         * Set the Map and Reduce processing logic
         * */
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        /*
         * Set the output key/value types of the map and reduce tasks
         * */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        //  Submit the job

        //job.submit();

        job.waitForCompletion(true);
    }
}
           

4.5 Deployment and Execution

(1) Deploy as a jar on the cluster

// Set the jar class loader; otherwise the MapReduce framework cannot find the Mapper and Reducer
 job.setJarByClass(JobRunner.class);

  • Package the project as a jar
  • Run it with: hadoop jar <jar name> <main class>  (see the example below)
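For the word-count job above, the command would look roughly like this (assuming the jar name produced by this project's build, which is referenced again in the cross-platform section below):

hadoop jar Hadoop_Test-1.0-SNAPSHOT.jar com.yyh.mr.test01.JobRunner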

(2) Local simulation

A common pitfall fix for local runs: add the log4j dependency and the log4j.properties configuration below.
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
           
log4j.properties
### root logger ###
log4j.rootLogger = info,console

### console appender ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern =  %p %d{yyyy-MM-dd HH:mm:ss} %c %m%n
           

(3) Cross-platform submission

  • Copy the relevant configuration files into the resources directory:
    • core-site.xml
    • hdfs-site.xml
    • yarn-site.xml
    • mapred-site.xml

Code:

System.setProperty("HADOOP_USER_NAME", "root");

        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");


           

Enable cross-platform submission

  • In mapred-site.xml:
<property>
    <name>mapreduce.app-submission.cross-platform</name>
    <value>true</value>
</property>

  • Or set the same property in code

4.6 Custom Bean Objects (Writable)

(1) What is a custom bean object?

Requirements are never fixed. Hadoop provides serialization for a handful of built-in data types, but in real development that is often not enough, and you need to define your own serializable object.
Java's built-in serialization mechanism is Serializable,
but Hadoop does not use it; it has its own serialization mechanism called Writable.

Objects must be serialized before they can be transmitted across the network:
encode (serialize) ---- decode (deserialize)

(2) Requirement

18611781163 700000 10000
18611781161 123 123123  
18611781163 700000 10000
18236529965 123 1223123
18236529964 123123 123
18236529965 546 45645
18611781163 300000 70000
18236529965 123 234523
18236529965 31243 436543
           

This is a set of mobile-carrier traffic records:

Phone        | Upstream | Downstream | Total
18611781163  | 700000   | 10000      | ?
           

(3) Define the bean object

package com.yyh.mr.test02;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

public class FlowBean implements Writable {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }


    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }


    @Override
    public String toString() {
        return "" +
                "phone='" + phone + '\'' +
                " upFlow=" + upFlow +
                " downFlow=" + downFlow +
                " sumFlow=" + sumFlow ;
    }

    /*
     * Serialization (encoding)
     * */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }


    /*
     * Deserialization (decoding)
     * */
    public void readFields(DataInput dataInput) throws IOException {

        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();

    }
}

           
package com.yyh.mr.test02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /*
     * 18611781163 700000 10000
     * 18611781163 700000 10000
     * 18611781163 700000 10000
     * */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        String[] data = line.split(" ");

        /*
         *  phone
         * */
        context.write(new Text(data[0]), new FlowBean(data[0], Long.valueOf(data[1]), Long.valueOf(data[2]), (Long.valueOf(data[1]) + Long.valueOf(data[2]))));



    }
}

           
package com.yyh.mr.test02;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * 18611781163  FlowBean[]
 *
 * */
public class FlowReducer extends Reducer<Text, FlowBean, NullWritable, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        Long up = 0L;
        Long down = 0L;
        Long sum = 0L;


        for (FlowBean flowBean : values) {

            up += flowBean.getUpFlow();
            down += flowBean.getDownFlow();
            sum += flowBean.getSumFlow();

        }

        context.write(NullWritable.get(), new FlowBean(key.toString(), up, down, sum));

    }
}

           
package com.yyh.mr.test02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowRunner {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");

        Job job = Job.getInstance(conf);


        job.setJarByClass(FlowRunner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);


        TextInputFormat.setInputPaths(job, new Path("/flow.dat"));

        TextOutputFormat.setOutputPath(job, new Path("/yyh/out333"));
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);


        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(FlowBean.class);


        job.waitForCompletion(true);

    }
}

           

4.7 The MapReduce Execution Flow (important)

1. The MR program written by the programmer is submitted, either locally from the command line or remotely from an IDE.

2. An MR program is a Job; the job information is sent to the ResourceManager, registering the job with it.

3. Once registration succeeds, the job's related resources are copied to the shared file system (HDFS).

4. The complete job information (including the resource information) is then submitted to the ResourceManager.

5a. From the submitted job information, the ResourceManager computes the resources the job needs and allocates Container resources for it.

5b. The allocated compute resource is handed to the corresponding NodeManager, which launches an MRAppMaster.

6. The MRAppMaster initializes the job.

7. The input split information is retrieved.

8. The MRAppMaster requests resources from the ResourceManager.

9a. The compute resources are started (connecting to the NodeManagers that own them).

9b. YARN Child processes are launched.

10. The complete job information is fetched from the file system.

11. The corresponding MapTask or ReduceTask processes are started and the computation is executed.

4.8 The Job Submission Flow (important)


(1) Establish a connection

Determine whether the job will run locally or on the cluster; a different runner object (YARN | Local) is created for each case.

(2) Submit the job

1) Validate the output specification: checkSpecs()

2) Handle the distributed cache

3) Create the resource (staging) path

4) Obtain the job ID and create a job path under the staging path

5) Copy the related resources into the job-ID path

files
libjars
archives
jobJar

6) Compute the input splits and generate the split-plan file

A split is a logical concept: the file is not physically divided. The default split size is 128 MB (32 MB for the local runner).
7) Write the XML configuration file to the staging path

4.9 MapReduce Components

(1) Overview

From writing the WC example it is not hard to see that we read input and write output according to certain rules and run the job either locally or on a Hadoop cluster.

Hadoop是将資料切分成了若幹個輸入切片(Input Split),并将每個切片交由一個MapTask的程序處理,MapTask不斷從對應的Split中解析出來一個一個的 key、value,并交由map()函數進行處理。處理完成之後根據ReduceTask的個數将結果集分成若幹個分片(partition)寫到磁盤中。

然後,每個ReduceTask會從MapTask所在的節點上的磁盤讀取屬于的那個分區(partition),然後使用基于排序方法将key 相同的資料聚合在一起,調用Reduce函數,處理完成後輸出到磁盤。

從上面的描述中,可以看出,還有一些元件是沒有在(目前的)程式設計中沒有展現到:

(1)指定文本格式。将輸入資料切分成若幹個切片,且将每個Split(切片)解析成滿足map函數要求的keyvalue對。

(2)确定map()函數産生的keyvalue對象發送給那個Reduce 函數處理

(3)指定輸出檔案格式,即每個keyvalue已何種形式儲存成輸出檔案。

是以在MR中,這個三個元件分别是InputFormat 、Partitioner、OutputFormat ,他們均需要使用者根據自己的需求進行配置,但是對于WC 來說,都是預設的。

但是最終。Hadoop還是提供五個可以程式設計的元件:分别 Mapper Reducer InputFormat Partitioner OutputFormat。

按照順序來:InputFormat Mapper Partitioner Reducer OutputFormat。

還有不是必備的元件:Canbiner ,通常是用于優化MR程式的性能,但是不能随意添加。
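For the word-count job above, summing counts is associative and commutative, so the existing WCReducer can also be registered as a Combiner to pre-aggregate each map task's output before the shuffle. A minimal sketch, assuming the JobRunner and WCReducer classes defined earlier:

// In JobRunner, after setting the Mapper and Reducer:
job.setCombinerClass(WCReducer.class);   // run WCReducer locally on each map task's output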

(2) The InputFormat component

InputFormat describes the format of the input data. It provides two functions:

  • Data splitting: split the input data into a number of input splits according to some policy, which determines the number of MapTasks and the split each one processes
  • Providing input to the Mapper: given an InputSplit, parse it into individual key/value pairs

1) What is a split, and how is it computed?

Split: a logical partitioning of the data file

package org.apache.hadoop.mapreduce.lib.input;
-|FileInputFormat
	-|getSplits
           
public List<InputSplit> getSplits(JobContext job) throws IOException {
    	// stopwatch; not relevant here
        Stopwatch sw = (new Stopwatch()).start();
    
    
    	// get the minimum split size
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    	// get the maximum split size
        long maxSize = getMaxSplitSize(job);
    
        // list that will hold the InputSplits
        List<InputSplit> splits = new ArrayList();
        // list of FileStatus objects for the input files
        List<FileStatus> files = this.listStatus(job);
        Iterator i$ = files.iterator();

        while(true) {
            while(true) {
                while(i$.hasNext()) {
                    FileStatus file = (FileStatus)i$.next();
                    // path of the current file
                    Path path = file.getPath();
                    // length of the current file
                    long length = file.getLen();
                    if (length != 0L) {
                        BlockLocation[] blkLocations;
                        // local file vs. HDFS file: obtain block locations accordingly
                        if (file instanceof LocatedFileStatus) {
                            blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                        } else {
                            FileSystem fs = path.getFileSystem(job.getConfiguration());
                            blkLocations = fs.getFileBlockLocations(file, 0L, length);
                        }

                        // check whether the file can be split
                        // (by default Hadoop treats data as splittable)
                        if (this.isSplitable(job, path)) {
                            // block size of this file
                            long blockSize = file.getBlockSize();
                            // compute the split size
                            long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
                            // variable tracking the remaining bytes
                            long bytesRemaining;
                            int blkIndex;
                            for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }

                            if (bytesRemaining != 0L) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }
                        } else {
                            splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, new String[0]));
                    }
                }

                job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                sw.stop();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());
                }

                return splits;
            }
        }
    }
           

2) How is data provided to the Mapper?

TextInputFormat uses org.apache.hadoop.mapreduce.lib.input.LineRecordReader. In that class, initialize() obtains the split information, the start and end positions, and the input stream.

The Mapper's key/value pairs are driven by nextKeyValue(), which reports whether there is another record: the key is set to the current file offset, and the value of each line is read via LineReader.readLine().

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit)genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
        this.start = split.getStart();
        this.end = this.start + split.getLength();
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        this.fileIn = fs.open(file);
        CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
        if (null != codec) {
            this.isCompressedInput = true;
            this.decompressor = CodecPool.getDecompressor(codec);
            if (codec instanceof SplittableCompressionCodec) {
                SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, READ_MODE.BYBLOCK);
                this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
                this.start = cIn.getAdjustedStart();
                this.end = cIn.getAdjustedEnd();
                this.filePosition = cIn;
            } else {
                this.in = new SplitLineReader(codec.createInputStream(this.fileIn, this.decompressor), job, this.recordDelimiterBytes);
                this.filePosition = this.fileIn;
            }
        } else {
            this.fileIn.seek(this.start);
            this.in = new SplitLineReader(this.fileIn, job, this.recordDelimiterBytes);
            this.filePosition = this.fileIn;
        }

        if (this.start != 0L) {
            this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
        }

        this.pos = this.start;
    }
           
public boolean nextKeyValue() throws IOException {
        if (this.key == null) {
            this.key = new LongWritable();
        }

        this.key.set(this.pos);
        if (this.value == null) {
            this.value = new Text();
        }

        int newSize = 0;

        while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
            if (this.pos == 0L) {
                newSize = this.skipUtfByteOrderMark();
            } else {
                newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
                this.pos += (long)newSize;
            }

            if (newSize == 0 || newSize < this.maxLineLength) {
                break;
            }

            LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));
        }

        if (newSize == 0) {
            this.key = null;
            this.value = null;
            return false;
        } else {
            return true;
        }
    }
           
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
        return this.recordDelimiterBytes != null ? this.readCustomLine(str, maxLineLength, maxBytesToConsume) : this.readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
           
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
        str.clear();
        int txtLength = 0;
        int newlineLength = 0;
        boolean prevCharCR = false;
        long bytesConsumed = 0L;

        do {
            int startPosn = this.bufferPosn;
            if (this.bufferPosn >= this.bufferLength) {
                startPosn = this.bufferPosn = 0;
                if (prevCharCR) {
                    ++bytesConsumed;
                }

                this.bufferLength = this.fillBuffer(this.in, this.buffer, prevCharCR);
                if (this.bufferLength <= 0) {
                    break;
                }
            }

            while(this.bufferPosn < this.bufferLength) {
                if (this.buffer[this.bufferPosn] == 10) {
                    newlineLength = prevCharCR ? 2 : 1;
                    ++this.bufferPosn;
                    break;
                }

                if (prevCharCR) {
                    newlineLength = 1;
                    break;
                }

                prevCharCR = this.buffer[this.bufferPosn] == 13;
                ++this.bufferPosn;
            }

            int readLength = this.bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) {
                --readLength;
            }

            bytesConsumed += (long)readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > maxLineLength - txtLength) {
                appendLength = maxLineLength - txtLength;
            }

            if (appendLength > 0) {
                str.append(this.buffer, startPosn, appendLength);
                txtLength += appendLength;
            }
        } while(newlineLength == 0 && bytesConsumed < (long)maxBytesToConsume);

        if (bytesConsumed > 2147483647L) {
            throw new IOException("Too many bytes before newline: " + bytesConsumed);
        } else {
            return (int)bytesConsumed;
        }
    }
           

(3) Splits and MapTasks

The number of concurrent MapTasks is determined by the number of splits; the number of ReduceTasks can be set manually and defaults to 1.
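A minimal sketch of changing the reducer count on the Job object used in the examples above:

// e.g. in JobRunner, before submitting the job; 2 is an arbitrary example value
job.setNumReduceTasks(2);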

(4) Common InputFormats

1) Classification

  • FileInputFormat
    • TextInputFormat
      • key: LongWritable, the byte offset of the line
      • value: Text, the line of text
      Splits: splitting is done per file, so there are at least as many splits as input files
    • NLineInputFormat
      • key: LongWritable, the byte offset of the line
      • value: Text, the line of text

      Splits: every N lines form one split; the default is 1 line per split, and it is configurable

      conf.set("mapreduce.input.lineinputformat.linespermap", "10")

    NLineInputFormat.setNumLinesPerSplit();
    • CombineTextInputFormat
      • key: LongWritable, the byte offset of the line
      • value: Text, the line of text
      Splits: split by a configured split size, so one split may cover several blocks
    CombineTextInputFormat.setMinInputSplitSize();
  • SequenceFileInputFormat
    • key: the file name
    • value: the file contents
  • DBInputFormat (databases)
  • TableInputFormat (HBase)

2)NLineInputFormat

package com.yyh.mr.test01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {

        /*
         * 擷取配置對象
         * */

        Configuration conf = new Configuration();
        /*System.setProperty("HADOOP_USER_NAME", "root");
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");*/
        /*
         * 擷取Job對象
         * */
        Job job = Job.getInstance(conf);

        // // 設定jar 類加載器 否則MapReduce架構找不到Map和Reuce
        job.setJarByClass(JobRunner.class);


      /*  CombineFileInputFormat.setMaxInputSplitSize();
        CombineFileInputFormat.setMinInputSplitSize();*/

        /*
         * 設定資料輸入輸出元件
         * */
        job.setInputFormatClass(NLineInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /*
         *設定資料輸入輸出路徑
         * */

        NLineInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\data02"));
        NLineInputFormat.setNumLinesPerSplit(job,3);
        //TextInputFormat.setInputPaths(job, new Path("/wordcount1.txt"));
        /*
         * 注意: 此輸出路徑不能存在
         * */
        //TextOutputFormat.setOutputPath(job, new Path("/yyh/out8121231233"));
        TextOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out12"));


        /*
         * 設定MAP 和 REDUCE 處理邏輯
         * */
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        /*
         * 設定 map任務和reduce任務的輸出泛型
         * */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        //  送出

        //job.submit();

        job.waitForCompletion(true);
    }
}

           
package com.yyh.mr.test01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.io.Serializable;

/*
* keyIn  LongWritable (Long) 輸入文本位元組偏移量
* valueIn Text (String)      輸入文本行
*  keyOut Text(String)
*  valueOut IntWritable(Int)
* */

public class WCMapper  extends Mapper<LongWritable, Text,Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


        String[] names = value.toString().split(" ");

        for (String name : names) {
            context.write(new Text(name),new IntWritable(1));
        }
    }
}

           
package com.yyh.mr.test01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/*
 *keyIn Text 與mapper的keyOut的資料類型相對應
 *valeuIn IntWritable   與mapper的ValueOut的資料類型相對應
 * KeyOut
 * valueOut
 * */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

           

3)CombineTextInputFormat

package com.yyh.mr.test01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {

        /*
         * 擷取配置對象
         * */

        Configuration conf = new Configuration();
        /*System.setProperty("HADOOP_USER_NAME", "root");
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");*/
        /*
         * 擷取Job對象
         * */
        Job job = Job.getInstance(conf);

        // // 設定jar 類加載器 否則MapReduce架構找不到Map和Reuce
        job.setJarByClass(JobRunner.class);





        /*
         * 設定資料輸入輸出元件
         * */
        job.setInputFormatClass(CombineTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /*
         *設定資料輸入輸出路徑
         * */

        CombineTextInputFormat.setMinInputSplitSize(job, 1048576);
        CombineTextInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\data"));
        //NLineInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\data02"));
        // NLineInputFormat.setNumLinesPerSplit(job,3);
        //TextInputFormat.setInputPaths(job, new Path("/wordcount1.txt"));
        /*
         * 注意: 此輸出路徑不能存在
         * */
        //TextOutputFormat.setOutputPath(job, new Path("/yyh/out8121231233"));
        TextOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out111122"));


        /*
         * 設定MAP 和 REDUCE 處理邏輯
         * */
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        /*
         * 設定 map任務和reduce任務的輸出泛型
         * */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        //  送出

        //job.submit();

        job.waitForCompletion(true);
    }
}

           

4)DBInputFormat

package com.yyh.DBInputFormat;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://hadoopnode00:3306/hadoop", "root", "1234");
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobRunner.class);

        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        DBInputFormat.setInput(job, User.class, "select id,name from user", "select count(1) from user");
        FileOutputFormat.setOutputPath(job, new Path("G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\src\\main\\java\\com\\yyh\\DBInputFormat\\out1"));


        job.setMapperClass(DBMapper.class);
        job.setReducerClass(DBReducer.class);


        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        job.waitForCompletion(true);
    }
}

           
package com.yyh.DBInputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DBMapper extends Mapper<LongWritable, User, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, User value, Context context) throws IOException, InterruptedException {

        context.write(key, new Text(value.toString()));

    }
}

           
package com.yyh.DBInputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DBReducer extends Reducer<LongWritable, Text, NullWritable, Text> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            context.write(NullWritable.get(), value);

        }

    }
}

           
package com.yyh.DBInputFormat;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class User implements Writable, DBWritable {

    int id;

    String name;

    public User() {
    }

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeInt(this.id);
        dataOutput.writeUTF(this.name);

    }

    public void readFields(DataInput dataInput) throws IOException {

        this.id = dataInput.readInt();
        this.name = dataInput.readUTF();

    }

    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setInt(1, this.id);
        preparedStatement.setString(2, this.name);

    }

    public void readFields(ResultSet resultSet) throws SQLException {

        this.id = resultSet.getInt(1);
        this.name = resultSet.getString(2);

    }
}

           
  • Local run: add the MySQL connector dependency
<dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>5.1.38</version>
       </dependency>
           
  • Jar deployment
The MySQL driver must be made available on HadoopNode00:
placing the MySQL jar into /home/hadoop/hadoop-2.6.0/share/hadoop/yarn/ is sufficient

           
  • Remote submission
Just add the corresponding configuration properties:
System.setProperty("HADOOP_USER_NAME", "root");
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");
           

5) Custom InputFormat

This solves the small-file storage problem: many small files are packed into a single SequenceFile (a Hadoop file format that stores binary key/value pairs), where the key is the file's path and the value is the file's contents.
package com.yyh.OutputFormat;

import com.yyh.mr.test01.WCMapper;
import com.yyh.mr.test01.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {

        /*
         * 擷取配置對象
         * */

        Configuration conf = new Configuration();
        /*System.setProperty("HADOOP_USER_NAME", "root");
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");*/
        /*
         * 擷取Job對象
         * */
        Job job = Job.getInstance(conf);

        // // 設定jar 類加載器 否則MapReduce架構找不到Map和Reuce
        job.setJarByClass(JobRunner.class);




        /*
         * 設定資料輸入輸出元件
         * */
        job.setInputFormatClass(OwnInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        /*
         *設定資料輸入輸出路徑
         * */

        //CombineTextInputFormat.setMinInputSplitSize(job, 1048576);
        OwnInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\data"));
        //NLineInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\data02"));
        // NLineInputFormat.setNumLinesPerSplit(job,3);
        //TextInputFormat.setInputPaths(job, new Path("/wordcount1.txt"));
        /*
         * 注意: 此輸出路徑不能存在
         * */
        //TextOutputFormat.setOutputPath(job, new Path("/yyh/out8121231233"));
        SequenceFileOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out12313"));


        /*
         * 設定MAP 和 REDUCE 處理邏輯
         * */
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        /*
         * 設定 map任務和reduce任務的輸出泛型
         * */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);


        //  送出

        //job.submit();

        job.waitForCompletion(true);
    }
}

           
package com.yyh.OutputFormat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {

        context.write(key, value);

    }
}

           
package com.yyh.OutputFormat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

        for (BytesWritable value : values) {

            context.write(key, value);
        }

    }
}

           
package com.yyh.OutputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class OwnInputFormat extends FileInputFormat<Text, BytesWritable> {
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        OwnRecordReader ownRecordReader = new OwnRecordReader();
        ownRecordReader.initialize(inputSplit, taskAttemptContext);
        return ownRecordReader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {

        return false;
    }
}

           
package com.yyh.OutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OwnRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit fileSplit;
    Configuration conf;
    BytesWritable value = new BytesWritable();
    Text key = new Text();

    boolean isProgress = true;

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        fileSplit = (FileSplit) inputSplit;

        conf = taskAttemptContext.getConfiguration();


    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            byte[] bytes = new byte[(int) fileSplit.getLength()];


            //擷取fs 對象

            /*
             * 目前檔案的路徑
             * */
            Path path = fileSplit.getPath();

            FileSystem fileSystem = path.getFileSystem(conf);

            /*
             * 擷取到檔案的資料流
             * */
            FSDataInputStream inputStream = fileSystem.open(path);


            IOUtils.readFully(inputStream, bytes, 0, bytes.length);

            /*
             * 封裝value
             * */
            value.set(bytes, 0, bytes.length);


            key.set(path.toString());

            IOUtils.closeStream(inputStream);

            isProgress = false;

            return true;

        }

        return false;
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {

    }
}
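
Once the job above has run, the packed SequenceFile can be inspected with SequenceFile.Reader to confirm that each small file went in as one (path, bytes) record. The checker below is only a sketch added for illustration; the part-r-00000 file name under the out12313 output directory is an assumption about what the reducer produced.

package com.yyh.OutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileChecker {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Assumed location of one reducer output file from the job above.
        Path path = new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out12313\\part-r-00000");

        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            Text key = new Text();                      // the original file's path
            BytesWritable value = new BytesWritable();  // the original file's raw bytes
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}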

           

(5)Partitioner 元件

将不同地區的資料輸出到不同的檔案中
18611781163 700000 10000 hn
18611781161 123 123123 bj  
18611781163 700000 10000 hn
18236529965 123 1223123 tj
18236529964 123123 123 hb
18236529965 546 45645 tj
18611781163 300000 70000 hn
18236529965 123 234523 tj
18236529965 31243 436543 tj
           
package com.yyh.partitioner;

import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class OwnPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();


    static {
        areaMap.put("hn", 0);
        areaMap.put("henna", 0);

        areaMap.put("bj", 1);
        areaMap.put("tj", 2);
        areaMap.put("hb", 3);
    }

    public int getPartition(KEY key, VALUE value, int numReduceTasks) {

        // The key is the area code. Unknown areas fall into partition 5, which fails unless
        // the job runs with at least 6 reduce tasks (the sample data only uses partitions 0-3).
        Integer partition = areaMap.get(key.toString());
        return partition == null ? 5 : partition;
    }
}

           
package com.yyh.partitioner;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }


    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }


    @Override
    public String toString() {
        return "" +
                "phone='" + phone + '\'' +
                " upFlow=" + upFlow +
                " downFlow=" + downFlow +
                " sumFlow=" + sumFlow ;
    }

    /*
     * 序列化 編碼
     * */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }


    /*
     * 反序列化  解碼
     *f */
    public void readFields(DataInput dataInput) throws IOException {

        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();

    }
}
           
package com.yyh.partitioner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /*
     * 18611781163 700000 10000 hn
     * 18611781163 700000 10000 hn
     * 18611781163 700000 10000 hn
     * */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        String[] data = line.split(" ");

        /*
         * key = area code (data[3]), value = a FlowBean for this record
         * */
        context.write(new Text(data[3]), new FlowBean(data[0], Long.valueOf(data[1]), Long.valueOf(data[2]), (Long.valueOf(data[1]) + Long.valueOf(data[2]))));



    }
}

           
package com.yyh.partitioner;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * 18611781163  FlowBean[]
 *
 * */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        Long up = 0L;
        Long down = 0L;
        Long sum = 0L;
        String phone = "";


        for (FlowBean flowBean : values) {

            up += flowBean.getUpFlow();
            down += flowBean.getDownFlow();
            sum += flowBean.getSumFlow();
            phone = flowBean.getPhone();

        }

        context.write(key, new FlowBean(phone, up, down, sum));

    }
}

           
package com.yyh.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowRunner {
    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
       /* conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");
*/
        Job job = Job.getInstance(conf);


        job.setJarByClass(FlowRunner.class);
        job.setPartitionerClass(OwnPartitioner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);



        TextInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\flow02.dat"));

        TextOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out131"));
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);


        job.setOutputKeyClass(Text.class);  // the reducer writes a Text key (the area code)
        job.setOutputValueClass(FlowBean.class);

        job.setNumReduceTasks(4);

        job.waitForCompletion(true);

    }
}

           

(6)OutputFormat

自定義輸出

package com.yyh.outformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FileMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(NullWritable.get(), value);

    }
}

           
package com.yyh.outformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FileReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }


    }
}

           
package com.yyh.outformat;

import com.yyh.mr.test01.WCMapper;
import com.yyh.mr.test01.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {



        /*
         * 擷取配置對象
         * */

        Configuration conf = new Configuration();
        /*System.setProperty("HADOOP_USER_NAME", "root");
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");*/
        /*
         * 擷取Job對象
         * */
        Job job = Job.getInstance(conf);

        // Set the jar by class, otherwise the MapReduce framework cannot locate the Mapper and Reducer classes
        job.setJarByClass(JobRunner.class);





        /*
         * 設定資料輸入輸出元件
         * */
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(OwnOutputFormat.class);

        /*
         *設定資料輸入輸出路徑
         * */

        //CombineTextInputFormat.setMinInputSplitSize(job, 1048576);
        //CombineTextInputFormat.setInputPaths(job, new Path(" v  "));
        //NLineInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\data02"));
        // NLineInputFormat.setNumLinesPerSplit(job,3);
        TextInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\flow.dat"));
        /*
         * 注意: 此輸出路徑不能存在
         * */
        //TextOutputFormat.setOutputPath(job, new Path("/yyh/out8121231233"));
        OwnOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\outyyh"));


        /*
         * 設定MAP 和 REDUCE 處理邏輯
         * */
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        /*
         * 設定 map任務和reduce任務的輸出泛型
         * */
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);


        //  送出

        //job.submit();

        job.waitForCompletion(true);
    }
}

           
package com.yyh.outformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OwnOutputFormat extends FileOutputFormat<NullWritable, Text> {
    public RecordWriter<NullWritable, Text> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        try {
            return new OwnRecordWriter(taskAttemptContext);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

           
package com.yyh.outformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class OwnRecordWriter extends RecordWriter<NullWritable, Text> {
    FileSystem fileSystem;

    FSDataOutputStream outputStream;

    public OwnRecordWriter(TaskAttemptContext taskAttemptContext) throws Exception {
        fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());

        outputStream = fileSystem.create(new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\testoutputforamt.txt"));

    }

    public void write(NullWritable nullWritable, Text text) throws IOException, InterruptedException {
        outputStream.write(text.getBytes());

    }

    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        IOUtils.closeStream(outputStream);
        fileSystem.close();
    }
}

           

(7) The Combiner component

  • A Combiner is an MR component besides the Mapper and the Reducer
  • A Combiner's parent class is Reducer
  • The difference between a Combiner and a Reducer is where it runs
The Combiner runs on each MapTask's node (local aggregation)
The Reducer receives the output of all the Mappers and processes it (global aggregation)
            
  • The point of a Combiner is to aggregate each MapTask's output locally, reducing the amount of data transferred over the network
  • A Combiner may only be used if it cannot change the final business result (pure accumulation is safe) and its output key-value types match the Reducer's input key-value types

Application scenarios

A Combiner does not fit every scenario:

1. It can only be used when partial results can simply be accumulated.
2. It suits summation but not averaging: avg(0,20,10,25,15)=14, yet averaging the partial results avg(0,20,10)=10 and avg(25,15)=20 gives avg(10,20)=15, not 14, so a Combiner must not be used in that case.
           

Usage

  • Create a combiner class that extends Reducer and register it with job.setCombinerClass();
  • Or simply reuse the existing reducer class as the combiner class

Example

// Core line; valid here only because the word counts can safely be accumulated
job.setCombinerClass(WCReducer.class);
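
For a standalone combiner class, the sketch below (not part of the original listing; the package and class name are assumptions) assumes the word-count job's Text/IntWritable map output. It performs the same summation as the reducer, but only over one MapTask's records, so less data crosses the network; it would be registered with job.setCombinerClass(WCCombiner.class).

package com.yyh.mr.test01;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();   // local, per-MapTask accumulation
        }
        context.write(key, new IntWritable(sum));
    }
}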
           
  • Without the Combiner (job counter screenshot omitted)
  • With the Combiner (job counter screenshot omitted)

4.10 MR 過程

The MR framework uses an InputFormat to pre-process the data for the map phase and to feed it in. It has two jobs: splitting the input, and wrapping records into key-value pairs.
Because an InputSplit is a logical split rather than a physical one, a RecordReader is still needed: based on the information in the InputSplit it loads the actual data and converts it into key-value pairs suitable for the map task, which are then handed to the map task.

Map is user-defined logic that processes the records supplied by the InputFormat according to the business scenario.

So that the Reducers can process the map results in parallel, the map output first goes through partitioning (Partition), sorting (Sort), combining (Combine) and merging (Merge), producing intermediate key-value results that are then handed to the Reducers. This stage is the Shuffle: turning unordered key-value pairs into partitioned, ordered key-value pairs, which makes the name quite fitting.

Reduce is user-defined logic that processes the data pulled from the different MapTask nodes according to the business scenario.

Finally the OutputFormat writes the output to the distributed file system.
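
As a compact recap of where each of these stages is configured, here is a sketch of a runner; it assumes the WCMapper/WCReducer word-count classes used earlier in this series emit Text/IntWritable pairs, and the PipelineRunner name and the paths are made up for illustration.

package com.yyh.mr.test01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PipelineRunner {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(PipelineRunner.class);

        // 1. InputFormat: split the input and turn each record into a key-value pair.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("/wordcount1.txt"));

        // 2. Map: user-defined logic.
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3. Shuffle: partition / sort / combine / merge between map and reduce.
        job.setPartitionerClass(HashPartitioner.class);  // the default, shown explicitly
        job.setCombinerClass(WCReducer.class);           // optional local aggregation

        // 4. Reduce: user-defined logic over each key group.
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. OutputFormat: write the final key-value pairs to the file system.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("/wc_pipeline_out"));

        job.waitForCompletion(true);
    }
}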
           

4.11 Shuffle

The shuffle is the core of MapReduce: it describes how data travels from the output of a map task to the input of a reduce task.

In a Hadoop cluster most map tasks and reduce tasks run on different nodes, so a reducer usually has to pull map output across the network from other machines. With many jobs and tasks running, these transfers consume a lot of network bandwidth; that cost is normal and unavoidable, but unnecessary transfers can still be reduced. Within each node, disk I/O (compared with memory) also has a large impact on how long a job takes.

From this analysis, the shuffle has three basic requirements:

  • pull the data from the map task side to the reduce task side completely
  • consume as little network bandwidth as possible while pulling the data
  • keep the impact of disk I/O on task execution as small as possible

The shuffle process

(1) Map-side shuffle

org.apache.hadoop.mapred.MapTask
-|MapOutputBuffer
 -|init()
           

Map output is first buffered in memory in a circular (ring) buffer with a default size of 100 MB. When the buffer is 80% full, a spill is started: the buffered records are first partitioned, and within each partition they are sorted and combined, before being written out to disk. Every spill produces a new file on disk, so as the job runs more and more spill files pile up; before the map task finishes, these spill files are merged into one large file, and the corresponding reduce tasks are then notified to come and fetch the data that belongs to them.

  • the map output is written into the in-memory buffer
  • when the buffer reaches the spill threshold, its contents are spilled to disk
  • each partition is sorted and combined, and the spills are finally merged into one large file of (key, value[]) records

(2) Reduce-side shuffle

Each reduce task fetches the part of the data that belongs to it from the different machines that ran the map tasks, and then processes it.

  • fetch the data
  • merge the fetched data
  • hand the merged data to the reduce task
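
Both sides of the shuffle can be tuned through configuration. The sketch below lists a few of the relevant Hadoop 2.x property names; the values written out are believed to be the framework defaults and are meant purely as illustration, not as tuning advice.

import org.apache.hadoop.conf.Configuration;

public class ShuffleTuning {

    public static Configuration shuffleDefaults() {
        Configuration conf = new Configuration();

        // Map side: size of the in-memory ring buffer holding map output, in MB.
        conf.setInt("mapreduce.task.io.sort.mb", 100);
        // Map side: how full the buffer may get before a spill to disk is started.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
        // Map side: how many spill segments are merged at once into the final map output file.
        conf.setInt("mapreduce.task.io.sort.factor", 10);

        // Reduce side: number of parallel fetchers copying map output to the reducer.
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 5);

        return conf;
    }
}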

4.12 Programming examples

(1)WordCount

(2) Counting PV and UV

PV (page views): the total number of visits to the site, so simply count every request.

UV (unique visitors, i.e. daily or monthly active users): de-duplicate the visitors, using an identifier that uniquely represents a user (such as a UUID) as the key.
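
A minimal sketch of how this might look in MapReduce (not from the original article; the log format, with the UUID as the first space-separated field, and the class and package names are assumptions): every input line increments a PV counter in the mapper, and the UUID is emitted as the key so that each distinct visitor reaches reduce() exactly once, where a UV counter is incremented.

package com.yyh.pvuv;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class PvUvMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter("stat", "pv").increment(1);        // every line is one page view
        String uuid = value.toString().split(" ")[0];          // assumed field position of the UUID
        context.write(new Text(uuid), NullWritable.get());
    }
}

class PvUvReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.getCounter("stat", "uv").increment(1);         // one distinct UUID = one unique visitor
        context.write(key, NullWritable.get());
    }
}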

(3)流量統計之對象輸出

(4)流量統計之對象排序輸出

package com.yyh.流量統計之對象排序輸出;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }


    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }


    @Override
    public String toString() {
        return "" +
                "phone='" + phone + '\'' +
                " upFlow=" + upFlow +
                " downFlow=" + downFlow +
                " sumFlow=" + sumFlow;
    }

    /*
     * 序列化 編碼
     * */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }


    /*
     * 反序列化  解碼
     *f */
    public void readFields(DataInput dataInput) throws IOException {

        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();

    }

    public int compareTo(FlowBean o) {

        // Sort by total flow in descending order. 0 is never returned, so records with
        // equal sumFlow are still treated as distinct keys and all of them are kept.
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }
}

           
package com.yyh.流量統計之對象排序輸出;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

    /*
     * 18611781163 700000 10000 hn
     * 18611781163 700000 10000 hn
     * 18611781163 700000 10000 hn
     * */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        String[] data = line.split(" ");

        context.write(new FlowBean(data[0], Long.valueOf(data[1]), Long.valueOf(data[2]), (Long.valueOf(data[2]) + Long.valueOf(data[1]))), NullWritable.get());


    }
}

           
package com.yyh.流量統計之對象排序輸出;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * 18611781163  FlowBean[]
 *
 * */
public class FlowReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {

    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

       context.write(key,NullWritable.get());
    }
}

           
package com.yyh.流量統計之對象排序輸出;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {

    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
       /* conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");
*/
        Job job = Job.getInstance(conf);


        job.setJarByClass(JobRunner.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);


        TextInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\flow.dat"));

        TextOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out4"));
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);


        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);

        job.waitForCompletion(true);
    }
}

           

(5)流量統計之對象排序分區輸出

package com.yyh.流量統計之對象排序分區輸出;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;
    private String area;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow, String area) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
        this.area = area;
    }


    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public String getArea() {
        return area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "phone='" + phone + '\'' +
                ", upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                ", area='" + area + '\'' +
                '}';
    }

    /*
     * 序列化 編碼
     * */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
        dataOutput.writeUTF(this.area);
    }


    /*
     * 反序列化  解碼
     *f */
    public void readFields(DataInput dataInput) throws IOException {

        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
        this.area = dataInput.readUTF();
    }

    public int compareTo(FlowBean o) {

        // Sort by total flow in descending order. 0 is never returned, so records with
        // equal sumFlow are still treated as distinct keys and all of them are kept.
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }
}

           
package com.yyh.流量統計之對象排序分區輸出;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

    /*
     * 18611781163 700000 10000 hn
     * 18611781163 700000 10000 hn
     * 18611781163 700000 10000 hn
     * */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        String[] data = line.split(" ");

        /*
         * key = the whole FlowBean (carries the area used by the partitioner), value = NullWritable
         * */
        context.write(new FlowBean(data[0], Long.valueOf(data[1]), Long.valueOf(data[2]), (Long.valueOf(data[1]) + Long.valueOf(data[2])),data[3]),NullWritable.get());



    }
}

           
package com.yyh.流量統計之對象排序分區輸出;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * 18611781163  FlowBean[]
 *
 * */
public class FlowReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {

    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {


        context.write(key, NullWritable.get());

    }
}

           
package com.yyh.流量統計之對象排序分區輸出;

import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class OwnPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();


    static {
        areaMap.put("hn", 0);
        areaMap.put("henna", 0);

        areaMap.put("zz", 1);
        areaMap.put("kf", 2);
        areaMap.put("bj", 3);
        areaMap.put("xy", 4);
    }

    public int getPartition(KEY key, VALUE value, int numReduceTasks) {

        // Partition by the area carried inside the FlowBean key; unknown areas go to
        // partition 5, so the job needs at least 6 reduce tasks.
        FlowBean flowBean = (FlowBean) key;

        Integer partition = areaMap.get(flowBean.getArea());
        return partition == null ? 5 : partition;
    }
}
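
The original listing for this example stops at the partitioner, so the runner below is only a sketch modelled on the earlier FlowRunner: FlowBean is the map output key, which means the shuffle sorts each partition by compareTo (total flow, descending), while OwnPartitioner decides which reduce task, and therefore which output file, each area ends up in. The FlowSortPartitionRunner name, the paths and the output directory are assumptions.

package com.yyh.流量統計之對象排序分區輸出;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowSortPartitionRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSortPartitionRunner.class);
        // FlowBean is the key, so the shuffle sorts by compareTo (sumFlow, descending)
        // inside every partition produced by OwnPartitioner.
        job.setPartitionerClass(OwnPartitioner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths are assumptions, following the earlier examples.
        TextInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\flow02.dat"));
        TextOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out_sort_partition"));

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        // OwnPartitioner can return partition numbers 0 to 5, so at least 6 reduce tasks are needed.
        job.setNumReduceTasks(6);

        job.waitForCompletion(true);
    }
}

With 6 reduce tasks, each area's records land in their own part-r-0000N file, already sorted by total flow.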

           

(6)學生成績之合并檔案(表連接配接)

需求:

student_info.txt

gjf 00001
gzy 00002
jzz 00003
zkf 00004
           

student_info_class.txt

00001 yuwen
00001 shuxue
00002 yinyue
00002 yuwen
00003 tiyu
00003 shengwu
00004 tiyu
00004 wuli
           

結果:

00001 gjf yuwen shuxue
00002 gzy yinyue yuwen
           
package com.yyh.合并檔案表連接配接;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobRunner {
    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
       /* conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set(MRJobConfig.JAR, "G:\\IDEA_WorkSpace\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        conf.set("mapreduce.app-submission.cross-platform", "true");
*/
        Job job = Job.getInstance(conf);


        job.setJarByClass(JobRunner.class);
        //job.setPartitionerClass(OwnPartitioner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);


        TextInputFormat.setInputPaths(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\classinfo"));

        TextOutputFormat.setOutputPath(job, new Path("G:\\Note\\Day02-Hadoop\\資料檔案\\out7"));
        job.setMapperClass(StuMapper.class);
        job.setReducerClass(StuReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);


        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);

        job.waitForCompletion(true);

    }
}

           
package com.yyh.合并檔案表連接配接;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;

import java.io.IOException;

public class StuMapper extends Mapper<LongWritable, Text, Text, Text> {


    public static final String STU_INFO = "student_info.txt";
    public static final String STU_INFO_CLASS = "student_info_class.txt";
    public static final String STU_INFO_FLAG = "a";
    public static final String STU_INFO_CLASS_FLAG = "b";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        FileSplit inputSplit = (FileSplit) context.getInputSplit();

        String filname = inputSplit.getPath().getName();

        String[] data = value.toString().split(" ");
        String userid = "";
        String flag = "";
        String valueName = "";

        if (filname.contains(STU_INFO)) {

            userid = data[1];
            flag = STU_INFO_FLAG;

            /*
             * 名字
             * */
            valueName = data[0];


        }

        if (filname.contains(STU_INFO_CLASS)) {


            userid = data[0];
            flag = STU_INFO_CLASS_FLAG;
            /*
            學科
            * */
            valueName = data[1];
        }

        context.write(new Text(userid), new Text(flag + " " + valueName));

    }
}

           
package com.yyh.合并檔案表連接配接;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

/*
 *userid  | 标示+學科或者名字
 * 0001 |a gjf
 * 0001 |b yuwen
 * 0001 |b shuxue
 * */
public class StuReducer extends Reducer<Text, Text, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {


        String classList = "";

        String name = "";


        for (Text value : values) {

            String[] data = value.toString().split(" ");

            if (data[0].equals("a")) {
                name = data[1];

            }
            if (data[0].equals("b")) {

                classList += " " + data[1];

            }

        }


        context.write(new Text(key.toString() + " " + name + classList), NullWritable.get());

    }
}

           