天天看點

HBase源碼分析之org.apache.hadoop.hbase.regionserver包

1、Splitpolicy是Region在滿足什麼條件下,需要split操作,0.94版本中預設是IncreasingToUpperBoundRegionSplitPolicy 1)ConstantSizeRegionSplitPolicy, 2)KeyPrefixRegionSplitPolicy, 3)RegionSplitPolicy,可以在配置檔案中指定policy的實作hbase.regionserver.region.split.policy,預設是IncreasingToUpperBoundRegionSplitPolicy 4)IncreasingToUpperBoundRegionSplitPolicy,  對于split,并不是設定了hbase.hregion.max.filesize(預設10G)為很大就保證不split了,需要有以下的算法,參見 IncreasingToUpperBoundRegionSplitPolicy是0.94.0預設region split政策 這裡的split有一個判斷條件,先計算這tableRegionsCount(regionserver上的這個table的online的region個數), 然後循環計算此region的所有store是否太大,這是通過getSizeToCheck方法計算出一個size,若目前的store總大小大于這個值,則表示此region需要split. getSizeToCheck的計算方法首先判斷tableRegionsCount是否等于0,若是則傳回hbase.hregion.max.filesize ,若不是,則計算Math.min(getDesiredMaxFileSize(),

this.flushSize * (tableRegionsCount * tableRegionsCount)。

2、memstore flush,compact,split

FlushRequester, MemStoreFlusher,實作FlushRequester接口,基于線程服務,在RegionServer啟動的時候初始化,主要做flush memstore中的cache。            這裡有一些參數需要關注一下,  

hbase.regionserver.global.memstore.upperLimit hbase.regionserver.global.memstore.lowerLimit
           *. 如果全局memstore size超過了HighWaterMark(0.4), Block, 強制逐個flush region并且等待, 直到memstore size小于  HighWaterMark(0.4)
*. 如果全局memstore size在HighWaterMark(0.4)和LowWaterMark(0.35)之間,送出一個flush request給flush thread. 不阻塞.

          hbase.hregion.memstore.flush.size 針對Region中的所有的memstore的記憶體之和memstoreSize的,當 memstoreSize大于該值時,就請求flush操作。

          hbase.hregion.memstore.block.multiplier(2),memstore預設大小(flushsize)的2倍(預設值)-----blockingMemStoreFile(region server在寫入時會檢查每個region對應的memstore的總大小是否超過了memstore預設大小的2倍(hbase.hregion.memstore.block.multiplier決定),如果超過了則鎖住memstore不讓新寫請求進來并觸發flush,避免産生OOM)

hbase.hstore.compaction.ratio(.12f),當一個檔案<=sum(比它小的檔案) * hbase.hstore.compaction.ratio 将被選擇 hbase.hstore.compaction.min(hbase.hstore.compactionThreshold,老的版本) 一次壓縮前,至少有多少個storeFile被選擇 預設: 3 hbase.hstore.compaction.max 每個“小”合并的HStoreFiles最大數量。預設: 10 hbase.hstore.blockingStoreFiles 當一個HStore含有多于這個值的HStoreFiles(每一個memstore flush産生一個HStoreFile)的時候,會執行一個合并操作, update會阻塞直到合并完成,直到超過了hbase.hstore.blockingWaitTime的值 預設: 7 hbase.hstore.blockingWaitTime hbase.hstore.blockingStoreFiles所限制的StoreFile數量會導緻update阻塞,這個時間是來限制阻塞時間的。當超過了這個時間,HRegion會停止阻塞update操作,不過合并還有沒有完成。預設為90s. 預設: 90000 hbase.hregion.majorcompaction 一個Region中的所有HStoreFile的major compactions的時間間隔。預設是1天。 設定為0就是禁用這個功能。

在region做put,delete等操作的過程中,會檢查size是否超過了memstore中的限制,如超過,則調用HRegion.requestFlush---> MemStoreFlusher.requestFlush請求flush,待flush線程處理。 那麼什麼時候做flush cache的調用哪?flush cache的調用,可以由HBaseAdmin管理指令發起(HBaseAdmin-->RegionServer-->HRegion.flushcache,調用StoreFlusher的相關接口進行操作);也可以由MemStoreFlusher線程run發起flushRegion的操作(在調用HRegion.flushcache前,先判斷目前的storeFiles數量是否太多了 >hbase.hstore.blockingStoreFiles,如果是,則進行split&compact操作;flushcache操作傳回是否需要compact的操作),在flush(該過程參見後面說明)完成後,會判斷是否需要split(IncreasingToUpperBoundRegionSplitPolicy)和compact,若需要,則由RegionServer的compactSplitThread線程進行處理(compactSplitThread調用Store.compact(CompactionRequest cr) 會阻塞store寫操作)。   flush的過程, 調用boolean HRegion.flushCache(),傳回是否flush的标記,是否compact                 ---internalFlushcache(final HLog wal, final long myseqid, MonitoredTask status)                    flush的memstore到disk上,參見後面StoreFlusher的介紹                    寫FLUSH COMPLETED 到log上,附帶memstore flush時的seq Num                    清除memstore中的資料 StoreFlusher,     prepare---建立snapshot,在建立過程中,需要阻塞write操作( HRegion.updatesLock.writeLock().lock();)

    flushCache---flush cache ,建立storeFile     commit---送出flush,把storeFile加入到Store中,并清除memstore snapshot(此時判斷是否需要進行compact操作)(調用Store.updateStorefiles,會對store的lock加寫鎖,這樣其他的寫資料的操作就會堵塞<讀鎖>)

MemStore,    snapshot,在做snapshot時候,會阻塞寫操作(lock.writeLock().lock())   add/delete/getNextRow(lock.readLock().lock()),在此類操作的同時,不可以做snapshot。

CompactSplitThread,開啟多線程服務進行compact,split操作,會阻塞寫操作 (compactSplitThread調用Store.compact(CompactionRequest cr) 會阻塞store寫操作)。 CompactionRequest,由該類做具體的compact的動作,作為一個線程的call執行。

3、Server端region的内部scan   先調用HRegionInterface打開一個server端scaner(public long openScanner(final byte [] regionName, final Scan scan)),傳回scaner的Id   然後根據scanerId,調用HRegionInterface的Result next(final long scannerId) 擷取結果   HRegionServer.next(scanId)                      region.getCoprocessorHost().preScannerNext(s,results, nbRows);                             RegionScannerImpl.next(List results)                                  region.getCoprocessorHost().postScannerNext(s,results, nbRows); InternalScan InternalScanner, 不同于用戶端的scaner,操作的是RowResults,server端操作的是HStoreKeys and byte[] -----RegionScanner,           -----RegionScannerImpl(KeyValueHeap storeHeap,Scan scan)---一個請求一個執行個體,                         在構造函數中添加所有的family的storeScaner到heap中                         StoreScanner scanner = store.getScanner(scan, entry.getValue());                         this.storeHeap = new KeyValueHeap(scanners, comparator);                                                         -----調用 next(List<KeyValue> outResults, int limit),                                    ----startRegionOperation();加對該region操作的讀鎖 (保證在該region操作期間,隻能加讀鎖,不可以加寫鎖,保證資料的一緻性)                                    ---MVCC                                   -----nextInternal(limit)                                               --疊代peekRow()                                                    ---KeyValueScanner.peek()                                                            ----KeyValueHeap中的peek()-->調用StoreScanner.peek()-->Store内部的Scanners<MemStoreScaner StoreFileScaners>.peek()                                                                                     //StoreScanner的構造函數中會調用getScannersNoCompaction,擷取的是store下的memstorescanner和HFileStoreScanner                                   ------- closeRegionOperation();釋放對該region的操作的讀鎖

HBase源碼分析之org.apache.hadoop.hbase.regionserver包

KeyValueScanner(接口類), KeyValueHeap,         KeyvalueScaner之間的heap(KeyValue)的合并,在region層次合并擁有的stores,在Store的層次合并memstore和其下的storeFiles StoreScanner,scan memstore(包括snapshot)和StoreFiles,如果設定了booleamfilter就采用booleamfilter MemStoreScaner,基于memstore的scaner,實作具體的KeyValue的peek操作 StoreFileScaner,基于storeFiles的scaner,實作具體的KeyValue的peek操作    storeFileScanner會利用StoreFile.Reader(HFileReaderV2)從StoreFiles讀取keyValue    scan和memstore以及blockcache的結合    BlockCache的使用,是在HFile的級别上的,當讀取HFile時,會先檢查Block Cache部分,如果設定用cache block的話,    就直接從cache中讀取該block,否則從HFile中讀取,參見HFileReaderV2.readBlock部分。

4、Leases, 在對行操作的時候,按照正常的流程進行,行操作前獲得鎖,行操作完成後釋放鎖  同時Lease線程自動的按照 leaseCheckFrequency的頻次從queue中擷取lease進行檢查,是否超過leasePeriod,超過的話,就釋放lease,并調用LeaseListener實作lease過期的回調動作,如釋放鎖等。 LeaseListener,       RowLockListener,       ScannerListener, 以下是HRegion中擷取行鎖和釋放行鎖的相關代碼,         ---internalObtainRowLock(CountDownLatch(1),await)              while (true) {         CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);         if (existingLatch == null) {           break;         } else {           // row already locked           if (!waitForLock) {             return null;           }           try {             if (!existingLatch.await(this.rowLockWaitDuration,                             TimeUnit.MILLISECONDS)) {                 return null;             }           } catch (InterruptedException ie) {

            // Empty           }         }       }         ----releaseRowLock(由過期後的RowLockListener進行調用,countDown)             if (!existingLatch.await(this.rowLockWaitDuration,                             TimeUnit.MILLISECONDS)) {                 return null;   LruHashMap, LruBlockCache,   5、WAL

在RegionServer啟動的過程中,初始化WAL log,比如server1在/hbase/.logs/server1建立log HLog,       WAL的具體實作,存儲有對HStore變更的所有記錄,做定期的roll,每一個server有一個HLog,包含多個file,          當資料flush到store file後,小于該store file中的lsn(log sequence number)的log都可以廢除了。        在資料buffer flush過程中,不允許對日志做roll操作,但允許append。         在打開一個region的時候,需要保證目前Hlog的seqNum比該region中store的最大的seq num要大,否則把Hlog 的seqNum設為該region的 seqNum 重點關注以下幾個操作,     a、append操作(HRegionInfo,HLogKey,WALEdit),往sync線程中的buffer(LogSyncer中的pendingWrites)中寫,用于批量往hdfs中寫的目的。      b、sync操作,可以由單獨的線程LogSyncer做定時的sync,也可以在調用append中如果設定了需要sync的标記,則做sync(flush)操作。            線上程LogSyncer的實作中,先調用HDFS的輸出流SequenceFileLogWriter做append操作(hlogFlush),再調用SequenceFileLogWriter.sync,該sync操作是SequenceFile.Write.syncFs或者FSDataOutputStream.flush操作。          在調用append的過程指定dosync的實作中,若region對應的是META表則做flush操作,若該表沒有設定DEFERRED_LOG_FLUSH,則也需要做flush操作。            DEFERRED_LOG_FLUSH是在table 定義的時候指定的,預設為false。若為true,則無需确認WAL是否sync(寫入磁盤)即可傳回。      調用時機,HRegion的internalDelete和internalPut時候調用(doSync預設是true)。  對于在寫資料的時候,是否寫WAL log,可以在Put或者Delete用戶端API中設定,如                     Put.setWriteToWAL(false);                     Delete.setWriteToWAL(false); LogRoller,對HLog做定期的rolling    MultiVersionConsistencyControl,

  6、HRegionServer HRegionServer, 構造函數: 建立RPC Server 線程run:  a、Do pre-registration initializations;  zookeeper相關      打開到zookeeper連接配接,      阻塞式的跟蹤zookeeper上的master znode節點是否有效(該節點由master設定,代表master已經起來了),      阻塞式的跟蹤zookeeper上的cluster  znode節點是否有效,判斷叢集是否起來(該節點由master設定),      開啟了跟蹤系統表ROOT、META表的region的在zookeeper上unassign節點的狀态 開啟MemStoreFlusher,CompactSplitThread,CompactionChecker,Leaser,HRegionThriftServer線程服務 b、向HMaster report,zookeeper rs上注冊臨時server節點,初始化rootDir,HLOG c、注冊MBean d、循環進行,do Metircs, report to master目前的serverload 直到叢集關閉

  7、HRegion,    HRegion中的鎖包括: a、Region級别的close鎖,   // Used to guard closes   final ReentrantReadWriteLock lock =new ReentrantReadWriteLock();                 ------保證在其他操作的時候(比如compact(CompactionRequest)、flushcache()、                          其他對region的scan操作以及寫操作調用startRegionOperation() 和對應的closeRegionOperation()),阻塞close和split region的操作。 b、HRegion級别的更新鎖,   // Stop updates lock   private final ReentrantReadWriteLock updatesLock =new ReentrantReadWriteLock();                 -----在Put,Append,Delete,Increment的時加讀鎖                      在internalFlushCache的時加寫鎖,保證在寫操作的時,阻塞fluscache操作。 c、行級别的鎖,   private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =     new ConcurrentHashMap<HashedBytes, CountDownLatch>();                  -------<rowkey,latch>   private final ConcurrentHashMap<Integer, HashedBytes> lockIds =     new ConcurrentHashMap<Integer, HashedBytes>();               ------------<lockId, rowKey>       保證對隻有一個線程對同一個行操作。當超過租借lease期限,也會自動釋放該行鎖

    Store, StoreFile, StoreFileScanner, StoreFlusher,       未完,待補充和完善。。。。

繼續閱讀