天天看點

HadoopHadoop筆記zkfc: zookeeper failover controller

Author: 李金輝

Wechat:m04194514

Hadoop筆記

一、概述

大資料

大資料(Big Data)是指無法在一定時間範圍内用正常軟體工具進行捕捉、處理和管理的資料集合,需要新處理模式才能具有更強的決策力、洞察發現力和流程優化能力的海量、高增長率和多樣化的資訊資産。

大資料的5V特點(IBM提出):

  • Volume(大量)
  • Velocity(高速)
  • Variety(多樣)
  • Value(低價值密度)
  • Veracity(真實性)

Hadoop是什麼

http://hadoop.apache.org

Apache Hadoop是一個開源、可靠、可擴充的分布式計算架構。

Hadoop架構允許使用者在一個超大規模的伺服器叢集中,對大資料集進行分布式的處理計算。Hadoop叢集規模可以由單個(僞分布式叢集)或者上千台的商用伺服器(完全分布式叢集)構成。Hadoop叢集中的每一個伺服器都提供了本地計算和存儲的能力。Hadoop架構并不是通過硬體實作的高可用,而是通過應用層檢測處理錯誤,這樣的話Hadoop叢集就可以建立在廉價的商用伺服器上。

  • 狹義的Hadoop(六大子產品)
    • Hadoop Common:Hadoop架構通用支援庫
    • Hadoop Distributed File System (HDFS™):分布式檔案系統提供了高吞吐能力的資料通路
    • Hadoop YARN:一個用來做任務的排程和分布式叢集的資源管理的架構
    • Hadoop MapReduce:基于YARN的系統,對大資料集進行分布式的并行計算處理
    • Hadoop Ozone:Hadoop對象存儲系統
    • Hadoop Submarine:機器學習的引擎
  • 廣義的Hadoop(泛指生态體系)
    • Apache HBase:Big Table,用來存儲海量的結構化資料
    • Apache Zookeeper(動物園管理者):分布式協調服務系統,主要解決Hadoop生态體系中各個分布式系統存在的一些通用問題
    • Apache Hive(小蜜蜂):資料倉庫的基礎設施,用來簡化Hadoop的操作
    • Apache Flume(資料采集): 負責采集各種類型的資料,并且進行簡單的預處理操作
    • Apache Spark(Scala語言): 更為高效的分布式計算引擎
    • Apache Flink: 高效的分布式計算引擎(第三代資料分析引擎)
HadoopHadoop筆記zkfc: zookeeper failover controller

二、HDFS

HDFS是Hadoop的分布式檔案系統( Hadoop Distributed File System ),類似于其它的分布式檔案系統。HDFS支援高度容錯,可以部署在廉價的硬體裝置上,特别适宜于大型資料集的分布式存儲。

Google開源論文GFS的開源實作

環境搭建

建構HDFS的僞分布式叢集(使用單台機器,模拟HDFS叢集的所有服務)
  • 安裝CentOS-7.x
    CentOS-7.2版本
  • 配置網絡
    # ip addr 檢視目前的伺服器網絡設定
    # vi /etc/sysconfig/network-scripts/ifcfg-ens33 修改伺服器網絡配置檔案的參數
    # 修改配置檔案中的ONBOOT=yes
    # 重新開機生效
    systemctl restart network
               
  • 關閉防火牆
    [[email protected] ~]# systemctl stop firewalld
    [[email protected] ~]# systemctl disable firewalld
    Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
    Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
               
  • 修改伺服器的主機名
    # 簡化連接配接伺服器操作
    [[email protected] ~]# vi /etc/hostname
    # 删除localhost,新增hadoop(自定義的主機名)
               
  • 配置主機名和ip位址的映射關系
    [[email protected] ~]# vi /etc/hosts
    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    # 最後一行添加目前伺服器的ip位址和主機名映射
    192.168.12.129  hadoop
    
    # 測試
    [[email protected] ~]# ping hadoop
    PING hadoop (192.168.12.129) 56(84) bytes of data.
    64 bytes from hadoop (192.168.12.129): icmp_seq=1 ttl=64 time=0.107 ms
    64 bytes from hadoop (192.168.12.129): icmp_seq=2 ttl=64 time=0.053 ms
               
  • 配置SSH(遠端免密登入)
    [[email protected] ~]# ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
    Generating public/private rsa key pair.
    Your identification has been saved in /root/.ssh/id_rsa.
    Your public key has been saved in /root/.ssh/id_rsa.pub.
    The key fingerprint is:
    SHA256:/VJcuTQzpC4EDqiiEKWwwtYAqS9Von3ssc12fM+ldvQ [email protected]
    The key's randomart image is:
    +---[RSA 2048]----+
    |++.  .. .     .  |
    |=o+ o  o .   o . |
    |=* *    . . . B  |
    |B + +    o o o = |
    |o+ o = .S o + .  |
    |o . o + o .+  o  |
    | .   . . ..o.+ . |
    |           .= . E|
    |           . .   |
    +----[SHA256]-----+
    [[email protected] ~]#
    [[email protected] ~]# cd .ssh/
    [[email protected] .ssh]# ll
    總用量 12
    -rw-------. 1 root root 1679 8月  12 15:45 id_rsa
    -rw-r--r--. 1 root root  393 8月  12 15:45 id_rsa.pub
    -rw-r--r--. 1 root root  183 8月  12 15:43 known_hosts
    [[email protected] .ssh]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
    [[email protected] .ssh]# ll
    總用量 16
    -rw-r--r--. 1 root root  393 8月  12 15:47 authorized_keys
    -rw-------. 1 root root 1679 8月  12 15:45 id_rsa
    -rw-r--r--. 1 root root  393 8月  12 15:45 id_rsa.pub
    -rw-r--r--. 1 root root  183 8月  12 15:43 known_hosts
    [[email protected] .ssh]# chmod 0600 ~/.ssh/authorized_keys
    [[email protected] .ssh]#
    [[email protected] .ssh]# ssh hadoop
    Last login: Mon Aug 12 15:43:18 2019 from 192.168.12.1
               
  • 安裝JDK1.8+
[[email protected] ~]# rpm -ivh jdk-8u191-linux-x64.rpm
  警告:jdk-8u191-linux-x64.rpm: 頭V3 RSA/SHA256 Signature, 密鑰 ID ec551f03: NOKEY
  準備中...                          ################################# [100%]
  正在更新/安裝...
     1:jdk1.8-2000:1.8.0_191-fcs    ################################# [100%]
  Unpacking JAR files...
          tools.jar...
          plugin.jar...
          javaws.jar...
          deploy.jar...
          rt.jar...
          jsse.jar...
          charsets.jar...
          localedata.jar...
  [[email protected] ~]# java -version
  java version "1.8.0_191"
  Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
  Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
           
  • 安裝Hadoop
  • 修改HDFS叢集的配置檔案
