天天看點

Hhadoop-2.7.0中HDFS寫檔案源碼分析(二):用戶端實作(1)

一、綜述

      HDFS寫檔案是整個Hadoop中最為複雜的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等衆多角色的分工與合作。

      首先上一段代碼,用戶端是如何寫檔案的:

      隻有簡單的6行代碼,用戶端封裝的如此簡潔,各元件間的RPC調用、異常處理、容錯等均對用戶端透明。

      總體來說,最簡單的HDFS寫檔案大體流程如下:

      1、用戶端擷取檔案系統執行個體FileSyStem,并通過其create()方法擷取檔案系統輸出流outputStream;

            1.1、首先會聯系名位元組點NameNode,通過ClientProtocol.create()RPC調用,在名位元組點上建立檔案中繼資料,并擷取檔案狀态FileStatus;

            1.2、通過檔案狀态FileStatus構造檔案系統輸出流outputStream;

      2、通過檔案系統輸出流outputStream寫入資料;

             2.1、首次寫入會首先向名位元組點申請資料塊,名位元組點能夠掌握叢集DataNode整體狀況,配置設定資料塊後,連同DataNode清單資訊傳回給用戶端;

             2.2、用戶端采用流式管道的方式寫入資料節點清單中的第一個DataNode,并由清單中的前一個DataNode将資料轉發給後面一個DataNode;

             2.3、确認資料包由DataNode經過管道依次傳回給上遊DataNode和用戶端;

             2.4、寫滿一個資料塊後,向名位元組點送出一個資料;

             2.5、再次重複2.1-2.4過程;

      3、向名位元組點送出檔案(complete file),即告知名位元組點檔案已寫完,然後關閉檔案系統輸出流outputStream等釋放資源。

      可以看出,在不考慮異常等的情況下,上述過程還是比較複雜的。本文,我将着重闡述下HDFS寫資料時,用戶端是如何實作的,關于NameNode、DataNode等的配合等,後續文章将陸續推出,敬請關注!

二、實作分析

      我們将帶着以下問題來分析用戶端寫入資料過程:

      1、如何擷取資料輸出流?

      2、如何通過資料輸出流寫入資料?

      3、資料輸出流關閉時都做了什麼?

      4、如果發生異常怎麼辦?即如何容錯?

      (一)如何擷取資料輸出流?

      HDFS用戶端擷取資料流是一個複雜的過程,流程圖如下:

Hhadoop-2.7.0中HDFS寫檔案源碼分析(二):用戶端實作(1)

      以DistributedFileSystem為例,create()是其入口方法,DistributedFileSystem内部封裝了一個DFS的用戶端,如下:

      在DistributedFileSystem的初始化方法initialize()中,會構造這個檔案系統用戶端,如下:

      而create()方法就是通過這個檔案系統用戶端dfs擷取資料輸出流的,如下:

      FileSystemLinkResolver是一個檔案系統連結解析器(抽象類),我們待會再分析它,這裡隻要知道,該抽象類執行個體化後會通過resolve()方法--doCall()方法得到資料輸出流即可。接着往下DFSClient的create()方法,省略部分代碼,如下:

      實際上,它又通過DFSOutputStream的newStreamForCreate()方法來擷取資料輸出流,并開啟檔案租約。租約的内容我們後續再講,繼續看下如何擷取檔案輸出流的,如下:

      大體可以分為三步:

      1、首先,通過DFSClient中nameNode的Create()方法,在HDFS檔案系統名位元組點中建立一個檔案,并傳回檔案狀态HdfsFileStatus;

      2、構造一個資料輸出流;

      3、啟動資料輸出流。

      上述連接配接NameNode節點建立檔案的過程中,如果發生瞬時錯誤,會充分利用重試機制,增加系統容錯性。DFSClient中nameNode的Create()方法,實際上是調用的是用戶端與名位元組點間的RPC--ClientProtocol的create()方法,該方法的作用即是在NameNode上建立一個空檔案,并傳回檔案狀态。檔案狀态主要包括以下資訊:

      繼續看如何構造一個資料輸出流,實際上它是通過構造DFSOutputStream執行個體擷取的,而DFSOutputStream的構造方法如下:

      首先計算資料包塊大小,然後構造資料流對象,後續就依靠這個資料流對象來通過管道發送流式資料。接下來便是啟動資料輸出流,如下:

      很簡單,實際上也就是啟動資料流對象,通過這個資料流對象實作資料的發送。

      中間為什麼會有計算資料包塊大小這一步呢?原來,資料的發送是通過一個個資料包發送出去的,而不是通過資料塊發送的。設想下,如果按照一個資料塊(預設128M)大小發送資料,合理嗎?至于資料包大小是如何确定的,我們後續再講。

      (二)如何通過資料輸出流寫入資料?

      下面,該看看如何通過資料輸出流寫入資料了。要解決這個問題,首先分析下DFSOutputStream和DataStreamer是什麼。

      1、DFSOutputStream

      DFSOutputStream是分布式檔案系統輸出流,它内部封裝了兩個隊列:發送資料包隊列和确認資料包隊列,如下:

      用戶端寫入的資料,會addLast入發送資料包隊列dataQueue,然後交給DataStreamer處理。

      2、DataStreamer

      DataStreamer是一個背景工作線程,它負責在資料流管道中往DataNode發送資料包。它從NameNode申請擷取一個新的資料塊ID和資料塊位置,然後開始往DataNode的管道寫入流式資料包。每個資料包都有一個序列号sequence number。當一個資料塊所有的資料包被發送出去,并且每個資料包的确認資訊acks被接收到的話,DataStreamer關閉目前資料塊,然後再向NameNode申請下一個資料塊。

      是以,才會有上述發送資料包和确認資料包這兩個隊列。

      DataStreamer内部有很多變量,大體如下:

      有很多比較簡單,不再贅述。這裡隻講解幾個比較重要的:

      1、BlockConstructionStage stage

      目前資料塊構造階段。針對create()這種寫入 來說,開始時預設是BlockConstructionStage.PIPELINE_SETUP_CREATE,即管道初始化時需要向NameNode申請資料塊及所在資料節點的狀态,這個很容易了解。有了資料塊和其所在資料節點所在清單,才能形成管道清單不是?在資料流傳輸過程中,即一個資料塊寫入的過程中,雖然有多次資料包寫入,但狀态始終為DATA_STREAMING,即正在流式寫入的階段。而當發生異常時,則是PIPELINE_SETUP_STREAMING_RECOVERY狀态,即需要從流式資料中進行恢複,如果一個資料塊寫滿,則會進入下一個周期,PIPELINE_SETUP_CREATE->DATA_STREAMING,最後資料全部寫完後,狀态會變成PIPELINE_CLOSE,并且如果發生異常的話,會有一個特殊狀态對應,即PIPELINE_CLOSE_RECOVERY。而append開始時則是對應的狀态PIPELINE_SETUP_APPEND及異常狀态PIPELINE_SETUP_APPEND_RECOVERY,其它則一緻。

      2、volatile boolean hasError = false

      這個狀态位用來标記資料寫入過程中,是否存在錯誤,友善進行容錯。

      3、ResponseProcessor response

      響應處理器。這個也是背景工作線程,它會處理來自DataNode回複流中的确認包,确認資料是否發送成功,如果成功,将确認包從确認資料包隊列中移除,否則進行容錯處理。

      create()模式下的DataStreamer構造比較簡單,如下:

      isAppend設定為false,即不是append寫入,BlockConstructionStage預設為PIPELINE_SETUP_CREATE,即需要向NameNode寫入資料塊。

      我們首先看下DataStreamer是如何發送資料的。上面講到過,DFSOutputStream中包括兩個隊列:發送資料包隊列和确認資料包隊列。這類似于兩個生産者消--費者模型。針對發送資料包隊列,外部寫入者為生産者,DataStreamer為消費者。外部持續寫入資料至發送資料包隊列,DataStreamer則從中消費資料,判斷是否需要申請資料塊,然後寫入資料節點流式管道。而确認資料包隊列,DataStreamer為生産者,ResponseProcessor為消費者。首先,确認資料包隊列資料的産生,是DataStreamer發送資料給DataNode後,從發送資料包隊列挪過來的,而當ResponseProcessor線程确認接收到資料節點的ack确認包後,再從資料确認隊列中删除。

      關于ResponseProcessor線程,稍後再講。

      資料寫入過程之DataStreamer

      首先看DataStreamer的run()方法,它會在資料流沒有關閉,且dfs用戶端正在運作的情況下,一直循環,循環内處理的大體流程如下:

      1、如果遇到一個錯誤(hasErro),且響應器尚未關閉,關閉響應器,使之join等待;

      2、如果有DataNode相關IO錯誤,先預先處理,初始化一些管道和流的資訊,并決定外部是否等待,等待意即可以進行容錯處理,不等待則數目錯誤比較嚴重,無法進行容錯處理:這裡還判斷了errorIndex标志位和restartingNodeIndex的大小,意思是是否是由某個具體資料節點引起的錯誤,如果是的話,這種錯誤理論上是可以處理的;

      3、沒有資料時,等待一個資料包發送:等待的條件是:目前流沒有關閉(!streamerClosed)、沒有錯誤(hasError)、dfs用戶端正在 運作(dfsClient.clientRunning )、dataQueue隊列大小為0,且目前階段不是DATA_STREAMING,或者在需要sleep(doSleep)或者上次發包距離本次時間未超過門檻值的情況下為DATA_STREAMING

            意思是各種标記為正常,資料流處于正常發送的過程或者可控的非正常發送過程中,可控表現在狀态位doSleep,即上傳錯誤檢查中認為理論上可以進行修複,但是需要sleep已完成recovery的初始化,或者距離上次發送未超過時間的門檻值等。

      4、如果資料流關閉、存在錯誤、用戶端正常運作标志位異常時,執行continue:這個應該是對容錯等的處理,讓程式及時響應錯誤;

      5、擷取将要發送的資料包:

            如果資料發送隊列為空,構造一個心跳包;否則,取出隊列中第一個元素,即待發送資料包。

      6、如果目前階段是PIPELINE_SETUP_CREATE,申請資料塊,設定pipeline,初始化資料流:append的setup階段則是通過setupPipelineForAppendOrRecovery()方法完成的,并同樣會初始化資料流;

      7、擷取資料塊中的上次資料位置lastByteOffsetInBlock,如果超過資料塊大小,報錯;

      8、 如果是資料塊的最後一個包:等待所有的資料包被确認,即等待datanodes的确認包acks,如果資料流關閉,或者資料節點IO存在錯誤,或者用戶端不再正常運作,continue,設定階段為pipeline關閉

      9、發送資料包:将資料包從dataQueue隊列挪至ackQueue隊列,通知dataQueue的所有等待者,将資料寫入遠端的DataNode節點,并flush,如果發生異常,嘗試标記主要的資料節點錯誤,友善容錯處理;

      10、更新已發送資料大小:可以看出,資料包中存儲了其在資料塊中的位置LastByteOffsetBlock,也就标記了已經發送資料的總大小;

      11、資料塊寫滿了嗎?如果是最後一個資料塊,等待确認包,調用endBlock()方法結束一個資料塊 ;

      如果上述流程發生錯誤,hasError标志位設定為true,并且如果不是一個DataNode引起的原因,流關閉标志設定為true。

      最後,沒有資料需要發送,或者發生緻命錯誤的情況下,調用closeInternal()方法關閉内部資源。

      未完待續,請關注《Hhadoop-2.7.0中HDFS寫檔案源碼分析(二):用戶端實作(2)》。

三、代碼分析