天天看點

hadoop 核心概念及入門Hadoop

Hadoop

Hadoop 核心概念

什麼是 Hadoop

HADOOP是apache旗下的一套開源軟體平台HADOOP提供利用伺服器叢集,根據使用者的自定義業務邏輯,對海量資料進行分布式處理,HADOOP的核心元件有:HDFS(分布式檔案系統)、YARN(運算資源排程系統)、MAPREDUCE(分布式運算程式設計架構),廣義上來說,HADOOP通常是指一個更廣泛的概念——HADOOP生态圈

Hadoop 産生背景

HADOOP最早起源于Nutch。Nutch的設計目标是建構一個大型的全網搜尋引擎,包括網頁抓取、索引、查詢等功能,但随着抓取網頁數量的增加,遇到了嚴重的可擴充性問題——如何解決數十億網頁的存儲和索引問題。2003年、2004年谷歌發表的兩篇論文為該問題提供了可行的解決方案。——分布式檔案系統(GFS),可用于處理海量網頁的存儲——分布式計算架構MAPREDUCE,可用于處理海量網頁的索引計算問題。Nutch的開發人員完成了相應的開源實作HDFS和MAPREDUCE,并從Nutch中剝離成為獨立項目HADOOP,到2008年1月,HADOOP成為Apache頂級項目,迎來了它的快速發展期。

HADOOP生态圈以及各組成部分的簡介

  • HDFS:分布式檔案系統
  • MAPREDUCE:分布式運算程式開發架構
  • HIVE:基于大資料技術(檔案系統+運算架構)的SQL資料倉庫工具
  • HBASE:基于HADOOP的分布式海量資料庫
  • ZOOKEEPER:分布式協調服務基礎元件
  • Mahout:基于mapreduce/spark/flink等分布式運算架構的機器學習算法庫
  • Oozie:工作流排程架構
  • Sqoop:資料導入導出工具
  • Flume:日志資料采集架構

分布式系統概述

注:由于大資料技術領域的各類技術架構基本上都是分布式系統,是以,了解hadoop、storm、spark等技術架構,都需要具備基本的分布式系統概念

什麼是分布式

分布式系統是由一組通過網絡進行通信、為了完成共同的任務而協調工作的計算機節點組成的系統。分布式系統的出現是為了用廉價的、普通的機器完成單個計算機無法完成的計算、存儲任務。其目的是利用更多的機器,處理更多的資料 。分布式系統的特點是:硬體獨立,各裝置之間獨立,互不依賴、 軟體統一,對使用者來說,就像是跟單個系統打交道。

為什麼需要分布式

為了性能擴充:系統負載高,單台機器無法承受,希望通過多台機器來提高系統負載能力 為了增強可靠性:軟體不是完美的,網絡不是完美的,甚至機器也不是完美的,随時可能出錯,為了避免故障,需要将業務分散開保留一定的備援度

分布式軟體系統(Distributed Software Systems)

  1. 該軟體系統會劃分成多個子系統或子產品,各自運作在不同的機器上,子系統或子產品之間通過網絡通信進行協作,實作最終的整體功能
  2. 比如分布式作業系統、分布式程式設計語言及其編譯(解釋)系統、分布式檔案系統和分布式資料庫系統等。

為什麼使用分布式

  • 單機處理能力存在瓶頸;
  • 更新單機處理能力的成本效益越來越低;
  • 分布式系統穩定性、可用性好

web日志資料挖掘

“Web點選流日志”包含着網站營運很重要的資訊,通過日志分析,我們可以知道網站的通路量,哪個網頁通路人數最多,哪個網頁最有價值,廣告轉化率、訪客的來源資訊,訪客的終端資訊等。

hadoop叢集搭建

叢集簡介

HADOOP叢集具體來說包含兩個叢集:HDFS叢集和YARN叢集,兩者邏輯上分離,但實體上常在一起HDFS叢集:負責海量資料的存儲,叢集中的角色主要有 NameNode / DataNode YARN叢集:負責海量資料運算時的資源排程,叢集中的角色主要有 ResourceManager / NodeManager (那mapreduce是什麼呢?它其實是一個應用程式開發包)

伺服器配置設定

本叢集搭建案例,以5節點為例進行搭建,角色配置設定如下:

節點 程序
node-1 NameNode ResourceManager
node-2 SecondaryNameNode
node-3 DataNode NodeManager
node-4 DataNode NodeManager
node-5 DataNode NodeManager