[[email protected] hadoop-2.6.0]# vi etc/hadoop/core-site.xml
  <property>
     <name>fs.defaultFS</name>
     <value>hdfs://hadoop:9000</value>
  </property>
  <property>
     <name>hadoop.tmp.dir</name>
     <value>/usr/hadoop-2.6.0/hadoop-${user.name}</value>
  </property>
  
  [[email protected] hadoop-2.6.0]# vi etc/hadoop/hdfs-site.xml
  <property>
      <name>dfs.replication</name>
      <value>1</value>
  </property>
  
  [[email protected] hadoop-2.6.0]# vi etc/hadoop/slaves
  hadoop
           
  • 添加環境變量配置
[[email protected] ~]# vi .bashrc
  HADOOP_HOME=/usr/hadoop-2.6.0
  JAVA_HOME=/usr/java/latest
  CLASSPATH=.
  PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
  export JAVA_HOME
  export CLASSPATH
  export PATH
  export HADOOP_HOME
  [[email protected] ~]# source .bashrc
           

服務啟動

  • 初始化操作

NOTE:

初始化操作隻需要在第一次啟動HDFS叢集之前執行,後續不需要再次執行,直接跳過啟動服務即可

  • 啟動HDFS叢集
[[email protected] ~]# start-dfs.sh
Starting namenodes on [hadoop]
hadoop: starting namenode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-namenode-hadoop.out
hadoop: starting datanode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-datanode-hadoop.out
Starting secondary namenodes [0.0.0.0]
The authenticity of host '0.0.0.0 (0.0.0.0)' can't be established.
ECDSA key fingerprint is SHA256:yDvdRHO65GeTfU6PJQjEKMap+lEZb8a/JeuesbTsMYs.
ECDSA key fingerprint is MD5:d4:bf:fe:86:d3:ed:2d:fc:5f:a2:2b:e5:86:0c:ae:ee.
Are you sure you want to continue connecting (yes/no)? yes
0.0.0.0: Warning: Permanently added '0.0.0.0' (ECDSA) to the list of known hosts.
0.0.0.0: starting secondarynamenode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-secondarynamenode-hadoop.out
           
  • 驗證服務是否啟動成功
# 1. java的指令 jps,檢視java程序清單
[[email protected] ~]# jps
10995 SecondaryNameNode  # HDFS小秘
10796 NameNode     # HDFS Master
10877 DataNode     # HDFS Slaves

# 2. 通路HDFS的Web UI
http://伺服器位址:50070

# 3. 檢視分布式系統日志
[[email protected] hadoop-2.6.0]# cd logs/
[[email protected] logs]# ll
總用量 92
-rw-r--r--. 1 root root 24249 8月  12 16:12 hadoop-root-datanode-hadoop.log
-rw-r--r--. 1 root root   714 8月  12 16:12 hadoop-root-datanode-hadoop.out
-rw-r--r--. 1 root root 30953 8月  12 16:17 hadoop-root-namenode-hadoop.log
-rw-r--r--. 1 root root   714 8月  12 16:12 hadoop-root-namenode-hadoop.out
-rw-r--r--. 1 root root 22304 8月  12 16:13 hadoop-root-secondarynamenode-hadoop.log
-rw-r--r--. 1 root root   714 8月  12 16:12 hadoop-root-secondarynamenode-hadoop.out
-rw-r--r--. 1 root root     0 8月  12 16:12 SecurityAuth-root.audit
           
  • 關閉服務

指令操作

HDFS分布式檔案系統,操作類似于Linux檔案系統

比如Linux:cp、mv、rm、cat、mkdir 常用指令非常類似

文法:

hdfs dfs -參數

Usage: hadoop fs [generic options]
        [-appendToFile <localsrc> ... <dst>]
        [-cat [-ignoreCrc] <src> ...]  # 檢視文本檔案内容
        [-checksum <src> ...]
        [-chgrp [-R] GROUP PATH...]    # 修改屬組
        [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]  # 修改權限
        [-chown [-R] [OWNER][:[GROUP]] PATH...]  # 修改屬主
        [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]  # 從本地拷貝到HDFS
        [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]  # 從HDFS拷貝到本地
        [-count [-q] [-h] <path> ...]   # 計數
        [-cp [-f] [-p | -p[topax]] <src> ... <dst>]  # 拷貝
        [-createSnapshot <snapshotDir> [<snapshotName>]]
        [-deleteSnapshot <snapshotDir> <snapshotName>]
        [-df [-h] [<path> ...]]  
        [-du [-s] [-h] <path> ...]  
        [-expunge]
        [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]   # 下載下傳
        [-getfacl [-R] <path>]
        [-getfattr [-R] {-n name | -d} [-e en] <path>]
        [-getmerge [-nl] <src> <localdst>]
        [-help [cmd ...]]   # 幫助
        [-ls [-d] [-h] [-R] [<path> ...]]  # 檢視目錄清單
        [-mkdir [-p] <path> ...]   # 建立檔案夾
        [-moveFromLocal <localsrc> ... <dst>]  # 從本地移動到HDFS
        [-moveToLocal <src> <localdst>]   # 将HDFS中的檔案移動到本地
        [-mv <src> ... <dst>]   # HDFS中的檔案或檔案夾的移動
        [-put [-f] [-p] [-l] <localsrc> ... <dst>]   # 上傳
        [-renameSnapshot <snapshotDir> <oldName> <newName>]
        [-rm [-f] [-r|-R] [-skipTrash] <src> ...]  # 删除
        [-rmdir [--ignore-fail-on-non-empty] <dir> ...]  # 删除檔案夾
        [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
        [-setfattr {-n name [-v value] | -x name} <path>]
        [-setrep [-R] [-w] <rep> <path> ...]
        [-stat [format] <path> ...]
        [-tail [-f] <file>]   # 檢視文本檔案的末尾内容
        [-test -[defsz] <path>]  
        [-text [-ignoreCrc] <src> ...]
        [-touchz <path> ...]
        [-usage [cmd ...]]
           

JAVA API操作

  • 環境搭建(以windows平台為例)
    • 解壓縮Hadoop的安裝包
      # 如解壓縮安裝到E:\\根目錄
                 
    • 拷貝相容檔案到安裝目錄下的bin檔案夾中

      [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-DSuUicVE-1590287261139)(D:\Learnspace\training camp\day01\圖檔\2019081202.png)]

    • 在windows的hosts檔案中添加主機名和IP位址的映射關系

      [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-XtN6Dy84-1590287261141)(D:\Learnspace\training camp\day01\圖檔\2019081203.png)]

    • 重新開機開發工具
    • 配置HADOOP_HOME環境變量
  • 實戰
    • 建立Maven工程,并導入HDFS Client Driver
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.6.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>2.6.0</version>
      </dependency>
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
      </dependency>
                 
    • 測試代碼
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FSDataInputStream;
      import org.apache.hadoop.fs.FSDataOutputStream;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.fs.permission.FsAction;
      import org.apache.hadoop.fs.permission.FsPermission;
      import org.apache.hadoop.io.IOUtils;
      import org.junit.After;
      import org.junit.Before;
      import org.junit.Test;
      
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.IOException;
      import java.net.URI;
      import java.net.URISyntaxException;
      
      /**
       * hdfs java api測試
       * FileSystem
       */
      public class HDFSDemo {
      
          /**
           * hdfs 用戶端操作對象
           */
          private FileSystem fileSystem = null;
          private Configuration configuration = null;
      
          @Before
          public void doBefore() throws URISyntaxException, IOException {
              URI uri = new URI("hdfs://hadoop:9000");
              configuration = new Configuration();
              fileSystem = FileSystem.get(uri, configuration);
          }
      
          /**
           * 檔案上傳
           *    put
           *    copyFromLocal
           *    moveFromLocal
           *
           * @org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/baizhi":root:supergroup:drwxr-xr-x
           * 解決方案:
           1. 修改權限  (UGO) o+w
           2. 修改操作hdfs使用者身份:-DHADOOP_USER_NAME=root
           3. 關閉hdfs權限檢查功能: hdfs-site.xml
           <property>
          	<name>dfs.permissions.enabled</name>
           	<value>false</value>
           </property>
           */
          @Test
          public void testUpload() throws IOException {
              Path src = new Path("G:\\apache-tomcat-7.0.85.zip");
              Path dst = new Path("/baizhi");
              fileSystem.copyFromLocalFile(src, dst);
          }
      
          @Test
          public void testUpload2() throws IOException {
              FileInputStream src = new FileInputStream("F:\\生态圖.png");
              Path dst = new Path("/baizhi/test");
              FSDataOutputStream dstOutputStream = fileSystem.create(dst);
              IOUtils.copyBytes(src, dstOutputStream, configuration);
          }
      
          /**
           * 下載下傳檔案
           *    get
           *    copyToLocal
           *    moveToLocal
           */
          @Test
          public void testDownload() throws IOException {
              Path src = new Path("/baizhi/test");
              Path dst = new Path("G:\\1.png");
              fileSystem.copyToLocalFile(src, dst);
          }
      
          @Test
          public void testDownload2() throws IOException {
              FSDataInputStream inputStream = fileSystem.open(new Path("/baizhi/test"));
              FileOutputStream outputStream = new FileOutputStream("G:\\2.png");
              IOUtils.copyBytes(inputStream, outputStream, configuration);
          }
      
          /**
           * 删除檔案
           */
          @Test
          public void testDelete() throws IOException {
              // fileSystem.delete(new Path("/baizhi/test"),false);
              // true代表遞歸删除
              fileSystem.delete(new Path("/baizhi"), true);
          }
      
          @Test
          public void testOther() throws IOException {
              // rwxrw-r--  /baizhi
              // fileSystem.mkdirs(new Path("/baizhi"), new FsPermission(FsAction.ALL, FsAction.READ_WRITE, FsAction.READ));
              boolean exists = fileSystem.exists(new Path("/baizhi"));
              System.out.println(exists?"存在":"不存在");
          }
      
          @After
          public void doAfter() throws IOException {
              fileSystem.close();
          }
      }
                 

