天天看點

HBase源碼分析之HRegionServer上compact流程分析

        前面三篇文章中,我們詳細叙述了compact流程是如何在HRegion上進行的,了解了它的很多細節方面的問題。但是,這個compact在HRegionServer上是如何進行的?合并時檔案是如何選擇的呢?在這篇文章中,你将找到答案!

        首先,在HRegionServer内部,我們發現,它定義了一個CompactSplitThread類型的成員變量compactSplitThread,單看字面意思,這就是一個合并分裂線程,那麼它會不會就是HRegionServer上具體執行合并的工作線程呢?我們一步一步來看。

        要了解它是什麼,能夠做什麼,那麼就必須要看看它的實作,找到CompactSplitThread類,so,開始我們的分析之旅吧!

        首先,看下CompactSplitThread中都定義可哪些變量,如下:

        其中,關于Region的Spilt、Merge相關的成員變量我們暫時忽略,等到專門講解split、merge時再單獨介紹。這裡,先了解下CompactSplitThread中都有哪些關于compact的成員變量,大體可以分為三類:

        1、第一類是配置參數及其預設值相關的,涉及到large、small合并線程數參數和其預設值以及HBase整體配置變量Configuration類型的conf;

        2、第二類是線程池,包括long合并線程池longCompactions和short合并線程池shortCompactions,它們統一使用的Java中的ThreadPoolExecutor;

        3、第三類是CompactSplitThread的載體,或者說工作的環境,HRegionServer執行個體server。

        既然已經存在合并的線程池,那麼很簡單,将合并線程扔到線程池中等待排程就是了。那麼是由哪些方法來完成的這一步呢?答案就在requestCompaction()及requestSystemCompaction()系列方法,而這一系列的requestCompaction()和requestSystemCompaction()方法參數不同,也僅意味着應用場景不同而已,最終還是要落到requestCompactionInternal()方法上的。同時,需要強調一點,requestCompaction()方法和requestSystemCompaction()方法有一個顯著的差別,那就是在最終調用requestCompactionInternal()方法時,前者傳入的selectNow為true,而後者傳入的selectNow為false,這點需要特别注意下,下面也會講到。先撇開都哪些地方會調用requestCompaction()系列方法,也就是compact發起的時機、條件等,我們後續會分析,這裡我們先來看下requestCompactionInternal(),代碼如下:

        直接說下大體流程吧!首先,需要做一些必要的檢查,比如比如HRegionServer是否已停止、HRegion對應的表是否允許Compact操作等,然後這裡有一個關鍵的地方,就是上述的selectNow,如果不是system compaction,selectNow為true,也就意味着它需要調用selectCompaction()方法,擷取CompactionContext,而這本質上就是要選取待合并檔案。我們先看下selectCompaction()方法,代碼如下:

        而這個方法最終還是調用HStore的requestCompaction()方法來擷取CompactionContext,繼續分析:

        這裡我們隻叙述下主要過程,requestCompaction()方法的處理邏輯大體如下:

        1、如果對應HRegion不可寫,直接傳回null;

        2、在我們做合并之前,試着擺脫不必要的檔案來簡化事情;

        3、通過存儲引擎storeEngine建立合并上下文CompactionContext類型的compaction;

        4、加讀鎖;

        5、如果存在協處理器:通過CompactionContext的preSelect()方法,選擇StoreFile,傳回StoreFilel清單;

        6、如果合并請求為空,即不存在協處理器:調用CompactionContext的select()方法,初始化compaction中的合并請求requst;

        7、如果之前傳入的請求baseRequest不為空,則合并之;

        8、擷取合并請求request;

        9、從合并請求request中擷取待合并檔案集合selectedFiles;

        10、将選擇的檔案集合加入到filesCompacting中,解答了之前文章的疑問;

        11、設定标志位forceMajor:是否為major合并;

        12、request中設定優先級、設定描述資訊;

        13、解除讀鎖;

        14、調用HRegion的reportCompactionRequestStart()方法,彙報一個compact請求開始;

        15、傳回合并上下文compaction。

        現在我們着重看下如何通過調用CompactionContext的select()方法初始化compaction中的合并請求requst,其他步驟比較簡單,在此不一一叙述了。

        現在我們就看下其預設實作類DefaultCompactionContext中的select()方法,代碼如下:

        它是利用合并政策compactionPolicy的selectCompaction()方法,擷取合并請求request。那麼按照上面講的,我看下合并政策的一種實作RatioBasedCompactionPolicy的selectCompaction()方法實作,代碼如下:

        我們撿重點的說,大體流程如下:

        1、根據傳入的參數candidateFiles,建立一個候選的StoreFile清單;

               candidateFiles為通過storeFileManager.getStorefiles()方法擷取的Store下的全部存儲檔案。

        2、确定futureFiles,如果filesCompacting為空則為0,否則為1;

        3、從候選清單candidateSelection中排除正在合并的檔案,即filesCompacting中的檔案;

        4、驗證是否包含所有檔案,設定标志位isAllFiles,判斷的條件就是此時的候選清單candidateSelection大小是否等于初始的candidateFiles清單大小,而candidateFiles代表了Store下的全部檔案;

        5、如果不是強制的Major合并,且不包含所有的檔案,則調用skipLargeFiles()方法,跳過較大檔案,并再次确定标志位isAllFiles;

        6、确定isTryingMajor,共兩種情況:

            (1)強制Major合并,且包含所有問檔案,且是一個使用者合并;

            (2)強制Major合并,且包含所有問檔案,或者本身就是一個Major合并,同時,必須是candidateSelection的數目小于配置的達到合并條件的最大檔案數目;

        7、candidates中存在引用的話,則視為是在分裂後的檔案,即isAfterSplit為true;

        8、如果不是TryingMajor,且不是在分裂後isAfterSplit,再次篩選檔案:

              8.1、通過filterBulk()方法取出不應該位于Minor合并的檔案;

              8.2、通過applyCompactionPolicy()方法,使用一定的算法,進行檔案的篩選;

              8.3、通過checkMinFilesCriteria()方法,判斷是否滿足合并時最小檔案數的要求;

        9、通過removeExcessFiles()方法在candidateSelection中移除過量的檔案;

        10、檢視是否為全部檔案:再次确定标志位isAllFiles;

        11、利用candidateSelection構造合并請求CompactionRequest對象result;

        12、設定請求中的标志位;

        13、傳回合并請求CompactionRequest對象result。

        我們主要分析下其中檔案篩選的一些方法。

        首先看跳過大檔案的skipLargeFiles()方法,代碼如下:

        它會周遊檔案清單candidates,最主要的一個判斷,清單指定位置的檔案大小是否超過門檻值comConf.getMaxCompactSize(),這個門檻值優先取參數hbase.hstore.compaction.max.size,參數未配置的話取Long.MAX_VALUE。

        其次再看下取出不應該位于Minor合并的檔案的filterBulk()方法,代碼如下:

        它根據StoreFile的标志位excludeFromMinorCompaction判斷,而excludeFromMinorCompaction為true是當HFile資訊的中繼資料中存在EXCLUDE_FROM_MINOR_COMPACTION标志時設定的,說了這麼多,其實它就是要排除BulkLoad進入HBase的檔案!

        然後,我們再看下比較複雜的applyCompactionPolicy()方法,代碼如下:

        這個applyCompactionPolicy()方法是RatioBasedCompactionPolicy合并政策的精髓,我們需要細細分析,它的主要步驟為:

        1、如果檔案清單為空,原樣傳回;

        2、擷取檔案合并比例:取參數hbase.hstore.compaction.ratio,預設為1.2,如果可以在峰值使用,取參數hbase.hstore.compaction.ratio.offpeak,預設為5.0,也就是說将參數調整大些;

        3、計算待合并檔案數目countOfFiles;

        4、定義用于存放檔案大小的數組fileSizes;

        5、定義用于存放該檔案之後在最大檔案數這個範圍内所有檔案(包含該檔案)大小合計的數組sumSize;

        6、倒序周遊candidates檔案清單:

              6.1、将檔案大小放入數組fileSizes指定位置;

              6.2、tooFar表示後移動最大檔案數位置的檔案大小,其實也就是從i開始剛剛滿足達到最大檔案數位置的那個檔案,也就是說,從i至tooFar數目為合并時允許的最大檔案數,它類似于一個平滑的檔案視窗;

              6.3、計算合計:該檔案大小fileSizes[i] + (截止到下一個檔案大小sumSize[i + 1]) - 後移動最大檔案數位置的檔案大小,也就是說sumSize[i]對應的被統計檔案,永遠是滿足合并時允許的最大檔案數這個門檻值的,它相當于一個滑動的區間,區間大小為合并時允許的最大檔案數,sumSize[i]對應的值為已該i開始所處檔案視窗的所有檔案大小合計。

        7、正序循環,如果檔案數目滿足最小合并時允許的最小檔案數,且該位置的檔案大小,大于合并時允許的檔案最小大小與下一個檔案視窗檔案總大小乘以一定比例中的較大者,則繼續,實際上就是選擇出一個檔案視窗内能兼顧最小檔案數和最小檔案大小的一組檔案;

        8、保證最小檔案數目的要求,必要時進行截取;

        9、截取并傳回截取後的檔案清單。

        上面的一個中心思想就是選出滿足條件的最小的一組檔案來合并。

        緊接着,我們看下檢測是否滿足最小檔案數大的checkMinFilesCriteria()方法,代碼如下:

        很直接有木有,不滿足合并時最小檔案數要求,直接clear,太奔放了!

        最後,我們看下如何移除過量的檔案,即removeExcessFiles()方法,代碼如下:

        它是要求待合并檔案數不能超過系統設定的合并時最大檔案數。

        至此,合并請求的生成和檔案的選擇就到此為止了。

        接下來再回到CompactSplitThread的requestCompactionInternal()方法,看下它對線程池是如何處理的。這裡,它首先假設大部分合并都是small,是以它将系統引發的合并放進small pool,然後在特定的時機再做決斷,如果有必要的話會挪至large pool。也就是說,如果selectNow為false,即系統自身引發的合并,比如MemStore flush、compact檢查線程等,統一放入到shortCompactions中,即small pool;而如果是人為觸發的,即selectNow為true,比如HBase

