天天看点

HMBASE的REGION分配HMBASE的REGION分配

HMBASE的REGION分配

Region assign分为meta的分配与userregion assign,同时包含hbase启动时与rs下线,因此从4个方面来说明regionassign

启动时的metaregion assign

针对master启动时的补充说明:

Hmaster.run.finishInitialization方法中:

得到WALs目录下所有子目录,如果WALs目录下的日志目录的名称为ServerName-splitting,去掉-splitting部分

log目录格式:hdfs://<namenode>/hbase/WALs/<servername>-splitting/...

orhdfs://<namenode>/hbase/WALs/<servername>/...

取出日志目录的ServerName,检查现在master的ServerManager.onlineServers中不包含的ServerName,

并返回这些个ServerName列表

通过hbase.hlog.split.skip.errors配置如果在加载log目录时出现错误(IOException)是否跳过重试,

默认为false,也就是需要重试

通过hbase.hlog.split.failure.retry.interval配置重试的间隔,默认为30*1000ms

主要作用metaregion log split

Set<ServerName>previouslyFailedServers= this.fileSystemManager

.getFailedServersFromLogFolders();

//remove stale recovering regions from previous run

通过SplitLogManager在zk中的路径下删除过期的replay日志路径

如果分布式日志重播hbase.master.distributed.log.replay配置为false,

直接删除recovering-regions路径(zookeeper.znode.recovering.regions配置)下的内容,

并结束此方法调用

如果分布式日志重播hbase.master.distributed.log.replay配置为true,

得到zookeeper.znode.splitlog配置的路径下的所有子路径(存放的是split的task名称),默认为splitWAL

每一个splittask的任务名称的最后一个”/”线后面部分是WALs的ServerName的URLDecoder.encode值

读取并检查子路径下的内容,转换成SplitLogTask任务检查是否ZooKeeperProtos.SplitLogTask.State.DONE状态,

如果不是,把此splittask对应的ServerName添加到方法开始时定义的列表中

读取recovering-regions路径(zookeeper.znode.recovering.regions配置)下的所有子路径,

每一个子路径通过region的encodename命名,同时检查每一个region下是否包含没重播日志的server子路径

检查没重播日志的server是否在刚才记录的servers

(调用此方法传入的servers(未启动)与splittask非DONE状态的servers)中,

如果不包含,删除此region的splitlog路径,并同时删除下面的子路径

this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);

//log splitting for hbase:meta server

通过metaRegionTracker在配置zookeeper.znode.metaserver的路径下

(默认meta-region-server)得到上一次metaregion运行的servername,

如果servername不为空(也就是cluster不是第一次运行),同时未启动的server列表中包含上一次运动的server

对metaregion进行splitlog操作

splitlog先检查分布式日志重播hbase.master.distributed.log.replay配置,

如果配置为true,在recovering-regions路径(zookeeper.znode.recovering.regions配置)

并在路径下创建此metaregion的encodename的子路径,

同时在region子路径下把上一次运行metaregion的server也创建为子路径

示例:1->/hbase/recovering-regions/regionname,2->/hbase/recovering-regions/regionname/servername

如果配置为false,首先得到/hbase/WALs/servername路径重命名为/hbase/WALs/servername-splitting

在zookeeper.znode.splitlog配置的路径下,默认为splitWAL,生成servername-splitting子路径

并在生成的路径下写入生成的SplitLogTask数据,

在SplitLogManager.tasks中添加到一个Task,key为splitlog的path,value为此task,

同时在SplitLogManager.deadWorkers中添加此server(要做splitlog),等待task执行完成,

针对split的具体分析请点击查看hbase的replay分析

ServerName oldMetaServerLocation= this.catalogTracker.getMetaLocation();

if(oldMetaServerLocation!= null&& previouslyFailedServers.contains(oldMetaServerLocation)){

splitMetaLogBeforeAssignment(oldMetaServerLocation);

//Note: we can't remove oldMetaServerLocation frompreviousFailedServers list because it

//may also host user regions

}

得到在recovering-regions路径(zookeeper.znode.recovering.regions配置)下的metaregion的所有server

这些server表示需要日志重播,针对metaregion的,主要作用在对meta的logsplit

Set<ServerName>previouslyFailedMetaRSs= getPreviouselyFailedMetaServersFromZK();

this.initializationBeforeMetaAssignment= true;

//initializeload balancer

初始化loadbalancer

this.balancer.setClusterStatus(getClusterStatus());

this.balancer.setMasterServices(this);

this.balancer.initialize();

//Make sure meta assigned before proceeding.

status.setStatus("AssigningMeta Region");

执行meta的assign操作