HDFS架構

HDFS采用master/slave架構。一個HDFS叢集由一個Namenode和一定數目的Datanode組成。Namenode是一個中心伺服器,負責管理檔案系統的名字空間(namespace)以及用戶端對檔案的通路。叢集中的Datanode一般是一個節點一個,負責管理它所在節點上的資料存儲。 HDFS暴露了檔案系統的名字空間,使用者能夠以檔案的形式在上面存儲資料。從内部看,一個檔案其實被分成一個或多個資料塊,這些塊存儲在一組Datanode上。 Namenode執行檔案系統的名字空間操作,比如打開、關閉、重命名檔案或目錄。它也負責确定資料塊到具體Datanode節點的映射。 Datanode負責處理檔案系統用戶端的讀寫請求,在Namenode的統一排程下進行資料塊的建立、删除和複制。

  • Namenode:存儲系統中繼資料、 namespace、管理datanode、接收datanode狀态彙報
  • Datanode:存儲塊資料,響應用戶端對塊的讀寫請求,接收namenode的塊管理指令
  • Block:HDFS存儲資料的基本機關,預設值是128MB,實際塊大小0~128MB
  • Rack:機架,對datanode所在主機的實體辨別,辨別主機的位置,優化存儲和計算

架構圖

HadoopHadoop筆記zkfc: zookeeper failover controller

Block的複制原理

HadoopHadoop筆記zkfc: zookeeper failover controller

中繼資料(MetaData)的持久化機制

Namenode使用記憶體存儲MetaData,存在安全風險,HDFS提供了中繼資料的持久化機制

好處:保證中繼資料絕對不會丢失,并且

fsimage

加速了Namenode中繼資料的恢複速度

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-MByduazm-1590287261151)(D:\Learnspace\training camp\day02\圖檔\2019081302.png)]

HDFS常見問題

  • 為什麼HDFS不适合小檔案存儲?
    情況 Namenode占用 Datanode占用
    10000個檔案總共128MB 10000個中繼資料 >>150B
    1個128MB檔案 1個中繼資料 ==150B
    • 小檔案過多,會過多占用namenode的記憶體,并浪費block
    • HDFS适用于高吞吐量,而不适用于低時間延遲的通路。檔案過小,尋道時間大于資料讀寫時間,這不符合HDFS的設計原則
  • Namenode和SecondaryNamenode差別?

    Namenode主要維護兩個元件,一個是 fsimage ,一個是 editlog

    1. fsimage儲存了最新的中繼資料檢查點,包含整個HDFS檔案系統的所有目錄和檔案資訊。對于目錄來說包括修改時間、通路權限控制資訊(目錄所屬使用者、所在組)等;對于檔案來說包括資料塊描述資訊、通路時間、修改時間等。
    2. editlog主要是在Namenode已經啟動的情況下對HDFS進行的各種更新操作進行記錄,HDFS用戶端執行的所有寫操作都會被記錄到editlog中。
    為了避免editlog不斷增加,SecondaryNamenode會周期性合并fsimage和editlog生成新的fsimage。
    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-0nfWmVPS-1590287261153)(D:\Learnspace\training camp\day02\圖檔\2019081303.png)]

三、YARN

架構了解

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和排程,它的引入為叢集在資源使用率、資源統一管理和資料共享等方面帶來了很多好處。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-8mcQx9xd-1590287261154)(D:\Learnspace\training camp\day03\圖檔\2019081401.png)]

  • ResourceManager:在系統的所有應用程式之間仲裁資源的最終權限
  • NodeManager:是每台機器的架構代理,負責容器,監視其資源使用情況(CPU,記憶體,磁盤,網絡)并将其報告給ResourceManager的Scheduler(排程器)
  • App Master:應用程式的Master,負責任務在計算過程中的監控、故障轉移,每個Job作業隻有一個
  • Container:表示一個計算容器

