1、Splitpolicy是Region在满足什么条件下,需要split操作,0.94版本中默认是IncreasingToUpperBoundRegionSplitPolicy 1)ConstantSizeRegionSplitPolicy, 2)KeyPrefixRegionSplitPolicy, 3)RegionSplitPolicy,可以在配置文件中指定policy的实现hbase.regionserver.region.split.policy,默认是IncreasingToUpperBoundRegionSplitPolicy 4)IncreasingToUpperBoundRegionSplitPolicy, 对于split,并不是设置了hbase.hregion.max.filesize(默认10G)为很大就保证不split了,需要有以下的算法,参见 IncreasingToUpperBoundRegionSplitPolicy是0.94.0默认region split策略 这里的split有一个判断条件,先计算这tableRegionsCount(regionserver上的这个table的online的region个数), 然后循环计算此region的所有store是否太大,这是通过getSizeToCheck方法计算出一个size,若当前的store总大小大于这个值,则表示此region需要split. getSizeToCheck的计算方法首先判断tableRegionsCount是否等于0,若是则返回hbase.hregion.max.filesize ,若不是,则计算Math.min(getDesiredMaxFileSize(),
this.flushSize * (tableRegionsCount * tableRegionsCount)。
2、memstore flush,compact,split
FlushRequester, MemStoreFlusher,实现FlushRequester接口,基于线程服务,在RegionServer启动的时候初始化,主要做flush memstore中的cache。 这里有一些参数需要关注一下,
hbase.regionserver.global.memstore.upperLimit hbase.regionserver.global.memstore.lowerLimit
*. 如果全局memstore size超过了HighWaterMark(0.4), Block, 强制逐个flush region并且等待, 直到memstore size小于 HighWaterMark(0.4)
*. 如果全局memstore size在HighWaterMark(0.4)和LowWaterMark(0.35)之间,提交一个flush request给flush thread. 不阻塞.
hbase.hregion.memstore.flush.size 针对Region中的所有的memstore的内存之和memstoreSize的,当 memstoreSize大于该值时,就请求flush操作。
hbase.hregion.memstore.block.multiplier(2),memstore默认大小(flushsize)的2倍(默认值)-----blockingMemStoreFile(region server在写入时会检查每个region对应的memstore的总大小是否超过了memstore默认大小的2倍(hbase.hregion.memstore.block.multiplier决定),如果超过了则锁住memstore不让新写请求进来并触发flush,避免产生OOM)
hbase.hstore.compaction.ratio(.12f),当一个文件<=sum(比它小的文件) * hbase.hstore.compaction.ratio 将被选择 hbase.hstore.compaction.min(hbase.hstore.compactionThreshold,老的版本) 一次压缩前,至少有多少个storeFile被选择 默认: 3 hbase.hstore.compaction.max 每个“小”合并的HStoreFiles最大数量。默认: 10 hbase.hstore.blockingStoreFiles 当一个HStore含有多于这个值的HStoreFiles(每一个memstore flush产生一个HStoreFile)的时候,会执行一个合并操作, update会阻塞直到合并完成,直到超过了hbase.hstore.blockingWaitTime的值 默认: 7 hbase.hstore.blockingWaitTime hbase.hstore.blockingStoreFiles所限制的StoreFile数量会导致update阻塞,这个时间是来限制阻塞时间的。当超过了这个时间,HRegion会停止阻塞update操作,不过合并还有没有完成。默认为90s. 默认: 90000 hbase.hregion.majorcompaction 一个Region中的所有HStoreFile的major compactions的时间间隔。默认是1天。 设置为0就是禁用这个功能。
在region做put,delete等操作的过程中,会检查size是否超过了memstore中的限制,如超过,则调用HRegion.requestFlush---> MemStoreFlusher.requestFlush请求flush,待flush线程处理。 那么什么时候做flush cache的调用哪?flush cache的调用,可以由HBaseAdmin管理命令发起(HBaseAdmin-->RegionServer-->HRegion.flushcache,调用StoreFlusher的相关接口进行操作);也可以由MemStoreFlusher线程run发起flushRegion的操作(在调用HRegion.flushcache前,先判断当前的storeFiles数量是否太多了 >hbase.hstore.blockingStoreFiles,如果是,则进行split&compact操作;flushcache操作返回是否需要compact的操作),在flush(该过程参见后面说明)完成后,会判断是否需要split(IncreasingToUpperBoundRegionSplitPolicy)和compact,若需要,则由RegionServer的compactSplitThread线程进行处理(compactSplitThread调用Store.compact(CompactionRequest cr) 会阻塞store写操作)。 flush的过程, 调用boolean HRegion.flushCache(),返回是否flush的标记,是否compact ---internalFlushcache(final HLog wal, final long myseqid, MonitoredTask status) flush的memstore到disk上,参见后面StoreFlusher的介绍 写FLUSH COMPLETED 到log上,附带memstore flush时的seq Num 清除memstore中的数据 StoreFlusher, prepare---创建snapshot,在创建过程中,需要阻塞write操作( HRegion.updatesLock.writeLock().lock();)
flushCache---flush cache ,创建storeFile commit---提交flush,把storeFile加入到Store中,并清除memstore snapshot(此时判断是否需要进行compact操作)(调用Store.updateStorefiles,会对store的lock加写锁,这样其他的写数据的操作就会堵塞<读锁>)
MemStore, snapshot,在做snapshot时候,会阻塞写操作(lock.writeLock().lock()) add/delete/getNextRow(lock.readLock().lock()),在此类操作的同时,不可以做snapshot。
CompactSplitThread,开启多线程服务进行compact,split操作,会阻塞写操作 (compactSplitThread调用Store.compact(CompactionRequest cr) 会阻塞store写操作)。 CompactionRequest,由该类做具体的compact的动作,作为一个线程的call执行。
3、Server端region的内部scan 先调用HRegionInterface打开一个server端scaner(public long openScanner(final byte [] regionName, final Scan scan)),返回scaner的Id 然后根据scanerId,调用HRegionInterface的Result next(final long scannerId) 获取结果 HRegionServer.next(scanId) region.getCoprocessorHost().preScannerNext(s,results, nbRows); RegionScannerImpl.next(List results) region.getCoprocessorHost().postScannerNext(s,results, nbRows); InternalScan InternalScanner, 不同于客户端的scaner,操作的是RowResults,server端操作的是HStoreKeys and byte[] -----RegionScanner, -----RegionScannerImpl(KeyValueHeap storeHeap,Scan scan)---一个请求一个实例, 在构造函数中添加所有的family的storeScaner到heap中 StoreScanner scanner = store.getScanner(scan, entry.getValue()); this.storeHeap = new KeyValueHeap(scanners, comparator); -----调用 next(List<KeyValue> outResults, int limit), ----startRegionOperation();加对该region操作的读锁 (保证在该region操作期间,只能加读锁,不可以加写锁,保证数据的一致性) ---MVCC -----nextInternal(limit) --迭代peekRow() ---KeyValueScanner.peek() ----KeyValueHeap中的peek()-->调用StoreScanner.peek()-->Store内部的Scanners<MemStoreScaner StoreFileScaners>.peek() //StoreScanner的构造函数中会调用getScannersNoCompaction,获取的是store下的memstorescanner和HFileStoreScanner ------- closeRegionOperation();释放对该region的操作的读锁

KeyValueScanner(接口类), KeyValueHeap, KeyvalueScaner之间的heap(KeyValue)的合并,在region层次合并拥有的stores,在Store的层次合并memstore和其下的storeFiles StoreScanner,scan memstore(包括snapshot)和StoreFiles,如果设置了booleamfilter就采用booleamfilter MemStoreScaner,基于memstore的scaner,实现具体的KeyValue的peek操作 StoreFileScaner,基于storeFiles的scaner,实现具体的KeyValue的peek操作 storeFileScanner会利用StoreFile.Reader(HFileReaderV2)从StoreFiles读取keyValue scan和memstore以及blockcache的结合 BlockCache的使用,是在HFile的级别上的,当读取HFile时,会先检查Block Cache部分,如果设置用cache block的话, 就直接从cache中读取该block,否则从HFile中读取,参见HFileReaderV2.readBlock部分。
4、Leases, 在对行操作的时候,按照正常的流程进行,行操作前获得锁,行操作完成后释放锁 同时Lease线程自动的按照 leaseCheckFrequency的频次从queue中获取lease进行检查,是否超过leasePeriod,超过的话,就释放lease,并调用LeaseListener实现lease过期的回调动作,如释放锁等。 LeaseListener, RowLockListener, ScannerListener, 以下是HRegion中获取行锁和释放行锁的相关代码, ---internalObtainRowLock(CountDownLatch(1),await) while (true) { CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); if (existingLatch == null) { break; } else { // row already locked if (!waitForLock) { return null; } try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { return null; } } catch (InterruptedException ie) {
// Empty } } } ----releaseRowLock(由过期后的RowLockListener进行调用,countDown) if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { return null; LruHashMap, LruBlockCache, 5、WAL
在RegionServer启动的过程中,初始化WAL log,比如server1在/hbase/.logs/server1建立log HLog, WAL的具体实现,存储有对HStore变更的所有记录,做定期的roll,每一个server有一个HLog,包含多个file, 当数据flush到store file后,小于该store file中的lsn(log sequence number)的log都可以废除了。 在数据buffer flush过程中,不允许对日志做roll操作,但允许append。 在打开一个region的时候,需要保证当前Hlog的seqNum比该region中store的最大的seq num要大,否则把Hlog 的seqNum设为该region的 seqNum 重点关注以下几个操作, a、append操作(HRegionInfo,HLogKey,WALEdit),往sync线程中的buffer(LogSyncer中的pendingWrites)中写,用于批量往hdfs中写的目的。 b、sync操作,可以由单独的线程LogSyncer做定时的sync,也可以在调用append中如果设置了需要sync的标记,则做sync(flush)操作。 在线程LogSyncer的实现中,先调用HDFS的输出流SequenceFileLogWriter做append操作(hlogFlush),再调用SequenceFileLogWriter.sync,该sync操作是SequenceFile.Write.syncFs或者FSDataOutputStream.flush操作。 在调用append的过程指定dosync的实现中,若region对应的是META表则做flush操作,若该表没有设置DEFERRED_LOG_FLUSH,则也需要做flush操作。 DEFERRED_LOG_FLUSH是在table 定义的时候指定的,默认为false。若为true,则无需确认WAL是否sync(写入磁盘)即可返回。 调用时机,HRegion的internalDelete和internalPut时候调用(doSync默认是true)。 对于在写数据的时候,是否写WAL log,可以在Put或者Delete客户端API中设置,如 Put.setWriteToWAL(false); Delete.setWriteToWAL(false); LogRoller,对HLog做定期的rolling MultiVersionConsistencyControl,
6、HRegionServer HRegionServer, 构造函数: 创建RPC Server 线程run: a、Do pre-registration initializations; zookeeper相关 打开到zookeeper连接, 阻塞式的跟踪zookeeper上的master znode节点是否有效(该节点由master设置,代表master已经起来了), 阻塞式的跟踪zookeeper上的cluster znode节点是否有效,判断集群是否起来(该节点由master设置), 开启了跟踪系统表ROOT、META表的region的在zookeeper上unassign节点的状态 开启MemStoreFlusher,CompactSplitThread,CompactionChecker,Leaser,HRegionThriftServer线程服务 b、向HMaster report,zookeeper rs上注册临时server节点,初始化rootDir,HLOG c、注册MBean d、循环进行,do Metircs, report to master当前的serverload 直到集群关闭
7、HRegion, HRegion中的锁包括: a、Region级别的close锁, // Used to guard closes final ReentrantReadWriteLock lock =new ReentrantReadWriteLock(); ------保证在其他操作的时候(比如compact(CompactionRequest)、flushcache()、 其他对region的scan操作以及写操作调用startRegionOperation() 和对应的closeRegionOperation()),阻塞close和split region的操作。 b、HRegion级别的更新锁, // Stop updates lock private final ReentrantReadWriteLock updatesLock =new ReentrantReadWriteLock(); -----在Put,Append,Delete,Increment的时加读锁 在internalFlushCache的时加写锁,保证在写操作的时,阻塞fluscache操作。 c、行级别的锁, private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows = new ConcurrentHashMap<HashedBytes, CountDownLatch>(); -------<rowkey,latch> private final ConcurrentHashMap<Integer, HashedBytes> lockIds = new ConcurrentHashMap<Integer, HashedBytes>(); ------------<lockId, rowKey> 保证对只有一个线程对同一个行操作。当超过租借lease期限,也会自动释放该行锁
Store, StoreFile, StoreFileScanner, StoreFlusher, 未完,待补充和完善。。。。