shell觸發的,則還要看HStore中合并請求大小是否超過門檻值,超過則放入longCompactions,即large pool,否則還是small pool。

        那麼這個HStore中合并請求大小是否超過門檻值是如何計算的呢?我們跟蹤下HStore的throttleCompaction()方法,代碼如下:

        它實際上是調用的合并政策CompactionPolicy的throttleCompaction()方法。那麼,都有哪幾種合并政策呢?總結起來,一共有兩種:RatioBasedCompactionPolicy和StripeCompactionPolicy。現在我們以RatioBasedCompactionPolicy為例來講,另一種StripeCompactionPolicy以後再分析。看下它的throttleCompaction()方法:

        它是将傳入的compactionSize與comConf.getThrottlePoint()來比較的,傳入的compactionSize實際上為上面提到的compaction.getRequest().getSize(),也就是合并請求的大小totalSize,這個totalSize是通過CompactionRequest的recalculateSize()方法計算得到的,代碼如下:

        它周遊待合并檔案StoreFile,擷取其Reader,通過它獲得檔案長度并累加至totalSize。

        而comConf是其父類CompactionPolicy中關于compact配置的CompactionConfiguration類型成員變量,其getThrottlePoint()方法如下:

        實際上取得是CompactionConfiguration的成員變量throttlePoint,而throttlePoint在其構造方法中定義如下:

        優先取參數hbase.regionserver.thread.compaction.throttle,如果參數未配置,預設為最大合并檔案數maxFilesToCompact與MemStore flush大小的兩倍,而這個maxFilesToCompact的取值如下:

        也就是取參數hbase.hstore.compaction.max,參數未配置的話預設為10。那麼MemStore flush大小是如何擷取的呢?它實際上是通過StoreConfigInformation接口的getMemstoreFlushSize()方法擷取的,而需要使用的最終實作該方法的類,還是HStore,代碼如下:

        各位看官可能有疑問了,既然compact是以Store為機關進行的,為什麼這裡擷取的是region的memstoreFlushSize呢?我們知道,HBase并不是一個純粹意義上的列式資料庫,它的MemStore flush的發起,并不是以Store為機關進行的,而是整個Region,這也是HBase一開始飽受诟病的列簇Column Family不能過多的原因。那麼,這裡的memstoreFlushSize就可以很容易了解為什麼要擷取Region的了。

        這個memstoreFlushSize我們之前介紹過,這裡再回顧下,memstoreFlushSize為HRegion上設定的一個門檻值,當MemStore的大小超過這個門檻值時,将會發起flush請求,它的計算首先是由Table決定的,即每個表可以設定自己的memstoreFlushSize,通過關鍵字MEMSTORE_FLUSHSIZE來設定,如果MEMSTORE_FLUSHSIZE未設定,則取參數hbase.hregion.memstore.flush.size,參數未配置的話,則預設為1024*1024*128L,即128M。

        用俺們山東人的話來說,落落了這麼多,到底是什麼意思呢?很簡單,它就是看合并請求中涉及的資料量大小是否超過一個門檻值,超過則放入large pool,未超過則放入small pool。這個門檻值可以通過參數直接配置,不配置的話,則是最大可合并檔案數與引起MemStore的flush的門檻值memstoreFlushSize的兩倍,這個memstore flush到檔案中,是不是就是檔案的總大小呢?檔案數乘以檔案大小,是不是邏輯上近似于待合并資料的大小呢?大體就是這麼個意思。

        好了,“資料”的目的地--線程池選好了,接下來就是該把“資料”放入線程池了。既然是線程池,那麼這個“資料”就應該是一個線程,我們繼續看。

        這一句展現的再明白不過了,将HStore、HRegion、合并上下文CompactionContext、線程池ThreadPoolExecutor包裝成一個CompactionRunner對象,扔入線程池中執行。而CompactionRunner給我們的第一印象就是,它必定是一個可執行的線程。那麼我們就看下它的代碼吧:

        先看類的定義,類的定義中直接展現了,實作了Runnable接口意味着它是一個線程。而它除了構造函數傳入的那四個成員變量外,還有個表示優先級的成員變量queuedPriority,它的初始化是在構造方法中完成的。如果合并上下文compaction為空,則通過HStore的getCompactPriority()方法擷取,否則直接從合并請求中擷取,而合并請求中的,實際上也是通過調用requestCompactionInternal()方法的priority傳入的。我們接下來看下HStore的getCompactPriority()方法:

        它轉而從StoreFileManager中擷取Compact Priority,繼續吧!在StoreFileManager的預設實作DefaultStoreFileManager中,代碼如下:

        優先級為上述blockingFileCount減去目前storefiles的數目。而blockingFileCount優先取參數hbase.hstore.blockingStoreFiles,未配置的話再預設為7。還記得isTooManyStoreFiles這個方法嗎?MemStore在進行flush時會判斷HRegion上每個HStore下的檔案數是否太多,太多則意味着MemStore的flush會被推遲進行,優先進行compact,否則檔案數則會越來越多,而這裡,離blockingFileCount越遠,目前檔案數越小的話,則意味着MemStore的flush可以優先進行,而compact可以在它flush之後再進行,将資源利用效率最大化。

        接下來,我們在看下CompactionRunner中最重要的run()方法,代碼如下:

        run()方法以上來,也是會首選做一些必要的環境判斷,比如HRegionServer是否已停止、HRegion對應的表是否允許Compact操作等。

        然後,針對compaction為null的情況,進行compaction的初始化,即待合并檔案的選擇。在這個過程之前,會先判斷下優先級,之前的Compact優先級指派給oldPriority,擷取HStore的Compact優先級,如果目前優先級queuedPriority大于之前的oldPriority的話,即HStore下檔案數目減少了,則會推遲compact,可以優先進行flush,将該CompactionRunner再扔回線程池。如果優先級滿足條件,則繼續,通過selectCompaction()選擇待合并檔案,并再次判斷下是應該在large池中執行還是應該在small池中執行,此次隻根據上述的那個門檻值來判斷。

        接下來,如果換池了,HStore調用cancelRequestedCompaction()方法取消合并請求,複位compaction為null,換池,并再次放入線程池,後續會再初始化compaction,然後就return。

        如果沒換池的話,確定compaction不為空,調用HRegion的compact,針對store執行compact,計算執行時間,并獲得compact的執行結果,根據合并結果确定下一步操作。

        如果合并成功,如果優先級Priority小于等于0,意味着目前檔案已經太多,則需要發起一次SystemCompaction,否則請求分裂,實際上是看Region的大小是否超過門檻值,進而引起分裂。

        整個CompactSplitThread的工作流程已描述完畢。那麼接下來的問題,就是何時什麼情況下會發起compact請求?發起的compact請求又有如何不同呢?是否會有定期檢查的工作線程,促使compact在滿足一定條件的情況下進行呢?

        且聽下回分解。