環境搭建

  • 修改

    mapred-site.xml

    [[email protected] ~]# cd /usr/hadoop-2.6.0/
    [[email protected] hadoop-2.6.0]# mv etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml
    [[email protected] hadoop-2.6.0]# vi etc/hadoop/mapred-site.xml
    # 添加以下内容
    <property>
       <name>mapreduce.framework.name</name>
       <value>yarn</value>
    </property>
               
  • 修改

    yarn-site.xml

    [[email protected] hadoop-2.6.0]# vi etc/hadoop/yarn-site.xml
    <property>
    	<name>yarn.nodemanager.aux-services</name>
    	<value>mapreduce_shuffle</value>
    </property>
    <property>
    	<name>yarn.resourcemanager.hostname</name>
    	<value>hadoop</value>
    </property>
               
  • 啟動YARN的服務
    僞分布式的YARN叢集
    [[email protected] hadoop-2.6.0]# start-yarn.sh
    starting yarn daemons
    starting resourcemanager, logging to /usr/hadoop-2.6.0/logs/yarn-root-resourcemanager-hadoop.out
    hadoop: starting nodemanager, logging to /usr/hadoop-2.6.0/logs/yarn-root-nodemanager-hadoop.out
    [[email protected] hadoop-2.6.0]# jps
    6892 ResourceManager  # master
    6974 NodeManager    # slave
               

四、MapReduce

思想了解

Hadoop MapReduce是一個軟體架構,基于該架構能夠容易地編寫應用程式,這些應用程式能夠運行在由上千台商用機器組成的大叢集上,并以一種可靠的,具有容錯能力的方式并行地處理上TB級别的海量資料集。這個定義里面有着這些關鍵詞:

一是軟體架構,二是并行處理,三是可靠且容錯,四是大規模叢集,五是海量資料集。

MapReduce擅長處理大資料,它為什麼具有這種能力呢?這可由MapReduce的設計思想發覺。MapReduce的思想就是“分而治之”或者“化繁為簡”。

  • Mapper

    負責“分”,即把複雜的任務分解為若幹個“簡單的任務”來處理。 “簡單的任務”包含三層含義:
    • 一是資料或計算的規模相對于原任務大大縮小;
    • 二是就近計算原則,即任務會配置設定到存放着所需資料的節點上進行計算;
    • 三是這些小任務可以并行計算,彼此間幾乎沒有依賴關系。
  • Reducer

    主要負責對map階段的結果進行彙總

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-LQAYmADH-1590287261155)(D:\Learnspace\training camp\day03\圖檔\2019081402.png)]

基本開發

建立Maven工程,導入依賴

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.6.0</version>
</dependency>
           

開發MapReduce應用程式

單詞計數的應用程式

MapReduce應用程式的兩個階段:

  1. Mapper:将大任務拆分為若幹個小任務,将非結構化的資料映射為KV結構的資料
  2. Reducer:負責統計計算
準備樣例檔案
How are you
Where are you from
Welcome to BJ
Are you ok
           
将模拟資料上傳到HDFS中
[[email protected] ~]# hdfs dfs -put data.txt /baizhi
           
定義Mapper任務
package com.baizhi;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * *Writable表示Hadoop提供的序列化對象
 *    LongWritable
 *    IntWritable
 *    String ---> Text
 *    ...
 *    <p>
 * Mapper階段
 *    keyIn: LongWritable 每行資料的首字元的offset(位置)
 *    valueIn: Text 一行記錄
 *    keyOut:  Text 單詞
 *    valueOut: IntWritable 初始值 1
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * 映射方法
     * How are you ---> (how,1) (are,1) (you,1)
     *
     * @param key     keyIn
     * @param value   valueIn
     * @param context 上下文對象(MapReduce應用程式運作的上下文資訊載體)
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.toLowerCase().split(" ");
        for (String word : words) {
            // 輸出處理完成kv資料
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
           
定義Reducer任務
package com.baizhi;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * reducer階段 統計和計算
 *   keyIn:類型等價于Mapper的keyOut
 *   valueIn:類型等價于Mapper的valueOut
 *   keyOut:單詞  Text
 *   valueOut:總次數 IntWritable
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * 統計計算方法
     *  how are you
     *  are you ok
     *  	are [1,1]
     *      you [1,1]
     *      how [1]
     *
     * @param key   單詞
     * @param values key相同的初始值的集合
     * @param context  上下文對象
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        Iterator<IntWritable> iterator = values.iterator(); //擷取疊代器對象
        while (iterator.hasNext()){
            int num = iterator.next().get(); // 1
            count += num;
        }
        // 計算完成後 輸出計算結果
        context.write(key,new IntWritable(count));
    }
}
           
初始化類
package com.baizhi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * 單詞計數的初始化類
 */
public class WordCountApplication {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1. 建立MapReduce任務對象
        Configuration conf = new Configuration();
        String jobName = "wordcount";
        Job job = Job.getInstance(conf,jobName);
        job.setJarByClass(WordCountApplication.class);

        //2. 設定計算資料的輸入格式和計算結果的輸出格式(文本)
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //3. 指定計算資料的來源位置以及計算結果的輸出位置
        TextInputFormat.addInputPath(job,new Path("/baizhi/data.txt"));
        // 注意:計算結果的輸出目錄必須不存在
        TextOutputFormat.setOutputPath(job,new Path("/baizhi/result"));

        //4. 指定MapReduce應用的Mapper階段和Reducer階段的實作類
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        //5. 設定Mapper階段和Reducer階段的KeyOut和ValueOut的類型
        job.setMapOutputKeyClass(Text.class); // mapper的keyOut的類型
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6. 送出任務
        job.waitForCompletion(true); // true 輸出運作日志
    }
}
           
将MapReduce應用程式打為

jar

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-wE80SRkY-1590287261156)(D:\Learnspace\training camp\day03\圖檔\2019081403.png)]

測試運作
  • 将應用jar包 上傳到Linux作業系統中
  • 使用指令送出MapReduce應用程式

    文法:

    hadoop jar xxx.jar 入口類的全限定名

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-97rcOtnY-1590287261158)(D:\Learnspace\training camp\day03\圖檔\2019081404.png)]
檢視計算結果

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-HLRaaTz0-1590287261160)(D:\Learnspace\training camp\day03\圖檔\2019081405.png)]

第二個案例(流量統計)

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-pz1gYwf3-1590287261161)(D:\Learnspace\training camp\day03\圖檔\2019081406.png)]

MapReduce應用程式的其它運作方式

注意:

​ 在生産環境中,MapReduce Application一定是運作在YARN分布式叢集中的,

​ 但是,在開發測試MapReduce應用程式時,我們可以使用以下方式來測試代碼

本地計算 + 本地資料

本地計算指的是借助于Windows平台的Hadoop環境模拟運作MapReduce程式

本地資料指的是計算的資料來源于Windows平台,并且輸出到本地

  • 修改初始化類中如下代碼