assignMeta(status);

Hmaster启动时执行metaassign

启动时hmaster.run.finishInitialization调用assignMeta方法

voidassignMeta(MonitoredTaskstatus)

throwsInterruptedException, IOException, KeeperException {

//Work on meta region

通过hbase.catalog.verification.timeout配置超时时间,默认为1000ms

intassigned =0;

longtimeout =this.conf.getLong("hbase.catalog.verification.timeout",1000);

status.setStatus("Assigninghbase:meta region");

ServerNamelogReplayFailedMetaServer= null;

得到AssignmentManager中生成的RegionStates实例,此实例中保存所有region的相关状态

RegionStates regionStates= assignmentManager.getRegionStates();

添加metaregion到regionStates中,设置metaregion的状态为offline或者split(如果需要split)

regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);

第一步:

执行processRegionInTransition

通过zookeeper.znode.unassigned配置的路径,默认为region-in-transition

检查region是否在此路径下存在子路径,不存在表示region不需要事物处理,直接返回方法调用返回false,

否则执行如下流程:

此时表示/hbase/region-in-transition/region-name有值,取出路径下的值,转换成RegionTransition对象

检查regionStates中的regionsInTransition列表中是否已经包含此region,

如果包含表示region已经在做transition,直接返回true,否则执行如下流程

a1.检查ServerManager.onlineServers如果不包含此RegionTransition对象所在的server时,

a2.接下来检查region在RegionStates的regionsInTransition中是否不包含,并在regionAssignments中包含此region

a3.更新region的状态为offline,从regionStates中得到region原来的RegionState与ServerName

并修改RegionState状态为offline添加到RegionStates.regionsInTransition与regionStates中

a3.1从regionsInTransition中移出此region

a3.2从regionAssignments中移出此region,此处返回为region所有的server

a3.3检查serverHoldings的server所分配的所有region中是否包含a3.2中移出此region所在的server

a3.4从serverHoldings得到的所有的此server对应的region中移出此region

a3.5检查此server对应的region是否为空(server中不在包含分配的region),在serverHoldings移出此server

b1.重新把region/RegionTransition对应的Server的RegionState状态设置为offline

b2.如果region是metaregion,把server设置到meta-region-server路径的值中

b3.如果region不是meta,把region与server更新到RegionStates.lastAssignments中

添加到server为region的最后一次分配server

b4.如果server在ServerManager中还不在isServerDead中存在,

通过ServerManager.expireServer设置此server下线

c.返回false.

根据RegionTransition实例的EventType操作分配流程,并返回true给方法调用:

通过AssignmentManager.assign去创建/hbase/region-in-transition/region-name

通过RegionPlan来实现REGION分配的RS选择,在ASSIGN时,通过balancer来控制选择那一个RS

调用RS的openRegion方法,执行region分配,

在master这端通过AssignmentManager.nodeDataChanged来监听rs对region的asign状态修改

RS实现AdminProtos.AdminService.BlockingInterface接口

待分析

第二步:

检查processRegionInTransition方法返回结果,如果是false,不做处理,直接返回false.

否则表示processRegionInTransition方法返回为true,一直等待regiontransition的处理完成。并返回true

booleanrit =this.assignmentManager

.processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);

通过RS的RPC接口调用getRegionInfo方法,检查RS中是否有META的注册REGION,如果上一步返回false,

此调用也null,也就是false,因为上一步调用返回false表示没有分配meta,

因此通过getRegionInfo调用时meta还没有注册,因此返回null,否则分配到RS成功,此处返回也就是true

booleanmetaRegionLocation= this.catalogTracker.verifyMetaRegionLocation(timeout);

得到metaregion最后一次分配的RS,如果上一步返回false,此时拿到的RS不是最新的RS,

ServerName currentMetaServer= this.catalogTracker.getMetaLocation();

如果metaregion没有分配成功

if(!metaRegionLocation){

assigned++;

此时rit表示不需要regiontransition

if(!rit) {

如果上次分配META的RS不为空,表示这个RS需要日志split,执行splitlog,并重新执行assignMeta

if(currentMetaServer!= null){

if(serverManager.isServerOnline(currentMetaServer)){

LOG.info("Forcingexpire of " + currentMetaServer);

serverManager.expireServer(currentMetaServer);

}

splitMetaLogBeforeAssignment(currentMetaServer);

if(this.distributedLogReplay){

logReplayFailedMetaServer= currentMetaServer;

}

}

assignmentManager.assignMeta();

}

}else{

//Region already assigned. We didn't assign it. Add to in-memory state.

Region分配成功,设置region状态为open

在RegionStates.lastAssignments中添加到此region对应的server

regionsInTransition中添加region状态为open

regionStates中添加region状态为open

regionStates.updateRegionState(

HRegionInfo.FIRST_META_REGIONINFO,State.OPEN,currentMetaServer);

在RegionStates.regionAssignments添加region对应的server

在RegionStates.serverHoldings中指定server中添加一个open的region

this.assignmentManager.regionOnline(

HRegionInfo.FIRST_META_REGIONINFO,currentMetaServer);

}

