天天看点

HBase源码分析之org.apache.hadoop.hbase.regionserver包

1、Splitpolicy是Region在满足什么条件下,需要split操作,0.94版本中默认是IncreasingToUpperBoundRegionSplitPolicy 1)ConstantSizeRegionSplitPolicy, 2)KeyPrefixRegionSplitPolicy, 3)RegionSplitPolicy,可以在配置文件中指定policy的实现hbase.regionserver.region.split.policy,默认是IncreasingToUpperBoundRegionSplitPolicy 4)IncreasingToUpperBoundRegionSplitPolicy,  对于split,并不是设置了hbase.hregion.max.filesize(默认10G)为很大就保证不split了,需要有以下的算法,参见 IncreasingToUpperBoundRegionSplitPolicy是0.94.0默认region split策略 这里的split有一个判断条件,先计算这tableRegionsCount(regionserver上的这个table的online的region个数), 然后循环计算此region的所有store是否太大,这是通过getSizeToCheck方法计算出一个size,若当前的store总大小大于这个值,则表示此region需要split. getSizeToCheck的计算方法首先判断tableRegionsCount是否等于0,若是则返回hbase.hregion.max.filesize ,若不是,则计算Math.min(getDesiredMaxFileSize(),

this.flushSize * (tableRegionsCount * tableRegionsCount)。

2、memstore flush,compact,split

FlushRequester, MemStoreFlusher,实现FlushRequester接口,基于线程服务,在RegionServer启动的时候初始化,主要做flush memstore中的cache。            这里有一些参数需要关注一下,  

hbase.regionserver.global.memstore.upperLimit hbase.regionserver.global.memstore.lowerLimit
           *. 如果全局memstore size超过了HighWaterMark(0.4), Block, 强制逐个flush region并且等待, 直到memstore size小于  HighWaterMark(0.4)
*. 如果全局memstore size在HighWaterMark(0.4)和LowWaterMark(0.35)之间,提交一个flush request给flush thread. 不阻塞.

          hbase.hregion.memstore.flush.size 针对Region中的所有的memstore的内存之和memstoreSize的,当 memstoreSize大于该值时,就请求flush操作。

          hbase.hregion.memstore.block.multiplier(2),memstore默认大小(flushsize)的2倍(默认值)-----blockingMemStoreFile(region server在写入时会检查每个region对应的memstore的总大小是否超过了memstore默认大小的2倍(hbase.hregion.memstore.block.multiplier决定),如果超过了则锁住memstore不让新写请求进来并触发flush,避免产生OOM)

hbase.hstore.compaction.ratio(.12f),当一个文件<=sum(比它小的文件) * hbase.hstore.compaction.ratio 将被选择 hbase.hstore.compaction.min(hbase.hstore.compactionThreshold,老的版本) 一次压缩前,至少有多少个storeFile被选择 默认: 3 hbase.hstore.compaction.max 每个“小”合并的HStoreFiles最大数量。默认: 10 hbase.hstore.blockingStoreFiles 当一个HStore含有多于这个值的HStoreFiles(每一个memstore flush产生一个HStoreFile)的时候,会执行一个合并操作, update会阻塞直到合并完成,直到超过了hbase.hstore.blockingWaitTime的值 默认: 7 hbase.hstore.blockingWaitTime hbase.hstore.blockingStoreFiles所限制的StoreFile数量会导致update阻塞,这个时间是来限制阻塞时间的。当超过了这个时间,HRegion会停止阻塞update操作,不过合并还有没有完成。默认为90s. 默认: 90000 hbase.hregion.majorcompaction 一个Region中的所有HStoreFile的major compactions的时间间隔。默认是1天。 设置为0就是禁用这个功能。

在region做put,delete等操作的过程中,会检查size是否超过了memstore中的限制,如超过,则调用HRegion.requestFlush---> MemStoreFlusher.requestFlush请求flush,待flush线程处理。 那么什么时候做flush cache的调用哪?flush cache的调用,可以由HBaseAdmin管理命令发起(HBaseAdmin-->RegionServer-->HRegion.flushcache,调用StoreFlusher的相关接口进行操作);也可以由MemStoreFlusher线程run发起flushRegion的操作(在调用HRegion.flushcache前,先判断当前的storeFiles数量是否太多了 >hbase.hstore.blockingStoreFiles,如果是,则进行split&compact操作;flushcache操作返回是否需要compact的操作),在flush(该过程参见后面说明)完成后,会判断是否需要split(IncreasingToUpperBoundRegionSplitPolicy)和compact,若需要,则由RegionServer的compactSplitThread线程进行处理(compactSplitThread调用Store.compact(CompactionRequest cr) 会阻塞store写操作)。   flush的过程, 调用boolean HRegion.flushCache(),返回是否flush的标记,是否compact                 ---internalFlushcache(final HLog wal, final long myseqid, MonitoredTask status)                    flush的memstore到disk上,参见后面StoreFlusher的介绍                    写FLUSH COMPLETED 到log上,附带memstore flush时的seq Num                    清除memstore中的数据 StoreFlusher,     prepare---创建snapshot,在创建过程中,需要阻塞write操作( HRegion.updatesLock.writeLock().lock();)

    flushCache---flush cache ,创建storeFile     commit---提交flush,把storeFile加入到Store中,并清除memstore snapshot(此时判断是否需要进行compact操作)(调用Store.updateStorefiles,会对store的lock加写锁,这样其他的写数据的操作就会堵塞<读锁>)

MemStore,    snapshot,在做snapshot时候,会阻塞写操作(lock.writeLock().lock())   add/delete/getNextRow(lock.readLock().lock()),在此类操作的同时,不可以做snapshot。

CompactSplitThread,开启多线程服务进行compact,split操作,会阻塞写操作 (compactSplitThread调用Store.compact(CompactionRequest cr) 会阻塞store写操作)。 CompactionRequest,由该类做具体的compact的动作,作为一个线程的call执行。

3、Server端region的内部scan   先调用HRegionInterface打开一个server端scaner(public long openScanner(final byte [] regionName, final Scan scan)),返回scaner的Id   然后根据scanerId,调用HRegionInterface的Result next(final long scannerId) 获取结果   HRegionServer.next(scanId)                      region.getCoprocessorHost().preScannerNext(s,results, nbRows);                             RegionScannerImpl.next(List results)                                  region.getCoprocessorHost().postScannerNext(s,results, nbRows); InternalScan InternalScanner, 不同于客户端的scaner,操作的是RowResults,server端操作的是HStoreKeys and byte[] -----RegionScanner,           -----RegionScannerImpl(KeyValueHeap storeHeap,Scan scan)---一个请求一个实例,                         在构造函数中添加所有的family的storeScaner到heap中                         StoreScanner scanner = store.getScanner(scan, entry.getValue());                         this.storeHeap = new KeyValueHeap(scanners, comparator);                                                         -----调用 next(List<KeyValue> outResults, int limit),                                    ----startRegionOperation();加对该region操作的读锁 (保证在该region操作期间,只能加读锁,不可以加写锁,保证数据的一致性)                                    ---MVCC                                   -----nextInternal(limit)                                               --迭代peekRow()                                                    ---KeyValueScanner.peek()                                                            ----KeyValueHeap中的peek()-->调用StoreScanner.peek()-->Store内部的Scanners<MemStoreScaner StoreFileScaners>.peek()                                                                                     //StoreScanner的构造函数中会调用getScannersNoCompaction,获取的是store下的memstorescanner和HFileStoreScanner                                   ------- closeRegionOperation();释放对该region的操作的读锁

HBase源码分析之org.apache.hadoop.hbase.regionserver包

KeyValueScanner(接口类), KeyValueHeap,         KeyvalueScaner之间的heap(KeyValue)的合并,在region层次合并拥有的stores,在Store的层次合并memstore和其下的storeFiles StoreScanner,scan memstore(包括snapshot)和StoreFiles,如果设置了booleamfilter就采用booleamfilter MemStoreScaner,基于memstore的scaner,实现具体的KeyValue的peek操作 StoreFileScaner,基于storeFiles的scaner,实现具体的KeyValue的peek操作    storeFileScanner会利用StoreFile.Reader(HFileReaderV2)从StoreFiles读取keyValue    scan和memstore以及blockcache的结合    BlockCache的使用,是在HFile的级别上的,当读取HFile时,会先检查Block Cache部分,如果设置用cache block的话,    就直接从cache中读取该block,否则从HFile中读取,参见HFileReaderV2.readBlock部分。

4、Leases, 在对行操作的时候,按照正常的流程进行,行操作前获得锁,行操作完成后释放锁  同时Lease线程自动的按照 leaseCheckFrequency的频次从queue中获取lease进行检查,是否超过leasePeriod,超过的话,就释放lease,并调用LeaseListener实现lease过期的回调动作,如释放锁等。 LeaseListener,       RowLockListener,       ScannerListener, 以下是HRegion中获取行锁和释放行锁的相关代码,         ---internalObtainRowLock(CountDownLatch(1),await)              while (true) {         CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);         if (existingLatch == null) {           break;         } else {           // row already locked           if (!waitForLock) {             return null;           }           try {             if (!existingLatch.await(this.rowLockWaitDuration,                             TimeUnit.MILLISECONDS)) {                 return null;             }           } catch (InterruptedException ie) {

            // Empty           }         }       }         ----releaseRowLock(由过期后的RowLockListener进行调用,countDown)             if (!existingLatch.await(this.rowLockWaitDuration,                             TimeUnit.MILLISECONDS)) {                 return null;   LruHashMap, LruBlockCache,   5、WAL

在RegionServer启动的过程中,初始化WAL log,比如server1在/hbase/.logs/server1建立log HLog,       WAL的具体实现,存储有对HStore变更的所有记录,做定期的roll,每一个server有一个HLog,包含多个file,          当数据flush到store file后,小于该store file中的lsn(log sequence number)的log都可以废除了。        在数据buffer flush过程中,不允许对日志做roll操作,但允许append。         在打开一个region的时候,需要保证当前Hlog的seqNum比该region中store的最大的seq num要大,否则把Hlog 的seqNum设为该region的 seqNum 重点关注以下几个操作,     a、append操作(HRegionInfo,HLogKey,WALEdit),往sync线程中的buffer(LogSyncer中的pendingWrites)中写,用于批量往hdfs中写的目的。      b、sync操作,可以由单独的线程LogSyncer做定时的sync,也可以在调用append中如果设置了需要sync的标记,则做sync(flush)操作。            在线程LogSyncer的实现中,先调用HDFS的输出流SequenceFileLogWriter做append操作(hlogFlush),再调用SequenceFileLogWriter.sync,该sync操作是SequenceFile.Write.syncFs或者FSDataOutputStream.flush操作。          在调用append的过程指定dosync的实现中,若region对应的是META表则做flush操作,若该表没有设置DEFERRED_LOG_FLUSH,则也需要做flush操作。            DEFERRED_LOG_FLUSH是在table 定义的时候指定的,默认为false。若为true,则无需确认WAL是否sync(写入磁盘)即可返回。      调用时机,HRegion的internalDelete和internalPut时候调用(doSync默认是true)。  对于在写数据的时候,是否写WAL log,可以在Put或者Delete客户端API中设置,如                     Put.setWriteToWAL(false);                     Delete.setWriteToWAL(false); LogRoller,对HLog做定期的rolling    MultiVersionConsistencyControl,

  6、HRegionServer HRegionServer, 构造函数: 创建RPC Server 线程run:  a、Do pre-registration initializations;  zookeeper相关      打开到zookeeper连接,      阻塞式的跟踪zookeeper上的master znode节点是否有效(该节点由master设置,代表master已经起来了),      阻塞式的跟踪zookeeper上的cluster  znode节点是否有效,判断集群是否起来(该节点由master设置),      开启了跟踪系统表ROOT、META表的region的在zookeeper上unassign节点的状态 开启MemStoreFlusher,CompactSplitThread,CompactionChecker,Leaser,HRegionThriftServer线程服务 b、向HMaster report,zookeeper rs上注册临时server节点,初始化rootDir,HLOG c、注册MBean d、循环进行,do Metircs, report to master当前的serverload 直到集群关闭

  7、HRegion,    HRegion中的锁包括: a、Region级别的close锁,   // Used to guard closes   final ReentrantReadWriteLock lock =new ReentrantReadWriteLock();                 ------保证在其他操作的时候(比如compact(CompactionRequest)、flushcache()、                          其他对region的scan操作以及写操作调用startRegionOperation() 和对应的closeRegionOperation()),阻塞close和split region的操作。 b、HRegion级别的更新锁,   // Stop updates lock   private final ReentrantReadWriteLock updatesLock =new ReentrantReadWriteLock();                 -----在Put,Append,Delete,Increment的时加读锁                      在internalFlushCache的时加写锁,保证在写操作的时,阻塞fluscache操作。 c、行级别的锁,   private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =     new ConcurrentHashMap<HashedBytes, CountDownLatch>();                  -------<rowkey,latch>   private final ConcurrentHashMap<Integer, HashedBytes> lockIds =     new ConcurrentHashMap<Integer, HashedBytes>();               ------------<lockId, rowKey>       保证对只有一个线程对同一个行操作。当超过租借lease期限,也会自动释放该行锁

    Store, StoreFile, StoreFileScanner, StoreFlusher,       未完,待补充和完善。。。。

继续阅读