// 注意:file:/// 表示使用本地檔案系統中的資料
TextInputFormat.addInputPath(job,new Path("file:///e:\\ssby.txt"));
// 注意:計算結果的輸出目錄必須不存在
TextOutputFormat.setOutputPath(job,new Path("file:///e:\\result"));
           
  • 運作程式

    右鍵初始化類 ---> Run as

    # 如出現以下異常
    Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    # 解決方案:
    1. 在項目的根目錄中建立包 org.apache.hadoop.io.nativeio
    2. 在包中建立類 NativeIO
    3. 找到Hadoop的NativeIO類将所有的代碼複制到自建的NativeIO中
    4. 修改NativeIO中的源碼(關聯源碼是557行,未關聯是287行),修改為:return true;
    5. 重新運作,得到運作結果
               
    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-kA7wPnj7-1590287261163)(D:\Learnspace\training camp\day03\圖檔\2019081407.png)]

本地計算 + 遠端資料

  • 修改初始化類
//3. 指定計算資料的來源以及計算結果的輸出位置
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop:9000/baizhi/data.txt"));
// 注意:計算結果的輸出目錄必須不存在
TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop:9000/baizhi/result3"));
           
  • 運作程式

    右鍵初始化類 --> Run as

  • 通路控制異常,添加虛拟機參數

    -DHADOOP_USER_NAME=root

遠端計算 + 遠端資料

遠端計算指MapReduce應用程式依然運作在YARN叢集中

遠端資料指資料來源于HDFS或者輸出到HDFS

  • 修改初始化類
    // 添加遠端計算的支援
    //===============================================================
    conf.set("fs.defaultFS", "hdfs://hadoop:9000/");
    conf.set("mapreduce.job.jar", "file:///F:\\IdeaProjects\\20190812\\hadoop-mapreduce\\target\\hadoop-mapreduce-1.0-SNAPSHOT.jar");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.hostname", "hadoop");
    conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");
    conf.set("mapreduce.app-submission.cross-platform", "true");
    conf.set("dfs.replication", "1");
    //===============================================================
               
  • 将Maven項目重新打包

    maven plugin ---> package---> xxx.jar

  • 運作程式

    右鍵初始化類 ---> Run as

作業

  • 某系統被通路的日志樣例資料,資料格式如下:
# 用戶端的ip位址  請求時間 請求方式 通路資源 響應的位元組大小 狀态碼
192.168.0.3 2019-08-14 15:30:15 GET /index.jsp 300 200
11.135.14.110 2019-08-14 15:32:10 POST /user/login.do 500 404
...
           
  • PV

    (Page View): 系統的通路量
    mapreduce
    	map: k: 日期 v: 1
    	reduce: k: 日期 values: [1,1,1,1]
               
  • UV

    (Unique Visitor): 獨立使用者的通路量
    mapreduce
    	map: k: 日期 v:ip
    	reduce: k: 日期 values:[ip,ip,ip]
    				   values ---> Set
               

MapReduce程式的運作流程

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-4f4VN1l0-1590287261165)(D:\Learnspace\training camp\day04\圖檔\2019081501.png)]

MapReduce任務送出的源碼剖析

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-LRwh2Dl6-1590287261167)(D:\Learnspace\training camp\day04\圖檔\2019081502.png)]

InputFormat和OutputFormat

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-9FzWcvNP-1590287261169)(D:\Learnspace\training camp\day04\圖檔\2019081503.png)]

InputFormat

InputFormat資料的輸入格式對象

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-swrCTYqM-1590287261171)(D:\Learnspace\training camp\day04\圖檔\2019081504.png)]

TextInputFormat例析

  • getSplits

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ZN7DZETL-1590287261171)(D:\Learnspace\training camp\day04\圖檔\2019081505.png)]

  • createRecordReader

特點:
1.  InputFormat決定如何對計算的資料集進行邏輯切割(==140.8MB==)
  	2.  InputFormat決定如何解析讀取資料切片(split)中的資料内容,并且map任務的keyIn和valueIn的類型由RecordReader中的key、value決定
  	3.  一個InputSplit會由一個Map任務進行映射處理
  	4.  Inputformat負責輸入資料的合法性校驗
           