启动metatable

enableMeta(TableName.META_TABLE_NAME);

.........此处省去一些处理代码

}

master中监听rs对regionopen的状态修改:

通过AssignmentManager.nodeDataChanged,

当rs把状态修改为opening(RS_ZK_REGION_OPENING)时,更新RegionStates中此region的状态为opening

当rs把状态修改为opened(RS_ZK_REGION_OPENED)时,更新RegionStates中此region的状态为open

同时生成一个OpenedRegionHandler删除zk中此region的transition,具体请自行查看代码。

regionserver启动时执行metaassign

Master通过AssignmentManager.assignMeta时

publicOpenRegionResponse openRegion(finalRpcControllercontroller,

finalOpenRegionRequest request)throwsServiceException {

.........此处省去一些处理代码

OpenRegionResponse.Builderbuilder =OpenRegionResponse.newBuilder();

finalintregionCount= request.getOpenInfoCount();

finalMap<TableName,HTableDescriptor>htds =

newHashMap<TableName,HTableDescriptor>(regionCount);

finalbooleanisBulkAssign= regionCount> 1;

for(RegionOpenInfo regionOpenInfo: request.getOpenInfoList()){

finalHRegionInfo region= HRegionInfo.convert(regionOpenInfo.getRegion());

intversionOfOfflineNode= -1;

if(regionOpenInfo.hasVersionOfOfflineNode()){

versionOfOfflineNode= regionOpenInfo.getVersionOfOfflineNode();

}

HTableDescriptor htd;

try{

检查提交的region是否在rs中的onlineRegions中存在,

finalHRegion onlineRegion= getFromOnlineRegions(region.getEncodedName());

if(onlineRegion!= null){

.........此处省去一些处理代码

如果region已经存在,从meta表中得到此region对应的server

Pair<HRegionInfo,ServerName>p =MetaReader.getRegion(

this.catalogTracker,region.getRegionName());

if(this.getServerName().equals(p.getSecond())){

检查region的transition是否完成,

Boolean closing= regionsInTransitionInRS.get(region.getEncodedNameAsBytes());

.........此处省去一些注释

if(!Boolean.FALSE.equals(closing)

&&getFromOnlineRegions(region.getEncodedName())!= null){

.........此处省去一些日志打印代码

如果region的transitionn为非false,

同时onlineRegions中包含此region,设置response为已经打开的region

builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);

continue;

}

} else{

.........此处省去一些日志打印代码

否则从onlineRegions中移出此region

从regionFavoredNodesMap的region转换地址列表中移出此region

removeFromOnlineRegions(onlineRegion,null);

}

}

LOG.info("Open" +region.getRegionNameAsString());

htd= htds.get(region.getTable());

if(htd ==null){

htd= this.tableDescriptors.get(region.getTable());

htds.put(region.getTable(),htd);

}

finalBoolean previous= this.regionsInTransitionInRS.putIfAbsent(

region.getEncodedNameAsBytes(),Boolean.TRUE);

.........此处省去一些代码

//We are opening this region. If it moves back and forth for whateverreason, we don't

//want to keep returning the stale moved record while we are opening/ifwe close again.

从movedRegions列表中移出此region

removeFromMovedRegions(region.getEncodedName());

if(previous== null){

//check if the region to be opened is marked in recovering state in ZK

如果此region有需要日志重播,添加到recoveringRegions列表中

if(this.distributedLogReplay

&&SplitLogManager.isRegionMarkedRecoveringInZK(this.getZooKeeper(),

region.getEncodedName())){

this.recoveringRegions.put(region.getEncodedName(),null);

}

//If there is no action in progress, we can submit a specific handler.

//Need to pass the expected version in the constructor.

执行metaregion的打开操作。通过handler.process方法

通过hbase.assignment.timeout.management配置是否需要assign超时管理,默认为false

通过hbase.master.assignment.timeoutmonitor.timeout配置assign超时时行,默认为600000ms=10minute

检查是否在rs中的onlineRegions中存在,如果存在,不在向下执行

检查region是否开启RIT(也就是在rs中的regionsInTransitionInRS此region为true),如果没有开启,不在向下执行

把zk中region的状态从offline更新为opening

执行openRegion();方法操作,得到一个HRegion实例,见rs中OpenRegionHandler的openRegion()方法分析

如果region在recoveringRegions列表中存在值,更新region.setRecovering为true,

把HRegion实例添加到recoveringRegions对应的regionvalue中。需要做日志重播的region实例

调用updateMeta(finalHRegion r)更新meta表(meta表更新zk),此方法调用会一直等待,直到openRegion完成。

UpdateMeta会通过一个线程支调用rs的postOpenDeployTasks方法,检查store是否需要compact,并发起compact请求

得到region中所有的store中最小的seqid的值,检查zk中recovering的seqid是否小过此值,如果是更新zk中的seqid

如果是metaregion,更新zk中meta的server地址为当前server的地址。

如果是用户region,更新meta表中此region现在对应的server,与打开region的openSeqNum(当前最大的store的seqid+1).

通过handler.transitionToOpened设置region的状态从opening到opened

把region添加到onlineRegions列表中,移出regionsInTransitionInRS中的此region

设置response的响应为opend

if(region.isMetaRegion()){

this.service.submit(newOpenMetaHandler(this,this,region,htd,

versionOfOfflineNode));

} else{

用户region的打开处理

updateRegionFavoredNodesMapping(region.getEncodedName(),

regionOpenInfo.getFavoredNodesList());

this.service.submit(newOpenRegionHandler(this,this,region,htd,

versionOfOfflineNode));

}

}

builder.addOpeningState(RegionOpeningState.OPENED);

} catch(KeeperExceptionzooKeeperEx){

.........此处省去一些处理代码

} catch(IOException ie){

.........此处省去一些处理代码

}

}

