Author: 李金輝
Wechat:m04194514
Hadoop筆記
一、概述
大資料
大資料(Big Data)是指無法在一定時間範圍内用正常軟體工具進行捕捉、處理和管理的資料集合,需要新處理模式才能具有更強的決策力、洞察發現力和流程優化能力的海量、高增長率和多樣化的資訊資産。
大資料的5V特點(IBM提出):
- Volume(大量)
- Velocity(高速)
- Variety(多樣)
- Value(低價值密度)
- Veracity(真實性)
Hadoop是什麼
http://hadoop.apache.org
Apache Hadoop是一個開源、可靠、可擴充的分布式計算架構。
Hadoop架構允許使用者在一個超大規模的伺服器叢集中,對大資料集進行分布式的處理計算。Hadoop叢集規模可以由單個(僞分布式叢集)或者上千台的商用伺服器(完全分布式叢集)構成。Hadoop叢集中的每一個伺服器都提供了本地計算和存儲的能力。Hadoop架構并不是通過硬體實作的高可用,而是通過應用層檢測處理錯誤,這樣的話Hadoop叢集就可以建立在廉價的商用伺服器上。
- 狹義的Hadoop(六大子產品)
- Hadoop Common:Hadoop架構通用支援庫
- Hadoop Distributed File System (HDFS™):分布式檔案系統提供了高吞吐能力的資料通路
- Hadoop YARN:一個用來做任務的排程和分布式叢集的資源管理的架構
- Hadoop MapReduce:基于YARN的系統,對大資料集進行分布式的并行計算處理
- Hadoop Ozone:Hadoop對象存儲系統
- Hadoop Submarine:機器學習的引擎
- 廣義的Hadoop(泛指生态體系)
- Apache HBase:Big Table,用來存儲海量的結構化資料
- Apache Zookeeper(動物園管理者):分布式協調服務系統,主要解決Hadoop生态體系中各個分布式系統存在的一些通用問題
- Apache Hive(小蜜蜂):資料倉庫的基礎設施,用來簡化Hadoop的操作
- Apache Flume(資料采集): 負責采集各種類型的資料,并且進行簡單的預處理操作
- Apache Spark(Scala語言): 更為高效的分布式計算引擎
- Apache Flink: 高效的分布式計算引擎(第三代資料分析引擎)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2csgGbYFGaSdVYohGMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3gzM2QzNxgTM3ITMxgTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
二、HDFS
HDFS是Hadoop的分布式檔案系統( Hadoop Distributed File System ),類似于其它的分布式檔案系統。HDFS支援高度容錯,可以部署在廉價的硬體裝置上,特别适宜于大型資料集的分布式存儲。
Google開源論文GFS的開源實作
環境搭建
建構HDFS的僞分布式叢集(使用單台機器,模拟HDFS叢集的所有服務)
- 安裝CentOS-7.x
CentOS-7.2版本
- 配置網絡
# ip addr 檢視目前的伺服器網絡設定 # vi /etc/sysconfig/network-scripts/ifcfg-ens33 修改伺服器網絡配置檔案的參數 # 修改配置檔案中的ONBOOT=yes # 重新開機生效 systemctl restart network
- 關閉防火牆
[[email protected] ~]# systemctl stop firewalld [[email protected] ~]# systemctl disable firewalld Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
- 修改伺服器的主機名
# 簡化連接配接伺服器操作 [[email protected] ~]# vi /etc/hostname # 删除localhost,新增hadoop(自定義的主機名)
- 配置主機名和ip位址的映射關系
[[email protected] ~]# vi /etc/hosts 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 # 最後一行添加目前伺服器的ip位址和主機名映射 192.168.12.129 hadoop # 測試 [[email protected] ~]# ping hadoop PING hadoop (192.168.12.129) 56(84) bytes of data. 64 bytes from hadoop (192.168.12.129): icmp_seq=1 ttl=64 time=0.107 ms 64 bytes from hadoop (192.168.12.129): icmp_seq=2 ttl=64 time=0.053 ms
- 配置SSH(遠端免密登入)
[[email protected] ~]# ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa Generating public/private rsa key pair. Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:/VJcuTQzpC4EDqiiEKWwwtYAqS9Von3ssc12fM+ldvQ [email protected] The key's randomart image is: +---[RSA 2048]----+ |++. .. . . | |=o+ o o . o . | |=* * . . . B | |B + + o o o = | |o+ o = .S o + . | |o . o + o .+ o | | . . . ..o.+ . | | .= . E| | . . | +----[SHA256]-----+ [[email protected] ~]# [[email protected] ~]# cd .ssh/ [[email protected] .ssh]# ll 總用量 12 -rw-------. 1 root root 1679 8月 12 15:45 id_rsa -rw-r--r--. 1 root root 393 8月 12 15:45 id_rsa.pub -rw-r--r--. 1 root root 183 8月 12 15:43 known_hosts [[email protected] .ssh]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys [[email protected] .ssh]# ll 總用量 16 -rw-r--r--. 1 root root 393 8月 12 15:47 authorized_keys -rw-------. 1 root root 1679 8月 12 15:45 id_rsa -rw-r--r--. 1 root root 393 8月 12 15:45 id_rsa.pub -rw-r--r--. 1 root root 183 8月 12 15:43 known_hosts [[email protected] .ssh]# chmod 0600 ~/.ssh/authorized_keys [[email protected] .ssh]# [[email protected] .ssh]# ssh hadoop Last login: Mon Aug 12 15:43:18 2019 from 192.168.12.1
- 安裝JDK1.8+
[[email protected] ~]# rpm -ivh jdk-8u191-linux-x64.rpm
警告:jdk-8u191-linux-x64.rpm: 頭V3 RSA/SHA256 Signature, 密鑰 ID ec551f03: NOKEY
準備中... ################################# [100%]
正在更新/安裝...
1:jdk1.8-2000:1.8.0_191-fcs ################################# [100%]
Unpacking JAR files...
tools.jar...
plugin.jar...
javaws.jar...
deploy.jar...
rt.jar...
jsse.jar...
charsets.jar...
localedata.jar...
[[email protected] ~]# java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
- 安裝Hadoop
- 修改HDFS叢集的配置檔案
[[email protected] hadoop-2.6.0]# vi etc/hadoop/core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop-2.6.0/hadoop-${user.name}</value>
</property>
[[email protected] hadoop-2.6.0]# vi etc/hadoop/hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
[[email protected] hadoop-2.6.0]# vi etc/hadoop/slaves
hadoop
- 添加環境變量配置
[[email protected] ~]# vi .bashrc
HADOOP_HOME=/usr/hadoop-2.6.0
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME
export CLASSPATH
export PATH
export HADOOP_HOME
[[email protected] ~]# source .bashrc
服務啟動
- 初始化操作
NOTE:
初始化操作隻需要在第一次啟動HDFS叢集之前執行,後續不需要再次執行,直接跳過啟動服務即可
- 啟動HDFS叢集
[[email protected] ~]# start-dfs.sh
Starting namenodes on [hadoop]
hadoop: starting namenode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-namenode-hadoop.out
hadoop: starting datanode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-datanode-hadoop.out
Starting secondary namenodes [0.0.0.0]
The authenticity of host '0.0.0.0 (0.0.0.0)' can't be established.
ECDSA key fingerprint is SHA256:yDvdRHO65GeTfU6PJQjEKMap+lEZb8a/JeuesbTsMYs.
ECDSA key fingerprint is MD5:d4:bf:fe:86:d3:ed:2d:fc:5f:a2:2b:e5:86:0c:ae:ee.
Are you sure you want to continue connecting (yes/no)? yes
0.0.0.0: Warning: Permanently added '0.0.0.0' (ECDSA) to the list of known hosts.
0.0.0.0: starting secondarynamenode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-secondarynamenode-hadoop.out
- 驗證服務是否啟動成功
# 1. java的指令 jps,檢視java程序清單
[[email protected] ~]# jps
10995 SecondaryNameNode # HDFS小秘
10796 NameNode # HDFS Master
10877 DataNode # HDFS Slaves
# 2. 通路HDFS的Web UI
http://伺服器位址:50070
# 3. 檢視分布式系統日志
[[email protected] hadoop-2.6.0]# cd logs/
[[email protected] logs]# ll
總用量 92
-rw-r--r--. 1 root root 24249 8月 12 16:12 hadoop-root-datanode-hadoop.log
-rw-r--r--. 1 root root 714 8月 12 16:12 hadoop-root-datanode-hadoop.out
-rw-r--r--. 1 root root 30953 8月 12 16:17 hadoop-root-namenode-hadoop.log
-rw-r--r--. 1 root root 714 8月 12 16:12 hadoop-root-namenode-hadoop.out
-rw-r--r--. 1 root root 22304 8月 12 16:13 hadoop-root-secondarynamenode-hadoop.log
-rw-r--r--. 1 root root 714 8月 12 16:12 hadoop-root-secondarynamenode-hadoop.out
-rw-r--r--. 1 root root 0 8月 12 16:12 SecurityAuth-root.audit
- 關閉服務
指令操作
HDFS分布式檔案系統,操作類似于Linux檔案系統
比如Linux:cp、mv、rm、cat、mkdir 常用指令非常類似
文法:
hdfs dfs -參數
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...] # 檢視文本檔案内容
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...] # 修改屬組
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] # 修改權限
[-chown [-R] [OWNER][:[GROUP]] PATH...] # 修改屬主
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>] # 從本地拷貝到HDFS
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] # 從HDFS拷貝到本地
[-count [-q] [-h] <path> ...] # 計數
[-cp [-f] [-p | -p[topax]] <src> ... <dst>] # 拷貝
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] <path> ...]
[-expunge]
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] # 下載下傳
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] <src> <localdst>]
[-help [cmd ...]] # 幫助
[-ls [-d] [-h] [-R] [<path> ...]] # 檢視目錄清單
[-mkdir [-p] <path> ...] # 建立檔案夾
[-moveFromLocal <localsrc> ... <dst>] # 從本地移動到HDFS
[-moveToLocal <src> <localdst>] # 将HDFS中的檔案移動到本地
[-mv <src> ... <dst>] # HDFS中的檔案或檔案夾的移動
[-put [-f] [-p] [-l] <localsrc> ... <dst>] # 上傳
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] <src> ...] # 删除
[-rmdir [--ignore-fail-on-non-empty] <dir> ...] # 删除檔案夾
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>] # 檢視文本檔案的末尾内容
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touchz <path> ...]
[-usage [cmd ...]]
JAVA API操作
- 環境搭建(以windows平台為例)
- 解壓縮Hadoop的安裝包
# 如解壓縮安裝到E:\\根目錄
-
拷貝相容檔案到安裝目錄下的bin檔案夾中
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-DSuUicVE-1590287261139)(D:\Learnspace\training camp\day01\圖檔\2019081202.png)]
-
在windows的hosts檔案中添加主機名和IP位址的映射關系
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-XtN6Dy84-1590287261141)(D:\Learnspace\training camp\day01\圖檔\2019081203.png)]
- 重新開機開發工具
- 配置HADOOP_HOME環境變量
- 解壓縮Hadoop的安裝包
- 實戰
- 建立Maven工程,并導入HDFS Client Driver
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
- 測試代碼
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; /** * hdfs java api測試 * FileSystem */ public class HDFSDemo { /** * hdfs 用戶端操作對象 */ private FileSystem fileSystem = null; private Configuration configuration = null; @Before public void doBefore() throws URISyntaxException, IOException { URI uri = new URI("hdfs://hadoop:9000"); configuration = new Configuration(); fileSystem = FileSystem.get(uri, configuration); } /** * 檔案上傳 * put * copyFromLocal * moveFromLocal * * @org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/baizhi":root:supergroup:drwxr-xr-x * 解決方案: 1. 修改權限 (UGO) o+w 2. 修改操作hdfs使用者身份:-DHADOOP_USER_NAME=root 3. 關閉hdfs權限檢查功能: hdfs-site.xml <property> <name>dfs.permissions.enabled</name> <value>false</value> </property> */ @Test public void testUpload() throws IOException { Path src = new Path("G:\\apache-tomcat-7.0.85.zip"); Path dst = new Path("/baizhi"); fileSystem.copyFromLocalFile(src, dst); } @Test public void testUpload2() throws IOException { FileInputStream src = new FileInputStream("F:\\生态圖.png"); Path dst = new Path("/baizhi/test"); FSDataOutputStream dstOutputStream = fileSystem.create(dst); IOUtils.copyBytes(src, dstOutputStream, configuration); } /** * 下載下傳檔案 * get * copyToLocal * moveToLocal */ @Test public void testDownload() throws IOException { Path src = new Path("/baizhi/test"); Path dst = new Path("G:\\1.png"); fileSystem.copyToLocalFile(src, dst); } @Test public void testDownload2() throws IOException { FSDataInputStream inputStream = fileSystem.open(new Path("/baizhi/test")); FileOutputStream outputStream = new FileOutputStream("G:\\2.png"); IOUtils.copyBytes(inputStream, outputStream, configuration); } /** * 删除檔案 */ @Test public void testDelete() throws IOException { // fileSystem.delete(new Path("/baizhi/test"),false); // true代表遞歸删除 fileSystem.delete(new Path("/baizhi"), true); } @Test public void testOther() throws IOException { // rwxrw-r-- /baizhi // fileSystem.mkdirs(new Path("/baizhi"), new FsPermission(FsAction.ALL, FsAction.READ_WRITE, FsAction.READ)); boolean exists = fileSystem.exists(new Path("/baizhi")); System.out.println(exists?"存在":"不存在"); } @After public void doAfter() throws IOException { fileSystem.close(); } }
- 建立Maven工程,并導入HDFS Client Driver
HDFS架構
HDFS采用master/slave架構。一個HDFS叢集由一個Namenode和一定數目的Datanode組成。Namenode是一個中心伺服器,負責管理檔案系統的名字空間(namespace)以及用戶端對檔案的通路。叢集中的Datanode一般是一個節點一個,負責管理它所在節點上的資料存儲。 HDFS暴露了檔案系統的名字空間,使用者能夠以檔案的形式在上面存儲資料。從内部看,一個檔案其實被分成一個或多個資料塊,這些塊存儲在一組Datanode上。 Namenode執行檔案系統的名字空間操作,比如打開、關閉、重命名檔案或目錄。它也負責确定資料塊到具體Datanode節點的映射。 Datanode負責處理檔案系統用戶端的讀寫請求,在Namenode的統一排程下進行資料塊的建立、删除和複制。
- Namenode:存儲系統中繼資料、 namespace、管理datanode、接收datanode狀态彙報
- Datanode:存儲塊資料,響應用戶端對塊的讀寫請求,接收namenode的塊管理指令
- Block:HDFS存儲資料的基本機關,預設值是128MB,實際塊大小0~128MB
- Rack:機架,對datanode所在主機的實體辨別,辨別主機的位置,優化存儲和計算
架構圖
Block的複制原理
中繼資料(MetaData)的持久化機制
Namenode使用記憶體存儲MetaData,存在安全風險,HDFS提供了中繼資料的持久化機制
好處:保證中繼資料絕對不會丢失,并且 fsimage
加速了Namenode中繼資料的恢複速度
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-MByduazm-1590287261151)(D:\Learnspace\training camp\day02\圖檔\2019081302.png)]
HDFS常見問題
- 為什麼HDFS不适合小檔案存儲?
情況 Namenode占用 Datanode占用 10000個檔案總共128MB 10000個中繼資料 >>150B 1個128MB檔案 1個中繼資料 ==150B - 小檔案過多,會過多占用namenode的記憶體,并浪費block
- HDFS适用于高吞吐量,而不适用于低時間延遲的通路。檔案過小,尋道時間大于資料讀寫時間,這不符合HDFS的設計原則
-
Namenode和SecondaryNamenode差別?
Namenode主要維護兩個元件,一個是 fsimage ,一個是 editlog
- fsimage儲存了最新的中繼資料檢查點,包含整個HDFS檔案系統的所有目錄和檔案資訊。對于目錄來說包括修改時間、通路權限控制資訊(目錄所屬使用者、所在組)等;對于檔案來說包括資料塊描述資訊、通路時間、修改時間等。
- editlog主要是在Namenode已經啟動的情況下對HDFS進行的各種更新操作進行記錄,HDFS用戶端執行的所有寫操作都會被記錄到editlog中。
為了避免editlog不斷增加,SecondaryNamenode會周期性合并fsimage和editlog生成新的fsimage。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-0nfWmVPS-1590287261153)(D:\Learnspace\training camp\day02\圖檔\2019081303.png)]
三、YARN
架構了解
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和排程,它的引入為叢集在資源使用率、資源統一管理和資料共享等方面帶來了很多好處。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-8mcQx9xd-1590287261154)(D:\Learnspace\training camp\day03\圖檔\2019081401.png)]
- ResourceManager:在系統的所有應用程式之間仲裁資源的最終權限
- NodeManager:是每台機器的架構代理,負責容器,監視其資源使用情況(CPU,記憶體,磁盤,網絡)并将其報告給ResourceManager的Scheduler(排程器)
- App Master:應用程式的Master,負責任務在計算過程中的監控、故障轉移,每個Job作業隻有一個
- Container:表示一個計算容器
環境搭建
- 修改
mapred-site.xml
[[email protected] ~]# cd /usr/hadoop-2.6.0/ [[email protected] hadoop-2.6.0]# mv etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml [[email protected] hadoop-2.6.0]# vi etc/hadoop/mapred-site.xml # 添加以下内容 <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>
- 修改
yarn-site.xml
[[email protected] hadoop-2.6.0]# vi etc/hadoop/yarn-site.xml <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.hostname</name> <value>hadoop</value> </property>
- 啟動YARN的服務
僞分布式的YARN叢集
[[email protected] hadoop-2.6.0]# start-yarn.sh starting yarn daemons starting resourcemanager, logging to /usr/hadoop-2.6.0/logs/yarn-root-resourcemanager-hadoop.out hadoop: starting nodemanager, logging to /usr/hadoop-2.6.0/logs/yarn-root-nodemanager-hadoop.out [[email protected] hadoop-2.6.0]# jps 6892 ResourceManager # master 6974 NodeManager # slave
四、MapReduce
思想了解
Hadoop MapReduce是一個軟體架構,基于該架構能夠容易地編寫應用程式,這些應用程式能夠運行在由上千台商用機器組成的大叢集上,并以一種可靠的,具有容錯能力的方式并行地處理上TB級别的海量資料集。這個定義里面有着這些關鍵詞:
一是軟體架構,二是并行處理,三是可靠且容錯,四是大規模叢集,五是海量資料集。
MapReduce擅長處理大資料,它為什麼具有這種能力呢?這可由MapReduce的設計思想發覺。MapReduce的思想就是“分而治之”或者“化繁為簡”。
-
負責“分”,即把複雜的任務分解為若幹個“簡單的任務”來處理。 “簡單的任務”包含三層含義:Mapper
- 一是資料或計算的規模相對于原任務大大縮小;
- 二是就近計算原則,即任務會配置設定到存放着所需資料的節點上進行計算;
- 三是這些小任務可以并行計算,彼此間幾乎沒有依賴關系。
-
主要負責對map階段的結果進行彙總Reducer
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-LQAYmADH-1590287261155)(D:\Learnspace\training camp\day03\圖檔\2019081402.png)]
基本開發
建立Maven工程,導入依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.6.0</version>
</dependency>
開發MapReduce應用程式
單詞計數的應用程式
MapReduce應用程式的兩個階段:
- Mapper:将大任務拆分為若幹個小任務,将非結構化的資料映射為KV結構的資料
- Reducer:負責統計計算
準備樣例檔案
How are you
Where are you from
Welcome to BJ
Are you ok
将模拟資料上傳到HDFS中
[[email protected] ~]# hdfs dfs -put data.txt /baizhi
定義Mapper任務
package com.baizhi;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* *Writable表示Hadoop提供的序列化對象
* LongWritable
* IntWritable
* String ---> Text
* ...
* <p>
* Mapper階段
* keyIn: LongWritable 每行資料的首字元的offset(位置)
* valueIn: Text 一行記錄
* keyOut: Text 單詞
* valueOut: IntWritable 初始值 1
*/
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 映射方法
* How are you ---> (how,1) (are,1) (you,1)
*
* @param key keyIn
* @param value valueIn
* @param context 上下文對象(MapReduce應用程式運作的上下文資訊載體)
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
// 輸出處理完成kv資料
context.write(new Text(word), new IntWritable(1));
}
}
}
定義Reducer任務
package com.baizhi;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* reducer階段 統計和計算
* keyIn:類型等價于Mapper的keyOut
* valueIn:類型等價于Mapper的valueOut
* keyOut:單詞 Text
* valueOut:總次數 IntWritable
*/
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 統計計算方法
* how are you
* are you ok
* are [1,1]
* you [1,1]
* how [1]
*
* @param key 單詞
* @param values key相同的初始值的集合
* @param context 上下文對象
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
Iterator<IntWritable> iterator = values.iterator(); //擷取疊代器對象
while (iterator.hasNext()){
int num = iterator.next().get(); // 1
count += num;
}
// 計算完成後 輸出計算結果
context.write(key,new IntWritable(count));
}
}
初始化類
package com.baizhi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* 單詞計數的初始化類
*/
public class WordCountApplication {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1. 建立MapReduce任務對象
Configuration conf = new Configuration();
String jobName = "wordcount";
Job job = Job.getInstance(conf,jobName);
job.setJarByClass(WordCountApplication.class);
//2. 設定計算資料的輸入格式和計算結果的輸出格式(文本)
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//3. 指定計算資料的來源位置以及計算結果的輸出位置
TextInputFormat.addInputPath(job,new Path("/baizhi/data.txt"));
// 注意:計算結果的輸出目錄必須不存在
TextOutputFormat.setOutputPath(job,new Path("/baizhi/result"));
//4. 指定MapReduce應用的Mapper階段和Reducer階段的實作類
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//5. 設定Mapper階段和Reducer階段的KeyOut和ValueOut的類型
job.setMapOutputKeyClass(Text.class); // mapper的keyOut的類型
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6. 送出任務
job.waitForCompletion(true); // true 輸出運作日志
}
}
将MapReduce應用程式打為 jar
包
jar
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-wE80SRkY-1590287261156)(D:\Learnspace\training camp\day03\圖檔\2019081403.png)]
測試運作
- 将應用jar包 上傳到Linux作業系統中
-
使用指令送出MapReduce應用程式
文法:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-97rcOtnY-1590287261158)(D:\Learnspace\training camp\day03\圖檔\2019081404.png)]hadoop jar xxx.jar 入口類的全限定名
檢視計算結果
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-HLRaaTz0-1590287261160)(D:\Learnspace\training camp\day03\圖檔\2019081405.png)]
第二個案例(流量統計)
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-pz1gYwf3-1590287261161)(D:\Learnspace\training camp\day03\圖檔\2019081406.png)]
MapReduce應用程式的其它運作方式
注意:
在生産環境中,MapReduce Application一定是運作在YARN分布式叢集中的,
但是,在開發測試MapReduce應用程式時,我們可以使用以下方式來測試代碼
本地計算 + 本地資料
本地計算指的是借助于Windows平台的Hadoop環境模拟運作MapReduce程式
本地資料指的是計算的資料來源于Windows平台,并且輸出到本地
- 修改初始化類中如下代碼
// 注意:file:/// 表示使用本地檔案系統中的資料
TextInputFormat.addInputPath(job,new Path("file:///e:\\ssby.txt"));
// 注意:計算結果的輸出目錄必須不存在
TextOutputFormat.setOutputPath(job,new Path("file:///e:\\result"));
- 運作程式
右鍵初始化類 ---> Run as
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-kA7wPnj7-1590287261163)(D:\Learnspace\training camp\day03\圖檔\2019081407.png)]# 如出現以下異常 Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z # 解決方案: 1. 在項目的根目錄中建立包 org.apache.hadoop.io.nativeio 2. 在包中建立類 NativeIO 3. 找到Hadoop的NativeIO類将所有的代碼複制到自建的NativeIO中 4. 修改NativeIO中的源碼(關聯源碼是557行,未關聯是287行),修改為:return true; 5. 重新運作,得到運作結果
本地計算 + 遠端資料
- 修改初始化類
//3. 指定計算資料的來源以及計算結果的輸出位置
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop:9000/baizhi/data.txt"));
// 注意:計算結果的輸出目錄必須不存在
TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop:9000/baizhi/result3"));
- 運作程式
右鍵初始化類 --> Run as
- 通路控制異常,添加虛拟機參數
-DHADOOP_USER_NAME=root
遠端計算 + 遠端資料
遠端計算指MapReduce應用程式依然運作在YARN叢集中
遠端資料指資料來源于HDFS或者輸出到HDFS
- 修改初始化類
// 添加遠端計算的支援 //=============================================================== conf.set("fs.defaultFS", "hdfs://hadoop:9000/"); conf.set("mapreduce.job.jar", "file:///F:\\IdeaProjects\\20190812\\hadoop-mapreduce\\target\\hadoop-mapreduce-1.0-SNAPSHOT.jar"); conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "hadoop"); conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle"); conf.set("mapreduce.app-submission.cross-platform", "true"); conf.set("dfs.replication", "1"); //===============================================================
- 将Maven項目重新打包
maven plugin ---> package---> xxx.jar
- 運作程式
右鍵初始化類 ---> Run as
作業
- 某系統被通路的日志樣例資料,資料格式如下:
# 用戶端的ip位址 請求時間 請求方式 通路資源 響應的位元組大小 狀态碼
192.168.0.3 2019-08-14 15:30:15 GET /index.jsp 300 200
11.135.14.110 2019-08-14 15:32:10 POST /user/login.do 500 404
...
-
(Page View): 系統的通路量PV
mapreduce map: k: 日期 v: 1 reduce: k: 日期 values: [1,1,1,1]
-
(Unique Visitor): 獨立使用者的通路量UV
mapreduce map: k: 日期 v:ip reduce: k: 日期 values:[ip,ip,ip] values ---> Set
MapReduce程式的運作流程
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-4f4VN1l0-1590287261165)(D:\Learnspace\training camp\day04\圖檔\2019081501.png)]
MapReduce任務送出的源碼剖析
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-LRwh2Dl6-1590287261167)(D:\Learnspace\training camp\day04\圖檔\2019081502.png)]
InputFormat和OutputFormat
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-9FzWcvNP-1590287261169)(D:\Learnspace\training camp\day04\圖檔\2019081503.png)]
InputFormat
InputFormat資料的輸入格式對象
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-swrCTYqM-1590287261171)(D:\Learnspace\training camp\day04\圖檔\2019081504.png)]
TextInputFormat例析
-
getSplits
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ZN7DZETL-1590287261171)(D:\Learnspace\training camp\day04\圖檔\2019081505.png)]
-
createRecordReader
特點:1. InputFormat決定如何對計算的資料集進行邏輯切割(==140.8MB==) 2. InputFormat決定如何解析讀取資料切片(split)中的資料内容,并且map任務的keyIn和valueIn的類型由RecordReader中的key、value決定 3. 一個InputSplit會由一個Map任務進行映射處理 4. Inputformat負責輸入資料的合法性校驗
常見的InputFormat
- FileInputFormat
-
: 基于文本的資料輸入格式對象TextInputFormat
特點:按行讀取文本中的資料 KeyIn:LongWritable ValueIn:Text
-
NLineInputFormat
特點:将文本中的N行(預設為1行)資料作一個資料切片 KeyIn:LongWritable ValueIn:Text
設定N行:
conf.set("mapreduce.input.lineinputformat.linespermap","3");
-
KeyValueLineRecordReader
特點:按照KV解析文本中的資料 KeyIn:Text ValueIn:Text
資料切片的計算規則等同于
資料切片的讀取方式按照KV的結構進行解析TextInputFormat
預設為mapreduce.input.keyvaluelinerecordreader.key.value.separator
例如:\t
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
-
FixedLengthInputFormat
-
CombineTextInputFormat
特點:将多個小檔案的内容整合到一個資料切片中 KeyIn:LongWritable ValueIn: Text
-
- DBInputFormat
-
DBInputFormat
特點: 從資料庫中擷取資料,将獲得的資料作為Map任務的輸入
KeyIn: LongWritable ValueIn:extends DBWritable
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-AmPk94qT-1590287261173)(D:\Learnspace\training camp\day04\圖檔\2019081506.png)]
- 開發自定義的Writable對象,讀寫資料庫表中的記錄
package com.baizhi.inputformat.db; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; /** * 通過OrderWritable對象讀寫資料庫的記錄 */ public class OrderWritable implements DBWritable { private Integer orderId; private Double totalMoney; private Date createTime; private Integer userId; public OrderWritable() { } public OrderWritable(Integer orderId, Double totalMoney, Date createTime, Integer userId) { this.orderId = orderId; this.totalMoney = totalMoney; this.createTime = createTime; this.userId = userId; } public void write(PreparedStatement pstm) throws SQLException { pstm.setInt(2, this.orderId); pstm.setDouble(3, this.totalMoney); java.sql.Date date = new java.sql.Date(this.createTime.getTime()); pstm.setDate(4, date); pstm.setInt(5, this.userId); } public void readFields(ResultSet rs) throws SQLException { this.orderId = rs.getInt("order_id"); this.totalMoney = rs.getDouble("total_money"); this.createTime = rs.getDate("create_time"); this.userId = rs.getInt("user_id"); } }
- 開發處理的Map任務
package com.baizhi.inputformat.db; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.Date; public class OrderMapper extends Mapper<LongWritable, OrderWritable, Text, DoubleWritable> { /** value: 資料庫一行記錄 */ @Override protected void map(LongWritable key, OrderWritable value, Context context) throws IOException, InterruptedException { Date createTime = value.getCreateTime(); Integer userId = value.getUserId(); Double totalMoney = value.getTotalMoney(); String month = createTime.getYear() + "-" + createTime.getMonth() + "-" + userId; context.write(new Text(month), new DoubleWritable(totalMoney)); } }
- 開發統計的Reduce任務
package com.baizhi.inputformat.db; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; public class OrderReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double sum = 0.0D; Iterator<DoubleWritable> iterator = values.iterator(); while (iterator.hasNext()) { DoubleWritable money = iterator.next(); sum += money.get(); } context.write(key, new DoubleWritable(sum)); } }
- 設定初始化類
package com.baizhi.inputformat.db; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.iq80.leveldb.DB; import java.io.IOException; public class OrderComputApplication { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); // 設定資料源資訊 configuration.set(DBConfiguration.DRIVER_CLASS_PROPERTY,"com.mysql.jdbc.Driver"); configuration.set(DBConfiguration.URL_PROPERTY,"jdbc:mysql://localhost:3306/vue"); configuration.set(DBConfiguration.USERNAME_PROPERTY,"root"); configuration.set(DBConfiguration.PASSWORD_PROPERTY,"root"); Job job = Job.getInstance(configuration, "order"); job.setJarByClass(OrderComputApplication.class); job.setInputFormatClass(DBInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // select order_id,total_money... from t_order where ... order by ... DBInputFormat.setInput(job,OrderWritable.class,"t_order",null,null, "order_id","total_money","create_time","user_id"); TextOutputFormat.setOutputPath(job,new Path("file:///E:/result5")); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); job.waitForCompletion(true); } }
- 引入資料源的驅動jar包
本地計算:在Maven項目中導入MySQL的依賴即可
遠端計算:将MySQL的驅動jar包上傳到
中Hadoop安裝目錄的/share/hadoop/yarn/lib
- 開發自定義的Writable對象,讀寫資料庫表中的記錄
-
IutputFormat作用
結論:
- 決定如何對計算的資料集進行邏輯切割
- 決定如何解析讀取資料切片(split)中的資料内容
- 負責輸入資料的合法性校驗
OutputFormat
OutputFormat資料的輸出格式對象,決定了如何将Reducer的計算結果輸出到指定的存儲系統中
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-HPTYoyCw-1590287261175)(D:\Learnspace\training camp\day05\圖檔\2019081601.png)]
常見的OutputFormat
- FileOutputFormat:基于檔案的資料輸出格式
-
TextOutputFormat
特點:計算的結果以文本的形式儲存在檔案中,文本中一行結果為Reduce方法的keyOut valueOut
-
- DBOutputFormat:基于資料庫的資料輸出格式
-
DBOutputFormat
特點:将Reducer的計算結果輸出儲存到資料庫,reduce方法每輸出一次則在資料庫産生一條記錄
-
- TableOutputFormat:基于HBase的資料輸出格式
OutputFormat作用
結論:
決定計算結果以何種格式儲存到指定的存儲系統中
2. 校驗計算結果的輸出位置是否合法
Shuffle原理剖析
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-3Wzlsjf3-1590287261177)(D:\Learnspace\training camp\day05\圖檔\2019081602.png)]
Shuffle是指對Map任務的輸出結果進行分區、排序、合并等處理後交給Reduce任務處理的過程,分為Map端的操作和Reduce端的操作。
Shuffle過程
-
Map端的Shuffle
Map的輸出結果首先被緩存到記憶體,當緩存區容量到達80%(緩沖區預設100MB),就啟動溢寫操作。當啟動溢寫操作時,首先需要把緩存中的資料進行分區,然後對每個分區的資料進行排序和合并(combine),之後再寫入磁盤檔案。每次溢寫操作都會生成一個新的磁盤檔案,随着Map任務的執行,磁盤中就會生成多個溢寫檔案。在Map任務全部結束前,這些溢寫檔案會被歸并成一個大的磁盤檔案,然後通知相應的Reduce任務來拉取自己所應處理的分區資料。
-
在Reduce端的Shuffle過程
Reduce任務會從Map端的不同Map機器上拉取自己所應處理的分區資料,然後對分區資料進行排序合并後交給Reduce任務去處理。
作用
- 保證每一個Reduce任務處理的資料量大緻是相同的
- Map任務輸出的key相同,分區也一定相同,并且肯定是相同的Reduce處理的,保證計算結果的準确性
- Reduce任務的數量決定了分區的數量,Reduce任務越多計算處理的并行度也就越高
Reduce任務的數量(預設為1)可以通過:
job.setNumReduceTasks(數量)
特點
- Map端溢寫時,key相同所在分區一定相同
- Map端溢寫時,排序減少了Reduce任務全局排序的複雜度
- Map端溢寫時,合并(Combiner【可選】)減少溢寫檔案的體積,提高了Reduce任務在Fetch資料時的效率,它是一種MapReduce優化政策
- Reduce端計算或者輸出時,它的資料都是有序的
Shuffle源碼追蹤
-
MapTask
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-dABOuAJw-1590287261178)(D:\Learnspace\training camp\day05\圖檔\2019081603.png)]
-
ReduceTask
(略)
建議閱讀
資料清洗
資料清洗是指将原始資料處理成有價值資料的過程,這一過程稱為資料清洗。
企業大資料開發的基本流程:
- 采集資料(Flume、Logstash)先儲存到MQ(Kafka)中
- 将MQ中的暫存資料存放到HDFS中儲存
- 資料清洗(低價值密度的資料處理)存放到HDFS
- 算法幹預(MapReduce),計算結果儲存到HDFS或者HBase
- 計算結果的可視化展示(ECharts、Highcharts)
需求
現有某系統某天的Nginx的通路日志,格式如下:
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90
大資料處理的算法,需要參數==用戶端的ip位址、請求時間、請求資源、響應狀态碼
正規表達式提取資料
Regex Expression主要作用:字元串、
比對
、
抽取
替換
文法
規則 | 解釋 |
---|---|
. | 比對任意字元 |
\d | 比對任意數字 |
\D | 比對任意非數字 |
\w | 配置a-z和A-Z |
\W | 比對非a-z和A-Z |
\s | 比對空白符 |
^ | 比對字元串的開頭 |
$ | 比對字元串的末尾 |
規則的比對次數
文法 | 解釋 |
---|---|
* | 規則比對0到n次 |
? | 規則比對1次 |
{n} | 規則比對n次 |
{n,m} | 規則比對n到m次 |
+ | 規則比對1到n次(至少1次) |
應用
# 比對手機号碼 11位數值構成
\d{11}
# 郵箱位址校驗 @
.+@.+
使用正規表達式提取Nginx通路日志中的四項名額
測試站點:http://regex101.com
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-IFmbJs8a-1590287261180)(D:\Learnspace\training camp\day06\圖檔\2019081901.png)]
分析後得到需要的正規表達式
^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"\w*\s(.*)\sHTTP\/1.1"\s(\d{3}).*$
使用MapReduce分布式并行計算架構進行資料清洗
注意:因為資料清洗不涉及統計計算,是以MapReduce程式通常隻有map任務,沒有reduce任務 job.setNumReduceTasks(0)
實作代碼
資料清洗的Mapper
package com.baizhi.dataclean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
/**
* @param key
* @param value nginx通路日志中的一行記錄(原始資料)
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"\\w*\\s(.*)\\sHTTP\\/1.1\"\\s(\\d{3}).*$";
String line = value.toString();
final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
final Matcher matcher = pattern.matcher(line);
while (matcher.find()) {
// 四項關鍵名額 ip 請求時間 請求資源 響應狀态碼
String clientIp = matcher.group(1);
// yyyy-MM-dd HH:mm:ss
String accessTime = matcher.group(2);
String accessResource = matcher.group(3);
String status = matcher.group(4);
// 30/May/2013:17:38:21 +0800
// 30/05/2013:17:38:21
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
try {
Date date = sdf.parse(accessTime);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String finalDate = sdf2.format(date);
context.write(new Text(clientIp + " " + finalDate + " " + accessResource + " " + status), null);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}
初始化類
package com.baizhi.dataclean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class DataCleanApplication {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration(), "data clean");
job.setJarByClass(DataCleanApplication.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.setInputPaths(job,new Path("file:///E:/access.log"));
TextOutputFormat.setOutputPath(job,new Path("file:///E:/final"));
job.setMapperClass(DataCleanMapper.class);
// 注意:資料清洗通常隻有map任務而沒有reduce任務
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
}
}
資料傾斜
資料分區預設政策
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-CFdJRlwz-1590287261180)(D:\Learnspace\training camp\day06\圖檔\2019081902.png)]
資料傾斜是指大量key相同的資料交由一個Reduce任務去統計計算,造成”閑的閑死,忙的忙死“這樣的現象,不符合分布式并行計算的設計初衷。
現象
- 某一個Reduce任務運作特别耗時
- Reduce任務運作時,記憶體突然溢出
解決方案
- 增加執行Reduce任務機器的JVM記憶體(硬體的水準擴充)
- 增加Reduce任務的數量,每個Reduce任務隻負責極少部分的資料處理,并且Reduce任務的數量增加提高了資料計算的并行度
Reduce任務的正确數量:0.95或者1.75 * (NodeManage數量 * 每個節點的最大容器數量)
- 自定義分區規則Partitioner
package com.baizhi.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 自定義分區規則
*/
public class CustomPartitioner extends Partitioner<Text, LongWritable> {
/**
* @param key
* @param value
* @param i numReduceTasks
* @return 分區序号
*/
public int getPartition(Text key, LongWritable value, int i) {
if (key.toString().equals("CN-GD")) return 0;
else if (key.toString().equals("CN-GX")) return 1;
else if (key.toString().equals("CN-HK")) return 2;
else if (key.toString().equals("JP-TY")) return 3;
else return 4;
}
}
- 合理使用
,将key相同的value進行歸并Combiner
在Combiner合并時,要求value必須能支援疊代計算,并且不能影響Reduce任務的輸入
Combiner通常就是Reducer任務
// 優化政策:combiner合并操作
job.setCombinerClass(MyReducer.class);
五、Hadoop完全分布式叢集
架構
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-BVsr3Kwd-1590287261183)(D:\Learnspace\training camp\day06\圖檔\2019081903.png)]
環境搭建
準備3台虛拟機
- node1:
192.168.12.130
- node2:
192.168.12.131
- node3:
192.168.12.132
服務劃分
服務名 | node1 | node2 | node3 |
---|---|---|---|
Namenode | Y(主) | Y(備) | |
Datanode | Y | Y | Y |
Journal node | Y | Y | Y |
Zookeeper | Y | Y | Y |
ResourceManager | Y(主) | Y(備) | |
NodeManger | Y | Y | Y |
配置步驟
準備安裝包
# jdk1.8+
# centos7.2
# hadoop-2.6.0
# zookeeper-3.4.6
修改主機名和IP位址的映射檔案
[[email protected] ~]# vi /etc/hosts
192.168.12.130 node1
192.168.12.131 node2
192.168.12.132 node3
關閉防火牆
[[email protected] ~]# systemctl stop firewalld
[[email protected] ~]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
SSH免密登入
[[email protected] ~]# ssh-keygen -t rsa
[[email protected] ~]# ssh-copy-id node1
[[email protected] ~]# ssh-copy-id node2
[[email protected] ~]# ssh-copy-id node3
同步時鐘
[[email protected] ~]# date -s '2018-12-1 20:06:00'
2018年 12月 01日 星期六 20:06:00 CST
[[email protected] ~]# clock -w
[[email protected] ~]# date
2018年 12月 01日 星期六 20:06:10 CST
注意:伺服器叢集時鐘如果一緻,可以跳過此步驟!
修改伺服器的主機名
[[email protected] ~]# vi /etc/hostname
# 192.168.12.130
node1
# 192.168.12.131
node2
# 192.168.12.132
node3
重新開機機器,生效
安裝JDK1.8+
# 将叢集搭建的安裝包上傳到某節點
# 利用scp指令拷貝到其它節點
[[email protected] ~]# scp jdk-8u191-linux-x64.rpm [email protected]:~
jdk-8u191-linux-x64.rpm
[[email protected] ~]# scp jdk-8u191-linux-x64.rpm [email protected]:~
jdk-8u191-linux-x64.rpm
安裝ZooKeeper
[[email protected] ~]# scp zookeeper-3.4.6.tar.gz [email protected]:~
zookeeper-3.4.6.tar.gz 100% 17MB 84.3MB/s 00:00
[[email protected] ~]# scp zookeeper-3.4.6.tar.gz [email protected]:~
zookeeper-3.4.6.tar.gz 100% 17MB 73.4MB/s 00:00
[[email protected] ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr
[[email protected] ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
dataDir=/root/zkdata
clientPort=2181
initLimit=5
syncLimit=2
server.1=node1:2887:3887
server.2=node2:2887:3887
server.3=node3:2887:3887
[[email protected] ~]# mkdir -p /root/zkdata
# node1執行此指令
[[email protected] ~]# cd zkdata/
[[email protected] zkdata]# vi myid
1
# node2執行此指令
[[email protected] ~]# cd zkdata/
[[email protected] zkdata]# vi myid
2
# node3執行此指令
[[email protected] ~]# cd zkdata/
[[email protected] zkdata]# vi myid
3
# 啟動ZooKeeper叢集
[[email protected] ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start /usr/zookeeper-3.4.6/conf/zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/conf/zoo.cfg
Starting zookeeper ... STARTED
# 确認zookeper服務是否正常:方法一
[[email protected] ~]# jps
1777 QuorumPeerMain
1811 Jps
# 确認zookeper服務是否正常:方法二
[[email protected] ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status /usr/zookeeper-3.4.6/conf/zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/conf/zoo.cfg
Mode: leader
安裝Hadoop
#1.将hadoop的安裝包遠端拷貝到其它的節點
[[email protected] ~]# scp hadoop-2.6.0_x64.tar.gz [email protected]:~
hadoop-2.6.0_x64.tar.gz 100% 172MB 113.8MB/s 00:01
[[email protected] ~]# scp hadoop-2.6.0_x64.tar.gz [email protected]:~
hadoop-2.6.0_x64.tar.gz 100% 172MB 120.8MB/s 00:01
#2.安裝
[[email protected] ~]# tar -zxf hadoop-2.6.0_x64.tar.gz -C /usr
#3.配置java和hadoop的環境變量
[[email protected] ~]# vi ~/.bashrc
HADOOP_HOME=/usr/hadoop-2.6.0
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME
export CLASSPATH
export PATH
export HADOOP_HOME
[[email protected] ~]# source ~/.bashrc
修改Hadoop的配置檔案
-
core-site.xml
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop-2.6.0/hadoop-${user.name}</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>30</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/usr/hadoop-2.6.0/etc/hadoop/rack.sh</value>
</property>
- 建立機架腳本檔案,該腳本可以根據IP判斷機器所處的實體位置
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/rack.sh
while [ $# -gt 0 ] ; do
nodeArg=$1
exec</usr/hadoop-2.6.0/etc/hadoop/topology.data
result=""
while read line ; do
ar=( $line )
if [ "${ar[0]}" = "$nodeArg" ] ; then
result="${ar[1]}"
fi
done
shift
if [ -z "$result" ] ; then
echo -n "/default-rack"
else
echo -n "$result "
fi
done
[[email protected] ~]# chmod u+x /usr/hadoop-2.6.0/etc/hadoop/rack.sh
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/topology.data
192.168.12.130 /rack1
192.168.12.131 /rack2
192.168.12.132 /rack2
[[email protected] ~]# /usr/hadoop-2.6.0/etc/hadoop/rack.sh 192.168.23.137
/rack1
-
hdfs-site.xml
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>node1:2181,node2:2181,node3:2181</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>node1:9000</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>node2:9000</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node1:8485;node2:8485;node3:8485/mycluster</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
-
slaves
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/slaves
node1
node2
node3
啟動HDFS HA叢集的服務
[[email protected] ~]# hadoop-daemon.sh start journalnode
[[email protected] ~]# hdfs namenode -format
[[email protected] ~]# hadoop-daemon.sh start namenode
[[email protected] ~]# hdfs namenode -bootstrapStandby
[[email protected] ~]# hadoop-daemon.sh start namenode
# zkfc: zookeeper failover controller
[[email protected]|2 ~]# hdfs zkfc -formatZK (可以在node1或者node2任意一台注冊namenode資訊)
[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)
[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)
[[email protected] ~]# hadoop-daemon.sh start datanode
注意:CentOS-7.x版本需要安裝一個中間依賴服務 [[email protected] ~]# yum install -y psmisc
YARN的HA叢集
-
mapred-site.xml
[[email protected] ~]# cp /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml [[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>
-
yarn-site.xml
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster1</value> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>node2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>node3</value> </property> <property> <name>yarn.resourcemanager.zk-address</name> <value>node1:2181,node2:2181,node3:2181</value> </property>
- 啟動YARN
[[email protected] ~]# yarn-daemon.sh start resourcemanager [[email protected] ~]# yarn-daemon.sh start resourcemanager [[email protected] ~]# yarn-daemon.sh start nodemanager
- 檢視ResourceManager HA狀态
[[email protected] ~]# yarn rmadmin -getServiceState rm1 active [[email protected] ~]# yarn rmadmin -getServiceState rm2 standby
namenode
zkfc: zookeeper failover controller
[[email protected]|2 ~]# hdfs zkfc -formatZK (可以在node1或者node2任意一台注冊namenode資訊)
[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)
[[email protected] ~]# hadoop-daemon.sh start zkfc (哨兵)
[[email protected] ~]# hadoop-daemon.sh start datanode
> 注意:CentOS-7.x版本需要安裝一個中間依賴服務
>
> `[[email protected] ~]# yum install -y psmisc`
#### YARN的HA叢集
- `mapred-site.xml`
```xml
[[email protected] ~]# cp /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
[[email protected] ~]# vi /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
-
yarn-site.xml
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster1</value> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>node2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>node3</value> </property> <property> <name>yarn.resourcemanager.zk-address</name> <value>node1:2181,node2:2181,node3:2181</value> </property>
- 啟動YARN
[[email protected] ~]# yarn-daemon.sh start resourcemanager [[email protected] ~]# yarn-daemon.sh start resourcemanager [[email protected] ~]# yarn-daemon.sh start nodemanager
- 檢視ResourceManager HA狀态
[[email protected] ~]# yarn rmadmin -getServiceState rm1 active [[email protected] ~]# yarn rmadmin -getServiceState rm2 standby