伺服器系統配置

  1. 配置系統網絡,安裝常用依賴包如檔案上傳、編輯器、網絡工具
  2. 同步時間、配置各主機名稱映射、配置ssh免密登陸
  3. 安裝java環境

hadoop叢集安裝部署

# 下載下傳
$ wget https://mirrors.shu.edu.cn/apache/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz

# 解壓縮
$  tar -zxvf hadoop-2.7.7/hadoop-2.7.7.tar.gz

# 修改系統環境變量
$ vi /etc/profile

export HADOOP_HOME=/export/servers/hadoop2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin


# 配置環境變量
$ vi  hadoop-env.sh

export JAVA_HOME=/usr/local/jdk1.8.0_191

# core-site.xml配置如下
$ vi core-site.xml

<configuration>   
    <property>   
        <name>fs.defaultFS</name>   
        <value>hdfs://node-1:9000</value>   
    </property>   
    <property>   
        <name>hadoop.tmp.dir</name>   
        <value>/export/data/hadoop/tmp</value>
    </property>
</configuration>


# hdfs-site.xml配置如下
$ vi  hdfs-site.xml

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/export/data/hadoop/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/export/data/hadoop/data</value>
    </property>

    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>

    <property>
        <name>dfs.secondary.http.address</name>
        <value>node-2:50090</value>
    </property>

</configuration>

# mapred-site.xml配置如下
$ vi mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>


# yarn-site.xml配置如下
$ vi yarn-site.xml

<configuration>

  <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>node-1</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

</configuration>

# 修改 slaves 配置
$ vim salves

node-3
node-4
node-5

           

啟動hadoop叢集

# 初始化HDFS
$ hadoop namenode -format

# 啟動HDFS
$ $HADOOP_HOME/sbin/start-dfs.sh

# 啟動yarn
$ $HADOOP_HOME/sbin/start-yarn.sh

# 啟動hdfs和yarn
$ $HADOOP_HOME/sbin/start-all.sh

# hadoop常用指令

# 檢視hadoop版本
$ hadoop verion

# 運作jar檔案
$ hadoop jar

# 建立檔案夾
$ hadoop fs -mkdir -p /data/input

# 遞歸檢視檔案夾
$ hadoop fs -ls -R /data/input

# 上傳本地檔案到HDFS
$ hadoop fs -put /root/words.txt  /data/input

# 檢視HDFS上的檔案
$ hadoop fs -cat /data/input/words.txt

# 删除HDFS上的檔案
$ hadoop fs -rm -f /data/input/words.txt

           

運作一個mapreduce程式

使用hadoop自帶的例子程式,運作一個示例mr程式

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /data/input /data/output
           

叢集使用初步

可打開web控制台檢視 HDFS 和 yarn 叢集資訊 hdfs 的 web 頁面是 http://node-1:50070/ yarn 叢集資訊的 web 頁面是http://node-1:8088/

mapreduce是hadoop中的分布式運算程式設計架構,隻要按照其程式設計規範,隻需要編寫少量的業務邏輯代碼即可實作一個強大的海量資料并發處理程式

HDFS詳解

HDFS前言

設計思想

  • 分而治之:将大檔案、大批量檔案,分布式存放在大量伺服器上,以便于采取分而治之的方式對海量資料進行運算分析;

在大資料系統中作用:

  • 為各類分布式運算架構(如:mapreduce,spark,tez,……)提供資料存儲服務

重點概念

  • 檔案切塊,副本存放,中繼資料

HDFS的概念和特性

首先,它是一個檔案系統,用于存儲檔案,通過統一的命名空間——目錄樹來定位檔案

其次,它是分布式的,由很多伺服器聯合起來實作其功能,叢集中的伺服器有各自的角色

重要特性如下

  • HDFS中的檔案在實體上是分塊存儲(block),塊的大小可以通過配置參數( dfs.blocksize)來規定,預設大小在hadoop2.x版本中是128M,老版本中是64M
  • HDFS檔案系統會給用戶端提供一個統一的抽象目錄樹,用戶端通過路徑來通路檔案,形如:hdfs://host:port/xxx/xxx/file.data
  • 目錄結構及檔案分塊資訊(中繼資料)的管理由namenode節點承擔namenode是HDFS叢集主節點,負責維護整個hdfs檔案系統的目錄樹,以及每一個路徑(檔案)所對應的block塊資訊(block的id,及所在的datanode伺服器)
  • 檔案的各個 block 的存儲管理由datanode節點承擔 datanode 是HDFS叢集從節點,每一 個block 都可以在多個 datanode 上存儲多個副本(副本數量也可以通過參數設定dfs.replication)
  • HDFS是設計成适應一次寫入,多次讀出的場景,且不支援檔案的修改