returnbuilder.build();

}

rs中OpenRegionHandler的openRegion()方法分析:

此方法中生成HRegion实例,此实例的类通过hbase.hregion.impl进行配置,默认为HRegion

执行HRegion.openRegion方法,initialize-->initializeRegionInternals

此方法会调用并加载region中所有的store,并得到所有store中最大的store的seqid,

把最大的seqid+1就表示现在region的openSeqNum

store(columnfamily)的加载通过一个独立的线程池去完成,

这个线程池的大小通过hbase.hstore.open.and.close.threads.max进行配置,默认为1

每一个store加载时生成HStore实例,生成HStore实例时会加载此cf下所有的storefile,

storefile的加载线程个数:

a.通过hbase.hstore.open.and.close.threads.max进行配置,默认为1

b.取出store中所有的storefile个数,

a/b=storefile的加载线程个数

启动时的userregion assign

master端的处理

Hmaster.run.finishInitialization方法中:

//Fix up assignment manager status

status.setStatus("Startingassignment manager");

执行userregion的分配操作,

this.assignmentManager.joinCluster();

//setcluster status again after user regions are assigned

this.balancer.setClusterStatus(getClusterStatus());

if(!masterRecovery){

//Start balancer and meta catalog janitor after meta and regions have

//been assigned.

status.setStatus("Startingbalancer and catalog janitor");

this.clusterStatusChore= getAndStartClusterStatusChore(this);

loadbalance的定时线程,通过hbase.balancer.period进行配置,默认值300000ms

this.balancerChore= getAndStartBalancerChore(this);

this.catalogJanitorChore= newCatalogJanitor(this,this);

startCatalogJanitorChore();

}

AssignmentManager.joincluster()方法:

voidjoinCluster()throwsIOException,

KeeperException,InterruptedException {

.........此处省去一些注释

a.得到zk中通过zookeeper.znode.tableEnableDisable配置的地址,默认为table路径下的所有ENABLING的table.

b.得到zk中通过zookeeper.znode.tableEnableDisable配置的地址,默认为table路径下的所有DISABLED的table.

c.得到zk中通过zookeeper.znode.tableEnableDisable配置的地址,默认为table路径下的所有DISABLING的table.

代码如下:

Set<TableName>enablingTables = ZKTable.getEnablingTables(watcher);

Set<TableName>disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);

disabledOrEnablingTables.addAll(enablingTables);

Set<TableName>disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);

disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);

d.通过MetaReader.fullScan把meta下所有的region全部scan出来,

meta的scancache通过hbase.meta.scanner.caching配置,默认100

e.通过ServerManager得到所有的onlineServer,

f.如果onlineServer列表中不包含meta中region的server地址,

添加此region到一个key=servername,value=list<region>的map容器中。

g.如果region对应的table不是disabled/disabling的表,设置table为enabled

h.如果table不是disabled/disabling的表,同时region所在的server为onlineserver,

设置region的状态为open,同时把region添加到RegionStates.onlineRegions列表中。