常見的InputFormat
  • FileInputFormat
    • TextInputFormat

      : 基于文本的資料輸入格式對象
      特點:按行讀取文本中的資料 KeyIn:LongWritable ValueIn:Text
    • NLineInputFormat

      特點:将文本中的N行(預設為1行)資料作一個資料切片 KeyIn:LongWritable ValueIn:Text

      設定N行:

      conf.set("mapreduce.input.lineinputformat.linespermap","3");

    • KeyValueLineRecordReader

      特點:按照KV解析文本中的資料 KeyIn:Text ValueIn:Text

      資料切片的計算規則等同于

      TextInputFormat

      資料切片的讀取方式按照KV的結構進行解析

      mapreduce.input.keyvaluelinerecordreader.key.value.separator

      預設為

      \t

      例如:

      conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");

    • FixedLengthInputFormat

    • CombineTextInputFormat

      特點:将多個小檔案的内容整合到一個資料切片中 KeyIn:LongWritable ValueIn: Text
  • DBInputFormat
    • DBInputFormat

      特點: 從資料庫中擷取資料,将獲得的資料作為Map任務的輸入

      KeyIn: LongWritable ValueIn:extends DBWritable

      [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-AmPk94qT-1590287261173)(D:\Learnspace\training camp\day04\圖檔\2019081506.png)]
      • 開發自定義的Writable對象,讀寫資料庫表中的記錄
        package com.baizhi.inputformat.db;
        
        import org.apache.hadoop.mapreduce.lib.db.DBWritable;
        
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;
        import java.sql.SQLException;
        import java.util.Date;
        
        /**
         * 通過OrderWritable對象讀寫資料庫的記錄
         */
        public class OrderWritable implements DBWritable {
            private Integer orderId;
            private Double totalMoney;
            private Date createTime;
            private Integer userId;
        
            public OrderWritable() {
            }
        
            public OrderWritable(Integer orderId, Double totalMoney, Date createTime, Integer userId) {
                this.orderId = orderId;
                this.totalMoney = totalMoney;
                this.createTime = createTime;
                this.userId = userId;
            }
        
            public void write(PreparedStatement pstm) throws SQLException {
                pstm.setInt(2, this.orderId);
                pstm.setDouble(3, this.totalMoney);
        
                java.sql.Date date = new java.sql.Date(this.createTime.getTime());
                pstm.setDate(4, date);
        
                pstm.setInt(5, this.userId);
            }
        
            public void readFields(ResultSet rs) throws SQLException {
                this.orderId = rs.getInt("order_id");
                this.totalMoney = rs.getDouble("total_money");
                this.createTime = rs.getDate("create_time");
                this.userId = rs.getInt("user_id");
            }
        }
        
                   
      • 開發處理的Map任務
        package com.baizhi.inputformat.db;
        
        import org.apache.hadoop.io.DoubleWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        
        import java.io.IOException;
        import java.util.Date;
        
        public class OrderMapper extends Mapper<LongWritable, OrderWritable, Text, DoubleWritable> {
        	/**
        	   value: 資料庫一行記錄
        	*/
            @Override
            protected void map(LongWritable key, OrderWritable value, Context context) throws IOException, InterruptedException {
                Date createTime = value.getCreateTime();
                Integer userId = value.getUserId();
                Double totalMoney = value.getTotalMoney();
                String month = createTime.getYear() + "-" + createTime.getMonth() + "-" + userId;
                context.write(new Text(month), new DoubleWritable(totalMoney));
            }
        }
                   
      • 開發統計的Reduce任務
        package com.baizhi.inputformat.db;
        
        import org.apache.hadoop.io.DoubleWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;
        
        import java.io.IOException;
        import java.util.Iterator;
        
        public class OrderReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
            @Override
            protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
                double sum = 0.0D;
                Iterator<DoubleWritable> iterator = values.iterator();
                while (iterator.hasNext()) {
                    DoubleWritable money = iterator.next();
                    sum += money.get();
                }
                context.write(key, new DoubleWritable(sum));
            }
        }
                   
      • 設定初始化類
        package com.baizhi.inputformat.db;
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.DoubleWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.lib.db.DBInputFormat;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
        import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        import org.iq80.leveldb.DB;
        
        import java.io.IOException;
        
        public class OrderComputApplication {
        
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration configuration = new Configuration();
                // 設定資料源資訊
                configuration.set(DBConfiguration.DRIVER_CLASS_PROPERTY,"com.mysql.jdbc.Driver");
                configuration.set(DBConfiguration.URL_PROPERTY,"jdbc:mysql://localhost:3306/vue");
                configuration.set(DBConfiguration.USERNAME_PROPERTY,"root");
                configuration.set(DBConfiguration.PASSWORD_PROPERTY,"root");
        
                Job job = Job.getInstance(configuration, "order");
                job.setJarByClass(OrderComputApplication.class);
        
                job.setInputFormatClass(DBInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
        
                // select order_id,total_money... from t_order where ... order by ...
                DBInputFormat.setInput(job,OrderWritable.class,"t_order",null,null,
                        "order_id","total_money","create_time","user_id");
                TextOutputFormat.setOutputPath(job,new Path("file:///E:/result5"));
        
                job.setMapperClass(OrderMapper.class);
                job.setReducerClass(OrderReducer.class);
        
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(DoubleWritable.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(DoubleWritable.class);
        
                job.waitForCompletion(true);
            }
        }
                   
      • 引入資料源的驅動jar包

        本地計算:在Maven項目中導入MySQL的依賴即可

        遠端計算:将MySQL的驅動jar包上傳到

        Hadoop安裝目錄的/share/hadoop/yarn/lib

IutputFormat作用
結論:
  1. 決定如何對計算的資料集進行邏輯切割
  2. 決定如何解析讀取資料切片(split)中的資料内容
  3. 負責輸入資料的合法性校驗

OutputFormat

OutputFormat資料的輸出格式對象,決定了如何将Reducer的計算結果輸出到指定的存儲系統中

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-HPTYoyCw-1590287261175)(D:\Learnspace\training camp\day05\圖檔\2019081601.png)]

常見的OutputFormat
  • FileOutputFormat:基于檔案的資料輸出格式
    • TextOutputFormat

      特點:計算的結果以文本的形式儲存在檔案中,文本中一行結果為Reduce方法的keyOut valueOut
  • DBOutputFormat:基于資料庫的資料輸出格式
    • DBOutputFormat

      特點:将Reducer的計算結果輸出儲存到資料庫,reduce方法每輸出一次則在資料庫産生一條記錄
  • TableOutputFormat:基于HBase的資料輸出格式
OutputFormat作用
結論:
  1. 決定計算結果以何種格式儲存到指定的存儲系統中

    2. 校驗計算結果的輸出位置是否合法

Shuffle原理剖析

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-3Wzlsjf3-1590287261177)(D:\Learnspace\training camp\day05\圖檔\2019081602.png)]

Shuffle是指對Map任務的輸出結果進行分區、排序、合并等處理後交給Reduce任務處理的過程,分為Map端的操作和Reduce端的操作。

Shuffle過程

  • Map端的Shuffle

    Map的輸出結果首先被緩存到記憶體,當緩存區容量到達80%(緩沖區預設100MB),就啟動溢寫操作。當啟動溢寫操作時,首先需要把緩存中的資料進行分區,然後對每個分區的資料進行排序和合并(combine),之後再寫入磁盤檔案。每次溢寫操作都會生成一個新的磁盤檔案,随着Map任務的執行,磁盤中就會生成多個溢寫檔案。在Map任務全部結束前,這些溢寫檔案會被歸并成一個大的磁盤檔案,然後通知相應的Reduce任務來拉取自己所應處理的分區資料。

  • 在Reduce端的Shuffle過程

    Reduce任務會從Map端的不同Map機器上拉取自己所應處理的分區資料,然後對分區資料進行排序合并後交給Reduce任務去處理。

作用

  • 保證每一個Reduce任務處理的資料量大緻是相同的
  • Map任務輸出的key相同,分區也一定相同,并且肯定是相同的Reduce處理的,保證計算結果的準确性
  • Reduce任務的數量決定了分區的數量,Reduce任務越多計算處理的并行度也就越高
    Reduce任務的數量(預設為1)可以通過:

    job.setNumReduceTasks(數量)

特點

  • Map端溢寫時,key相同所在分區一定相同
  • Map端溢寫時,排序減少了Reduce任務全局排序的複雜度
  • Map端溢寫時,合并(Combiner【可選】)減少溢寫檔案的體積,提高了Reduce任務在Fetch資料時的效率,它是一種MapReduce優化政策
  • Reduce端計算或者輸出時,它的資料都是有序的

Shuffle源碼追蹤

  • MapTask

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-dABOuAJw-1590287261178)(D:\Learnspace\training camp\day05\圖檔\2019081603.png)]

  • ReduceTask

    (略)

    建議閱讀

資料清洗

資料清洗是指将原始資料處理成有價值資料的過程,這一過程稱為資料清洗。

企業大資料開發的基本流程:

  1. 采集資料(Flume、Logstash)先儲存到MQ(Kafka)中
  2. 将MQ中的暫存資料存放到HDFS中儲存
  3. 資料清洗(低價值密度的資料處理)存放到HDFS
  4. 算法幹預(MapReduce),計算結果儲存到HDFS或者HBase
  5. 計算結果的可視化展示(ECharts、Highcharts)

需求

現有某系統某天的Nginx的通路日志,格式如下:

27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90
           

大資料處理的算法,需要參數==用戶端的ip位址、請求時間、請求資源、響應狀态碼