(注:适合用來做資料分析,并不适合用來做網盤應用,因為,不便修改,延遲大,網絡開銷大,成本太高)

HDFS常用指令

dfs == hadoop fs

# 在hdfs上建立目錄
$ dfs -mkdir /xxx

# 顯示目錄資訊
$ dfs -ls -r /xxx

# 遞歸檢視HDFS目錄檔案
$ dfs -lsr  /xxx

# 等同于copyFromLocal 
$ dfs -put a.html /xxx

# 等同于copyToLocal,就是從hdfs下載下傳檔案到本地
$ dfs -get /xxx/a.html

# 删除檔案夾下所有檔案
$ dfs -rm -r -f /data

# 顯示檔案内容
$ dfs -cat  /xxx/a.txt

# 從本地剪切粘貼到hdfs
$ dfs -moveFromLocal

# 從hdfs剪切粘貼到本地
$ dfs -moveToLocal

# 追加一個檔案到已經存在的檔案末尾
$ dfs -appendToFile

# 顯示一個檔案的末尾
$ dfs -tail

# 從hdfs的一個路徑拷貝hdfs的另一個路徑
$ dfs -cp

# 從hdfs的一個路徑拷貝hdfs的另一個路徑
$ dfs -mv

# 删除檔案或檔案夾
$ dfs -rm
           