j.返回f中生成的map,这些个map中的region表示server还没有启动起来,需要重新对region进行分配。

Map<ServerName,List<HRegionInfo>>deadServers= rebuildUserRegions();

//This method will assign all user regions if a clean server startup or

//it will reconstruct master state and cleanup any leftovers from

//previous master process.

1.得到region-in-transition路径下所有目前有transition的regionpath,

2.检查是否有server下线或请求下线,启动时正常为false.

3.在2的检查为false的前提下,检查regionStates是否有已经分配的,非meta的region,如果不包含为false

4.在2/3的检查为false的前提下,检查1中的所有regionpath是否有非meta的regionpath,如果不包含为false

5.如果2/3/4任何一个检查为true,调用processDeadServersAndRecoverLostRegions处理region的分配。

6.否则调用assignAllUserRegions,此处我们分析这个部分。一个新的集群启动

7.通过hbase.master.startup.retainassign配置的值检查是否需要保留分配,默认为true

8.如果7为true,调用assign(Map),否则调用assign(List)

9.检查region的个数是否超过可以批量assign的条件,同时检查server个数是否超过批量assign的条件

region的批量条件通过hbase.bulk.assignment.threshold.regions配置,默认为7

server的批量条件通过hbase.bulk.assignment.threshold.servers配置,默认为3

10.迭代执行每一个region在zk中的region-in-transition路径下创建子路径。设置region状态为PENDING_OPEN

11.调用rs中的openRegion

12.在调用RS的openRegion的过程中会有重试,如果失败,

最大超时时间通过hbase.regionserver.rpc.startup.waittime配置,默认为60000ms

processDeadServersAndRegionsInTransition(deadServers);

recoverTableInDisablingState();

recoverTableInEnablingState();

}

rs中处理userregion的open操作:

publicOpenRegionResponse openRegion(finalRpcControllercontroller,

finalOpenRegionRequest request)throwsServiceException {

try{

checkOpen();

}catch(IOException ie){

thrownewServiceException(ie);

}

requestCount.increment();

OpenRegionResponse.Builderbuilder =OpenRegionResponse.newBuilder();

finalintregionCount= request.getOpenInfoCount();

finalMap<TableName,HTableDescriptor>htds =

newHashMap<TableName,HTableDescriptor>(regionCount);

finalbooleanisBulkAssign= regionCount> 1;

for(RegionOpenInfo regionOpenInfo: request.getOpenInfoList()){

.........此处省去一些处理代码,这部分代码与metaregion的open相同

if(region.isMetaRegion()){

this.service.submit(newOpenMetaHandler(this,this,region,htd,

versionOfOfflineNode));

} else{

用户region的分配

通过handler.process方法

通过hbase.assignment.timeout.management配置是否需要assign超时管理,默认为false

通过hbase.master.assignment.timeoutmonitor.timeout配置assign超时时行,默认为600000ms=10minute

检查是否在rs中的onlineRegions中存在,如果存在,不在向下执行

检查region是否开启RIT(也就是在rs中的regionsInTransitionInRS此region为true),如果没有开启,不在向下执行

把zk中region的状态从offline更新为opening

执行openRegion();方法操作,得到一个HRegion实例,见rs中OpenRegionHandler的openRegion()方法分析

如果region在recoveringRegions列表中存在值,更新region.setRecovering为true,

把HRegion实例添加到recoveringRegions对应的regionvalue中。需要做日志重播的region实例

调用updateMeta(finalHRegion r)更新meta表(用户表更新meta表的数据),此方法调用会一直等待,直到openRegion完成。

UpdateMeta会通过一个线程支调用rs的postOpenDeployTasks方法,检查store是否需要compact,并发起compact请求

得到region中所有的store中最小的seqid的值,检查zk中recovering的seqid是否小过此值,如果是更新zk中的seqid

如果是metaregion,更新zk中meta的server地址为当前server的地址。

如果是用户region,更新meta表中此region现在对应的server,与打开region的openSeqNum(当前最大的store的seqid+1).

通过handler.transitionToOpened设置region的状态从opening到opened

把region添加到onlineRegions列表中,移出regionsInTransitionInRS中的此region

设置response的响应为opend

updateRegionFavoredNodesMapping(region.getEncodedName(),

regionOpenInfo.getFavoredNodesList());

this.service.submit(newOpenRegionHandler(this,this,region,htd,

versionOfOfflineNode));

}

}

builder.addOpeningState(RegionOpeningState.OPENED);

} catch(KeeperExceptionzooKeeperEx){

......................................

}

returnbuilder.build();

}

RS下线的regionassign