正規表達式提取資料

Regex Expression主要作用:字元串

比對

抽取

替換

文法
規則 解釋
. 比對任意字元
\d 比對任意數字
\D 比對任意非數字
\w 配置a-z和A-Z
\W 比對非a-z和A-Z
\s 比對空白符
^ 比對字元串的開頭
$ 比對字元串的末尾
規則的比對次數
文法 解釋
* 規則比對0到n次
規則比對1次
{n} 規則比對n次
{n,m} 規則比對n到m次
+ 規則比對1到n次(至少1次)
應用
# 比對手機号碼 11位數值構成
\d{11}

# 郵箱位址校驗  @
.+@.+
           

使用正規表達式提取Nginx通路日志中的四項名額

測試站點:http://regex101.com

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-IFmbJs8a-1590287261180)(D:\Learnspace\training camp\day06\圖檔\2019081901.png)]

分析後得到需要的正規表達式

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"\w*\s(.*)\sHTTP\/1.1"\s(\d{3}).*$
           

使用MapReduce分布式并行計算架構進行資料清洗

注意:因為資料清洗不涉及統計計算,是以MapReduce程式通常隻有map任務,沒有reduce任務

job.setNumReduceTasks(0)

實作代碼

資料清洗的Mapper

package com.baizhi.dataclean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * @param key
     * @param value   nginx通路日志中的一行記錄(原始資料)
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"\\w*\\s(.*)\\sHTTP\\/1.1\"\\s(\\d{3}).*$";
        String line = value.toString();

        final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
        final Matcher matcher = pattern.matcher(line);

        while (matcher.find()) {
            // 四項關鍵名額 ip 請求時間 請求資源 響應狀态碼
            String clientIp = matcher.group(1);
            // yyyy-MM-dd HH:mm:ss
            String accessTime = matcher.group(2);
            String accessResource = matcher.group(3);
            String status = matcher.group(4);

            // 30/May/2013:17:38:21 +0800
            // 30/05/2013:17:38:21
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
            try {
                Date date = sdf.parse(accessTime);
                SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String finalDate = sdf2.format(date);
                context.write(new Text(clientIp + " " + finalDate + " " + accessResource + " " + status), null);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
}
           

初始化類

package com.baizhi.dataclean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class DataCleanApplication {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration(), "data clean");
        job.setJarByClass(DataCleanApplication.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.setInputPaths(job,new Path("file:///E:/access.log"));
        TextOutputFormat.setOutputPath(job,new Path("file:///E:/final"));

        job.setMapperClass(DataCleanMapper.class);

        // 注意:資料清洗通常隻有map任務而沒有reduce任務
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
           

資料傾斜

資料分區預設政策

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-CFdJRlwz-1590287261180)(D:\Learnspace\training camp\day06\圖檔\2019081902.png)]

資料傾斜是指大量key相同的資料交由一個Reduce任務去統計計算,造成”閑的閑死,忙的忙死“這樣的現象,不符合分布式并行計算的設計初衷。

現象

  • 某一個Reduce任務運作特别耗時
  • Reduce任務運作時,記憶體突然溢出

解決方案

  • 增加執行Reduce任務機器的JVM記憶體(硬體的水準擴充)
  • 增加Reduce任務的數量,每個Reduce任務隻負責極少部分的資料處理,并且Reduce任務的數量增加提高了資料計算的并行度
Reduce任務的正确數量:0.95或者1.75 * (NodeManage數量 * 每個節點的最大容器數量)
  • 自定義分區規則Partitioner
package com.baizhi.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 自定義分區規則
 */
public class CustomPartitioner extends Partitioner<Text, LongWritable> {

    /**
     * @param key
     * @param value
     * @param i     numReduceTasks
     * @return 分區序号
     */
    public int getPartition(Text key, LongWritable value, int i) {
        if (key.toString().equals("CN-GD")) return 0;
        else if (key.toString().equals("CN-GX")) return 1;
        else if (key.toString().equals("CN-HK")) return 2;
        else if (key.toString().equals("JP-TY")) return 3;
        else return 4;
    }
}
           
  • 合理使用

    Combiner

    ,将key相同的value進行歸并

在Combiner合并時,要求value必須能支援疊代計算,并且不能影響Reduce任務的輸入

Combiner通常就是Reducer任務

// 優化政策:combiner合并操作
job.setCombinerClass(MyReducer.class);
           

五、Hadoop完全分布式叢集

架構

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-BVsr3Kwd-1590287261183)(D:\Learnspace\training camp\day06\圖檔\2019081903.png)]

環境搭建

準備3台虛拟機

  • node1:

    192.168.12.130

  • node2:

    192.168.12.131

  • node3:

    192.168.12.132

服務劃分

服務名 node1 node2 node3
Namenode Y(主) Y(備)
Datanode Y Y Y
Journal node Y Y Y
Zookeeper Y Y Y
ResourceManager Y(主) Y(備)
NodeManger Y Y Y

配置步驟

準備安裝包
# jdk1.8+
# centos7.2
# hadoop-2.6.0
# zookeeper-3.4.6
           
修改主機名和IP位址的映射檔案
[[email protected] ~]# vi /etc/hosts
192.168.12.130  node1
192.168.12.131  node2
192.168.12.132  node3
           
關閉防火牆
[[email protected] ~]# systemctl stop firewalld
[[email protected] ~]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
           
SSH免密登入
[[email protected] ~]# ssh-keygen -t rsa
[[email protected] ~]# ssh-copy-id node1
[[email protected] ~]# ssh-copy-id node2
[[email protected] ~]# ssh-copy-id node3
           
同步時鐘
[[email protected] ~]# date -s '2018-12-1 20:06:00'
2018年 12月 01日 星期六 20:06:00 CST
[[email protected] ~]# clock -w
[[email protected] ~]# date
2018年 12月 01日 星期六 20:06:10 CST
           
注意:伺服器叢集時鐘如果一緻,可以跳過此步驟!
修改伺服器的主機名
[[email protected] ~]# vi /etc/hostname
# 192.168.12.130
node1
# 192.168.12.131
node2
# 192.168.12.132
node3
           
重新開機機器,生效
安裝JDK1.8+
# 将叢集搭建的安裝包上傳到某節點
# 利用scp指令拷貝到其它節點
[[email protected] ~]# scp jdk-8u191-linux-x64.rpm [email protected]:~
jdk-8u191-linux-x64.rpm
[[email protected] ~]# scp jdk-8u191-linux-x64.rpm [email protected]:~
jdk-8u191-linux-x64.rpm
           
安裝ZooKeeper
[[email protected] ~]# scp zookeeper-3.4.6.tar.gz [email protected]:~
zookeeper-3.4.6.tar.gz     100%   17MB  84.3MB/s   00:00
[[email protected] ~]# scp zookeeper-3.4.6.tar.gz [email protected]:~
zookeeper-3.4.6.tar.gz     100%   17MB  73.4MB/s   00:00

