DataBlockScanner是運作在資料節點DataNode上的一個背景線程。它為所有的塊池管理塊掃描。針對每個塊池,一個BlockPoolSliceScanner對象将會被建立,其運作在一個單獨的線程中,為該塊池掃描、校驗資料塊。當一個BPOfferService服務變成活躍或死亡狀态,該類中的blockPoolScannerMap将會更新。
我們先看下DataBlockScanner的成員變量,如下:
首先是由構造函數确定的三個成員變量:所屬資料節點DataNode執行個體datanode、所屬存儲FsDatasetSpi執行個體dataset、配置資訊Configuration執行個體conf,對應構造函數如下:
然後設定了一個靜态變量,5s的線程休眠周期,即SLEEP_PERIOD_MS,另外兩個重要的成員變量是:
1、TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap
存儲塊池ID到對應BlockPoolScanner執行個體的映射。當一個BPOfferService服務變成活躍或死亡狀态,blockPoolScannerMap将會随之更新。
2、Thread blockScannerThread
資料塊掃描線程。
既然DataBlockScanner實作了Runnable接口,那麼它肯定是作為一個線程在DataNode節點上運作的,我們看下DataNode是如何對其進行構造及啟動的,代碼如下:
首先,如果blockScanner不為null,直接傳回,說明之前已經初始化并啟動了,然後,确定資料塊校驗功能無法開啟的原因reason:
1、如果參數dfs.datanode.scan.period.hours未配置,或者配置為0,說明資料塊校驗功能已關閉;
2、SimulatedFSDataset不支援資料塊校驗;
如果資料塊校驗功能無法開啟的原因為null,構造DataBlockScanner執行個體,并調用其start()方法啟動該線程,否則在日志檔案中記錄周期性資料塊校驗掃描無法啟用的原因。
DataBlockScanner線程啟動的start()方法如下:
實際上它是基于DataBlockScanner執行個體建立一個線程blockScannerThread,将線程blockScannerThread設定為背景線程,然後啟動線程blockScannerThread。
DataBlockScanner線程已建立,并啟動,那麼我們看下它是如何工作的,接下來看下它的run()方法,代碼如下:
run()方法邏輯比較清晰,大體如下:
1、首先初始化目前塊池ID,即currentBpId,預設為空,再确定第一次運作标志firstRun,預設當然應該為true;
2、接下來進入一個while循環,循環的條件是如果所屬資料節點DataNode執行個體datanode正常運作,且目前線程沒有被中斷:
2.1、處理第一次運作标志位firstRun:
2.1.1、如果不是第一次運作,線程休眠5s:即firstRun為false,這時如果發生InterruptedException異常,中斷blockScannerThread線程,然後跳過,繼續下一輪循環;
2.1.2、第一次運作時先将firstRun标志設定為false;
2.2、擷取下一個塊池切片掃描器BlockPoolSliceScanner執行個體bpScanner,通過調用getNextBPScanner()方法,傳入目前塊池ID,即currentBpId來實作,首次循環,currentBpId為空,後續會傳入之前處理的值,下面會對其進行更新;
2.3、如果bpScanner為null,跳過,繼續下一輪循環;
2.4、設定目前塊池ID,即currentBpId,從塊池切片掃描器BlockPoolSliceScanner執行個體bpScanner中擷取;
2.5、如果目前塊池對應的心跳服務BPOfferService不是活躍的,不對它進行處理,調用removeBlockPool()方法從blockPoolScannerMap中移除資料,并關閉對應BlockPoolSliceScanner,然後跳過,執行下一輪循環;
2.6、調用塊池切片掃描器BlockPoolSliceScanner執行個體bpScanner的scanBlockPoolSlice()方法,掃描對應塊池裡的資料塊,進行資料塊校驗;
3、退出循環後,周遊blockPoolScannerMap中的每個BlockPoolSliceScanner執行個體bpss,挨個調用對應shutdown()方法,停止塊池切片掃描器BlockPoolSliceScanner。
我們接下來看下比較重要的getNextBPScanner()方法,代碼如下:
它的主要作用就是尋找下一個塊池ID以進行scan,其存在一個整體的while循環,循環的條件為如果所屬資料節點DataNode執行個體datanode正常運作,且目前blockScannerThread線程沒有被中斷,循環内做以下處理:
1、調用waitForInit()方法等待初始化;
2、目前對象上使用synchronized進行同步,當blockPoolScannerMap大小大于0,即存在BlockPoolSliceScanner執行個體時,做以下處理:
2.1、設定lastScanTime用于記錄上次浏覽時間,預設值為0;
2.2、周遊blockPoolScannerMap集合,取出每個塊池ID,即bpid,計算最早的上次浏覽時間lastScanTime,和對應塊池ID,即nextBpId:
2.2.1、根據塊池ID,即bpid,取出其對應BlockPoolSliceScanner執行個體的上次浏覽時間t;
2.2.2、如果t不為0,且如果塊池ID為null,或者t小于lastScanTime,則将t指派給lastScanTime,bpid指派給nextBpId,也就是計算最早的上次浏覽時間lastScanTime,和對應塊池ID,即nextBpId;
2.3、如果對應塊池ID,即nextBpId為null,則取比上次處理的塊池currentBpId高的key作為nextBpId,如果還不能取出的話,那麼取第一個塊池ID,作為nextBpId;
2.4、如果nextBpId不為空,那麼從blockPoolScannerMap中擷取其對應BlockPoolSliceScanner執行個體傳回;
3、如果blockPoolScannerMap大小等于0,或者上述2找不到的話,記錄warn日志,No block pool is up, going to wait,然後等待5s後繼續下一輪循環;
最後,實在找不到就傳回null。
可見,getNextBPScanner()方法優先選取最早處理過的塊池,找不到的話再按照之前處理過的塊池ID增長的順序,找下一個塊池ID,按照塊池ID大小順序到尾部的話,再折回取第一個。
其中等待初始化的waitForInit()方法比較簡單,代碼如下:
它本質上是等所有塊池都被上報至blockPoolScannerMap集合後,才認為已完成初始化,然後再挑選塊池ID,否則線程休眠5s,繼續等待。代碼注釋比較詳細,這裡不再贅述!
擷取到塊池ID,并擷取到其對應的塊池切片掃描器BlockPoolSliceScanner執行個體bpScanner了,接下來就是調用bpScanner的scanBlockPoolSlice()方法,掃描該塊池的資料塊,并做資料塊校驗工作了。這方面的内容,請閱讀《HDFS源碼分析資料塊校驗之BlockPoolSliceScanner》一文,這裡不再做介紹。
到了這裡,各位看官可能有個疑問,選取塊池所依賴的blockPoolScannerMap集合中的資料是哪裡來的呢?答案就在處理資料節點心跳的BPServiceActor線程中,在完成資料塊彙報、處理來自名位元組點NameNode的相關指令等操作後,有如下代碼被執行:
很簡單,資料節點彙報資料塊給名位元組點,并執行來自名位元組點的相關指令後,就可以通過資料節點DataNode中成員變量blockScanner的addBlockPool()方法,添加塊池,代碼如下:
邏輯很簡單,首先需要看看blockPoolScannerMap集合中是否存在塊池blockPoolId,存在即傳回,否則根據塊池blockPoolId、資料節點datanode、存儲dataset、配置資訊conf等構造BlockPoolSliceScanner執行個體bpScanner,将塊池blockPoolId與bpScanner的映射關系存儲到blockPoolScannerMap中,最後記錄日志資訊。
我們在上面也提到了如果目前塊池對應的心跳服務BPOfferService不是活躍的,那麼會調用removeBlockPool()方法,移除對應的塊池,代碼如下:
代碼很簡單,不再贅述。
總結
DataBlockScanner是運作在資料節點DataNode上的一個背景線程,它負責管理所有塊池的資料塊掃描工作。當資料節點DataNode發送心跳給名位元組點NameNode進行資料塊彙報并執行完傳回的指令時,會在DataBlockScanner的内部集合blockPoolScannerMap中注冊塊池ID與為此新建立的BlockPoolSliceScanner對象的關系,然後DataBlockScanner内部線程blockScannerThread周期性的挑選塊池currentBpId,并擷取塊池切片掃描器BlockPoolSliceScanner執行個體bpScanner,繼而調用其scanBlockPoolSlice()方法,掃描對應塊池裡的資料塊,進行資料塊校驗。塊池選擇的主要依據就是優先選擇掃描時間最早的,也就是自上次掃描以來最長時間沒有進行掃描的,按照這一依據選擇不成功的話,則預設按照塊池ID遞增的順序循環選取塊池。