大家好,很高興在這裡跟大家分享下rocketmq源碼實作,如有不對的地方歡迎指正。
接着上篇文章繼續展開namesrv注冊過程。

1 public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
當broker啟動的時候指定namesrv位址後,由netty接收連接配接,分為以下幾個步驟:
1.1 第一步channel注冊 NettyConnetManageHandler
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
列印出遠端注冊位址,調用父類注冊channel
1.2 第二步 channel激活
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress
.toString(), ctx.channel()));
}
}
列印出channel激活位址,調用父類激活操作
判斷NettyRemotingServer channelEventListener不為空的時候,放入Netty連接配接事件
主要是處理 CONNECT,CLOSE,IDLE,EXCEPTION 處理Channel被關閉,下線broker
2 public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
NettyDecoder.decode解碼ByteBuf轉換成RemotingCommand,伺服器與用戶端通過傳遞 RemotingCommand來互動,包含Header,Body
private int code; code碼
private LanguageCode language = LanguageCode.JAVA;互動語言
private int version = 0;版本
private int opaque = RequestId.getAndIncrement();操作
private int flag = 0;标示
private String remark;
private HashMap<String, String> extFields;擴充字段
private transient CommandCustomHeader customHeader;使用者header
private transient byte[] body;消息體
3 NettyServerHandler.channelRead0 netty服務處理
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) { 根據 command type判斷
case REQUEST_COMMAND: 請求指令
proce***equestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND: 響應指令
proce***esponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
處理消息接受處理,分以下步驟:
3.1 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair =
null == matched ? this.defaultRequestProcessor : matched;
擷取預設的請求處理
3.2 if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
這是rpc 勾作用是是在處理之前或者處理之後擴充一些操作
if (rpcHook != null) {
rpcHook
.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
final RemotingCommand response = pair.getObject1().proce***equest(ctx, cmd);
真正的處理在這裡,稍後在做詳細解釋
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
cmd, response);
}
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(cmd.getOpaque());
response.markResponseType();
try {
ctx.writeAndFlush(response);
響應response及重新整理緩沖池
}
catch (Throwable e) {
plog.error("process request over, but response failed", e);
plog.error(cmd.toString());
plog.error(response.toString());
}
}
else {
// 收到請求,但是沒有傳回應答,可能是proce***equest中進行了應答,忽略這種情況
}
}
}
catch (Throwable e) {
plog.error("process request exception", e);
plog.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(
RemotingSysResponseCode.SYSTEM_ERROR,//
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
}
}
}
};
try {
// 這裡需要做流控,要求線程池對應的隊列必須是有大小限制的
pair.getObject2().submit(run);
}
catch (RejectedExecutionException e) {
// 每個線程10s列印一次
if ((System.currentTimeMillis() % 10000) == 0) {
plog.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ ", too many requests and system thread pool busy, RejectedExecutionException " //
+ pair.getObject2().toString() //
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"too many requests and system thread pool busy, please try another server");
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
}
如果線程池已滿,發生拒絕exception的時候,傳回線程池忙碌
}
}
這裡需要特别注意:采用線程池對請求做限流的處理
3.3 String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
plog.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
如果沒有預設注冊的各個RPC處理器則傳回不支援異常
3.4 final RemotingCommand response = pair.getObject1().proce***equest(ctx, cmd);
這裡分析下真正處理的細節 包含設定 kv config配置,擷取kv config,删除kv config,注冊broker,下線broker,根據Topic擷取Broker Name、隊列數,擷取注冊到Name Server的所有Broker叢集資訊
3.5 switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 新版本Broker,支援Filter Server
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
// 低版本Broker,不支援Filter Server
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KV_CONFIG_BY_VALUE:
return getKVConfigByValue(ctx, request);
case RequestCode.DELETE_KV_CONFIG_BY_VALUE:
return deleteKVConfigByValue(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
根據請求code做響應的操作,比如注冊broker資訊
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 新版本Broker,支援Filter Server
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
// 低版本Broker,不支援Filter Server
else {
return this.registerBroker(ctx, request);
}
3.6 final RemotingCommand response =
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
建立response響應RemotingCommand
final RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.readCustomHeader();
擷取響應請求頭
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request
.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
解碼定制的頭資訊
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
解碼請求body包含topic config
}
else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion()
.setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
}
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//
requestHeader.getClusterName(), // 1叢集名稱
requestHeader.getBrokerAddr(), // 2broker位址
requestHeader.getBrokerName(), // 3broker名稱
requestHeader.getBrokerId(), // 4brokerID
requestHeader.getHaServerAddr(),// 5HA位址
registerBrokerBody.getTopicConfigSerializeWrapper(), // 6topic config配置
registerBrokerBody.getFilterServerList(),//擷取過濾服務
ctx.channel()// 7擷取broker channel
);
注冊broker資訊
responseHeader.setHaServerAddr(result.getHaServerAddr());
設定HA位址,broker啟動後 端口+1作為HA備用位址
responseHeader.setMasterAddr(result.getMasterAddr());
設定主位址
// 擷取順序消息 topic 清單
byte[] jsonValue =
this.namesrvController.getKvConfigManager().getKVListByNamespace(
NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
轉載于:https://blog.51cto.com/haqiaolong/1631306