[[email protected] ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr
[[email protected] ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
dataDir=/root/zkdata
clientPort=2181
initLimit=5
syncLimit=2
server.1=node1:2887:3887
server.2=node2:2887:3887
server.3=node3:2887:3887

[[email protected] ~]# mkdir -p /root/zkdata

# node1執行此指令
[[email protected] ~]# cd zkdata/
[[email protected] zkdata]# vi myid
1

# node2執行此指令
[[email protected] ~]# cd zkdata/
[[email protected] zkdata]# vi myid
2

# node3執行此指令
[[email protected] ~]# cd zkdata/
[[email protected] zkdata]# vi myid
3

# 啟動ZooKeeper叢集
[[email protected] ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/conf/zoo.cfg
Starting zookeeper ... STARTED
# 确認zookeper服務是否正常:方法一
[[email protected] ~]# jps
1777 QuorumPeerMain
1811 Jps
# 确認zookeper服務是否正常:方法二
[[email protected] ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status /usr/zookeeper-3.4.6/conf/zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/conf/zoo.cfg
Mode: leader
           
安裝Hadoop
#1.将hadoop的安裝包遠端拷貝到其它的節點
[[email protected] ~]# scp hadoop-2.6.0_x64.tar.gz [email protected]:~
hadoop-2.6.0_x64.tar.gz    100%  172MB 113.8MB/s   00:01
[[email protected] ~]# scp hadoop-2.6.0_x64.tar.gz [email protected]:~
hadoop-2.6.0_x64.tar.gz    100%  172MB 120.8MB/s   00:01

#2.安裝
[[email protected] ~]# tar -zxf hadoop-2.6.0_x64.tar.gz -C /usr

#3.配置java和hadoop的環境變量
[[email protected] ~]# vi ~/.bashrc
HADOOP_HOME=/usr/hadoop-2.6.0
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME
export CLASSPATH
export PATH
export HADOOP_HOME
[[email protected] ~]# source ~/.bashrc
           
修改Hadoop的配置檔案
  • core-site.xml

[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/core-site.xml
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://mycluster</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/usr/hadoop-2.6.0/hadoop-${user.name}</value>
</property>
<property>
  <name>fs.trash.interval</name>
  <value>30</value>
</property>
<property>
  <name>net.topology.script.file.name</name>
  <value>/usr/hadoop-2.6.0/etc/hadoop/rack.sh</value>
</property>
           
  • 建立機架腳本檔案,該腳本可以根據IP判斷機器所處的實體位置
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/rack.sh
while [ $# -gt 0 ] ; do
	nodeArg=$1
	exec</usr/hadoop-2.6.0/etc/hadoop/topology.data
	result=""
	while read line ; do
        ar=( $line )
        if [ "${ar[0]}" = "$nodeArg" ] ; then
        result="${ar[1]}"
	fi
	done
    shift
    if [ -z "$result" ] ; then
    echo -n "/default-rack"
    else
    echo -n "$result "
    fi
done

[[email protected] ~]# chmod u+x /usr/hadoop-2.6.0/etc/hadoop/rack.sh
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/topology.data
192.168.12.130 /rack1
192.168.12.131 /rack2
192.168.12.132 /rack2
[[email protected] ~]# /usr/hadoop-2.6.0/etc/hadoop/rack.sh 192.168.23.137
/rack1

           
  • hdfs-site.xml

[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<property>
	<name>dfs.replication</name>
	<value>3</value>
</property>
<property>
	<name>dfs.ha.automatic-failover.enabled</name>
	<value>true</value>
</property>
<property>
	<name>ha.zookeeper.quorum</name>
	<value>node1:2181,node2:2181,node3:2181</value>
</property>
<property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
</property>
<property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>node1:9000</value>
</property>
<property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>node2:9000</value>
</property>
<property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://node1:8485;node2:8485;node3:8485/mycluster</value>
</property>
<property>
   <name>dfs.client.failover.proxy.provider.mycluster</name>
   <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence</value>
</property>
<property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/root/.ssh/id_rsa</value>
</property>
           
  • slaves

[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/slaves
node1
node2
node3
           

啟動HDFS HA叢集的服務

[[email protected] ~]# hadoop-daemon.sh start journalnode
[[email protected] ~]# hdfs namenode -format
[[email protected] ~]# hadoop-daemon.sh start namenode
[[email protected] ~]# hdfs namenode -bootstrapStandby
[[email protected] ~]# hadoop-daemon.sh start namenode
# zkfc: zookeeper failover controller
[[email protected]|2 ~]# hdfs zkfc -formatZK (可以在node1或者node2任意一台注冊namenode資訊)
[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)
[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)
[[email protected] ~]# hadoop-daemon.sh start datanode
           
注意:CentOS-7.x版本需要安裝一個中間依賴服務

[[email protected] ~]# yum install -y psmisc

YARN的HA叢集

  • mapred-site.xml

    [[email protected] ~]# cp /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
    [[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
    
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
               
  • yarn-site.xml

    <property>
        <name>yarn.nodemanager.aux-services</name>
    	<value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster1</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>node2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>node3</value>
    </property>
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>node1:2181,node2:2181,node3:2181</value>
    </property>
               
  • 啟動YARN
    [[email protected] ~]# yarn-daemon.sh start resourcemanager
    [[email protected] ~]# yarn-daemon.sh start resourcemanager
    [[email protected] ~]# yarn-daemon.sh start nodemanager
               
  • 檢視ResourceManager HA狀态
    [[email protected] ~]# yarn rmadmin -getServiceState rm1
    active
    [[email protected] ~]# yarn rmadmin -getServiceState rm2
    standby
               

namenode

zkfc: zookeeper failover controller

[[email protected]|2 ~]# hdfs zkfc -formatZK (可以在node1或者node2任意一台注冊namenode資訊)

[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)

[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)

[[email protected] ~]# hadoop-daemon.sh start datanode

> 注意:CentOS-7.x版本需要安裝一個中間依賴服務
>
>  `[[email protected] ~]# yum install -y psmisc`



#### YARN的HA叢集

- `mapred-site.xml`

  ```xml
  [[email protected] ~]# cp /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
  [[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
  
  <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
  </property>
           
  • yarn-site.xml

    <property>
        <name>yarn.nodemanager.aux-services</name>
    	<value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster1</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>node2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>node3</value>
    </property>
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>node1:2181,node2:2181,node3:2181</value>
    </property>
               
  • 啟動YARN
    [[email protected] ~]# yarn-daemon.sh start resourcemanager
    [[email protected] ~]# yarn-daemon.sh start resourcemanager
    [[email protected] ~]# yarn-daemon.sh start nodemanager
               
  • 檢視ResourceManager HA狀态
    [[email protected] ~]# yarn rmadmin -getServiceState rm1
    active
    [[email protected] ~]# yarn rmadmin -getServiceState rm2
    standby
               

繼續閱讀