HSFS原理

  • 概述
    • HDFS叢集分為兩大角色:NameNode、DataNode (Secondary Namenode)
    • NameNode負責管理整個檔案系統的中繼資料
    • DataNode 負責管理使用者的檔案資料塊
    • 檔案會按照固定的大小(blocksize)切成若幹塊後分布式存儲在若幹台datanode上
    • 每一個檔案塊可以有多個副本,并存放在不同的datanode上
    • Datanode會定期向Namenode彙報自身所儲存的檔案block資訊,而namenode則會負責保持檔案的副本數量
    • HDFS的内部工作機制對用戶端保持透明,用戶端請求通路HDFS都是通過向namenode申請來進行
  • HDFS上傳檔案的步驟

    用戶端要向HDFS寫資料,首先要跟namenode通信以确認可以寫檔案并獲得接收檔案block的datanode,然後,用戶端按順序将檔案逐個block傳遞給相應datanode,并由接收到block的datanode負責向其他datanode複制block的副本

    • 根namenode通信請求上傳檔案,namenode檢查目标檔案是否已存在,父目錄是否存在
    • namenode傳回是否可以上傳
    • client請求第一個 block該傳輸到哪些datanode伺服器上
    • namenode傳回3個datanode伺服器ABC
    • client請求3台dn中的一台A上傳資料(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調
    • 然後B調用C,将真個pipeline建立完成,逐級傳回用戶端
    • client開始往A上傳第一個block(先從磁盤讀取資料放到一個本地記憶體緩存),以packet為機關,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
    • 當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。
  • HDFS 讀資料流程

    用戶端将要讀取的檔案路徑發送給namenode,namenode擷取檔案的元資訊(主要是block的存放位置資訊)傳回給用戶端,用戶端根據傳回的資訊找到相應datanode逐個擷取檔案的block并在用戶端本地進行資料追加合并進而獲得整個檔案

    • 跟namenode通信查詢中繼資料,找到檔案塊所在的datanode伺服器
    • 挑選一台datanode(就近原則,然後随機)伺服器,請求建立socket流
    • datanode開始發送資料(從磁盤裡面讀取資料放入流,以packet為機關來做校驗)
    • 用戶端以packet為機關接收,現在本地緩存,然後寫入目标檔案

namenode職責

負責用戶端請求的響應,中繼資料的管理(查詢,修改)

中繼資料管理

namenode對資料的管理采用了三種存儲形式:

1.記憶體中繼資料(NameSystem)

2.磁盤中繼資料鏡像檔案

3.資料記錄檔檔案(可通過日志運算出中繼資料)

中繼資料存儲機制

  • 記憶體中有一份完整的中繼資料(記憶體meta data)
  • 磁盤有一個“準完整”的中繼資料鏡像(fsimage)檔案在 namenode 的工作目錄中
  • 用于銜接記憶體metadata和持久化中繼資料鏡像fsimage之間的記錄檔 edits 檔案

注:當用戶端對hdfs中的檔案進行新增或者修改操作,操作記錄首先被記入edits日志檔案中,當用戶端操作成功後,相應的中繼資料會更新到記憶體meta.data中

中繼資料手動檢視

  • 可以通過hdfs的一個工具來檢視edits中的資訊
  • bin/hdfs oev -i edits -o edits.xml
  • bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

中繼資料的checkpoint

每隔一段時間,會由secondary namenode将namenode上積累的所有edits和一個最新的fsimage下載下傳到本地,并加載到記憶體進行merge(這個過程稱為checkpoint)

中繼資料目錄說明

在第一次部署好Hadoop叢集的時候,我們需要在NameNode(NN)節點上格式化磁盤:

$ $HADOOP_HOME/bin/hdfs namenode -format

格式化完成之後,将會在$dfs.namenode.name.dir/current目錄下如下的檔案結構

current/
|-- VERSION
|-- edits_*
|-- fsimage_0000000000008547077
|-- fsimage_0000000000008547077.md5
|-- seen_txid
           

DATANODE的工作機制

  • Datanode工作職責:

存儲管理使用者的檔案塊資料,定期向namenode彙報自身所持有的block資訊(通過心跳資訊上報,這點很重要,因為,當叢集中發生某些block副本失效時,叢集如何恢複block初始副本數量的問題)

<property>
    <name>dfs.blockreport.intervalMsec</name>
    <value>3600000</value>
</property>
           
  • Datanode掉線判斷時限參數

datanode程序死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間,這段時間暫稱作逾時時長。HDFS預設的逾時時長為10分鐘+30秒。如果定義逾時時間為timeout,則逾時時長的計算公式為:

timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。

而預設的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval預設為3秒。

需要注意的是hdfs-site.xml 配置檔案中的heartbeat.recheck.interval的機關為毫秒,dfs.heartbeat.interval的機關為秒。是以,舉個例子,如果heartbeat.recheck.interval設定為5000(毫秒),dfs.heartbeat.interval設定為3(秒,預設),則總的逾時時間為40秒。

<property>
    <name>heartbeat.recheck.interval</name>
    <value>2000</value>
</property>
<property>
    <name>dfs.heartbeat.interval</name>
    <value>1</value>
</property>
           

驗證DATANODE功能

上傳一個檔案,觀察檔案的block具體的實體存放情況:

在每一台datanode機器上的這個目錄中能找到檔案的切塊:

/export/data/hadoop/tmp/dfs/data/current/BP-193442119-192.168.2.120-1432457733977/current/finalized

hadoop hdfs java用戶端

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

/**
 * <p>
 *
 * @author leone
 * @since 2018-05-06
 **/
public class HdfsClientTest {

    FileSystem fs = null;

    /**
     * 初始化
     */
    @Before
    public void init() throws Exception {
        Configuration conf = new Configuration();
        fs = FileSystem.get(new URI("hdfs://node-1:9000/"), conf, "root");
    }

    /**
     * 上傳檔案
     *
     * @throws IOException
     */
    @Test
    public void putTest() throws IOException {
        fs.copyFromLocalFile(new Path("file:///D:/tmp/hadoop/input1/words.txt"),
                new Path("hdfs://node-1:9000/hadoop-2.7.7/input1/words.txt"));
    }


    /**
     * 下載下傳檔案
     *
     * @throws Exception
     * @throws IOException
     */
    @Test
    public void download() throws Exception, IOException {
        fs.copyToLocalFile(new Path("/hadoop-2.7.7/input1/words.txt"), new Path("file:///E:/hadoop/words.txt"));
    }


    /**
     * 建立目錄
     *
     * @throws IllegalArgumentException
     * @throws IOException
     */
    @Test
    public void mkdirTest() throws IllegalArgumentException, IOException {
        fs.mkdirs(new Path("/test/input/"));
    }

    /**
     * 删除檔案或目錄
     */
    @Test
    public void deleteTest() throws IOException {
        fs.delete(new Path("/test"), true);
    }

    /**
     * 移動檔案
     *
     * @throws IllegalArgumentException
     * @throws IOException
     */
    @Test
    public void mvFileTest() throws IllegalArgumentException, IOException {
        fs.rename(new Path("/hadoop-2.7.7/input1/words.txt"), new Path("/hadoop-2.7.7/input3/words.txt"));
    }

}

           

繼續閱讀