天天看點

rocketmq namesrv 第二章接收處理過程

  大家好,很高興在這裡跟大家分享下rocketmq源碼實作,如有不對的地方歡迎指正。

接着上篇文章繼續展開namesrv注冊過程。

rocketmq namesrv 第二章接收處理過程

1 public void initChannel(SocketChannel ch) throws Exception {

           ch.pipeline().addLast(

                  defaultEventExecutorGroup,

                  new NettyEncoder(),

                  new NettyDecoder(),

                  new IdleStateHandler(0, 0, nettyServerConfig

                                    .getServerChannelMaxIdleTimeSeconds()),

                                new NettyConnetManageHandler(),

                                new NettyServerHandler());

   }

   當broker啟動的時候指定namesrv位址後,由netty接收連接配接,分為以下幾個步驟:

   1.1 第一步channel注冊 NettyConnetManageHandler

        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

            final String remoteAddress =                    RemotingHelper.parseChannelRemoteAddr(ctx.channel());

            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);

            super.channelRegistered(ctx);

        }

        列印出遠端注冊位址,調用父類注冊channel

   1.2 第二步 channel激活

        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);

            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {

                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress

                    .toString(), ctx.channel()));

            }

        }

        列印出channel激活位址,調用父類激活操作

        判斷NettyRemotingServer channelEventListener不為空的時候,放入Netty連接配接事件

        主要是處理 CONNECT,CLOSE,IDLE,EXCEPTION 處理Channel被關閉,下線broker

2 public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        ByteBuf frame = null;

        try {

            frame = (ByteBuf) super.decode(ctx, in);

            if (null == frame) {

                return null;

            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);

        } catch (Exception e) {

            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);

            RemotingUtil.closeChannel(ctx.channel());

        } finally {

            if (null != frame) {

                frame.release();

            }

        }

        return null;

      }

      NettyDecoder.decode解碼ByteBuf轉換成RemotingCommand,伺服器與用戶端通過傳遞            RemotingCommand來互動,包含Header,Body

      private int code; code碼

      private LanguageCode language = LanguageCode.JAVA;互動語言

      private int version = 0;版本

      private int opaque = RequestId.getAndIncrement();操作

      private int flag = 0;标示

      private String remark;

      private HashMap<String, String> extFields;擴充字段

      private transient CommandCustomHeader customHeader;使用者header

      private transient byte[] body;消息體

3 NettyServerHandler.channelRead0 netty服務處理

      public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

        final RemotingCommand cmd = msg;

        if (cmd != null) {

            switch (cmd.getType()) { 根據 command type判斷

            case REQUEST_COMMAND: 請求指令

                proce***equestCommand(ctx, cmd);

                break;

            case RESPONSE_COMMAND: 響應指令

                proce***esponseCommand(ctx, cmd);

                break;

            default:

                break;

            }

        }

      }

      處理消息接受處理,分以下步驟:

      3.1 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

        final Pair<NettyRequestProcessor, ExecutorService> pair =

                null == matched ? this.defaultRequestProcessor : matched;

      擷取預設的請求處理

      3.2 if (pair != null) {

            Runnable run = new Runnable() {

                @Override

                public void run() {

                    try {

                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();

                        這是rpc 勾作用是是在處理之前或者處理之後擴充一些操作

                        if (rpcHook != null) {

                            rpcHook

                                .doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);

                        }

                        final RemotingCommand response = pair.getObject1().proce***equest(ctx, cmd);

                        真正的處理在這裡,稍後在做詳細解釋

                        if (rpcHook != null) {

                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

                                cmd, response);

                        }

                        if (!cmd.isOnewayRPC()) {

                            if (response != null) {

                                response.setOpaque(cmd.getOpaque());

                                response.markResponseType();

                                try {

                                    ctx.writeAndFlush(response);

                                    響應response及重新整理緩沖池

                                }

                                catch (Throwable e) {

                                    plog.error("process request over, but response failed", e);

                                    plog.error(cmd.toString());

                                    plog.error(response.toString());

                                }

                            }

                            else {

                                // 收到請求,但是沒有傳回應答,可能是proce***equest中進行了應答,忽略這種情況

                            }

                        }

                    }

                    catch (Throwable e) {

                        plog.error("process request exception", e);

                        plog.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {

                            final RemotingCommand response =

                                    RemotingCommand.createResponseCommand(

                                        RemotingSysResponseCode.SYSTEM_ERROR,//

                                        RemotingHelper.exceptionSimpleDesc(e));

                            response.setOpaque(cmd.getOpaque());

                            ctx.writeAndFlush(response);

                        }

                    }

                }

            };

            try {

                // 這裡需要做流控,要求線程池對應的隊列必須是有大小限制的

                pair.getObject2().submit(run);

            }

            catch (RejectedExecutionException e) {

                // 每個線程10s列印一次

                if ((System.currentTimeMillis() % 10000) == 0) {

                    plog.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //

                            + ", too many requests and system thread pool busy, RejectedExecutionException " //

                            + pair.getObject2().toString() //

                            + " request code: " + cmd.getCode());

                }

                if (!cmd.isOnewayRPC()) {

                    final RemotingCommand response =

                            RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

                                "too many requests and system thread pool busy, please try another server");

                    response.setOpaque(cmd.getOpaque());

                    ctx.writeAndFlush(response);

                }

                如果線程池已滿,發生拒絕exception的時候,傳回線程池忙碌

            }

        }

        這裡需要特别注意:采用線程池對請求做限流的處理

      3.3 String error = " request type " + cmd.getCode() + " not supported";

            final RemotingCommand response =

                  RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,

                        error);

            response.setOpaque(cmd.getOpaque());

            ctx.writeAndFlush(response);

            plog.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);

          如果沒有預設注冊的各個RPC處理器則傳回不支援異常

     3.4 final RemotingCommand response = pair.getObject1().proce***equest(ctx, cmd);

          這裡分析下真正處理的細節 包含設定 kv config配置,擷取kv config,删除kv config,注冊broker,下線broker,根據Topic擷取Broker Name、隊列數,擷取注冊到Name Server的所有Broker叢集資訊  

     3.5 switch (request.getCode()) {

        case RequestCode.PUT_KV_CONFIG:

            return this.putKVConfig(ctx, request);

        case RequestCode.GET_KV_CONFIG:

            return this.getKVConfig(ctx, request);

        case RequestCode.DELETE_KV_CONFIG:

            return this.deleteKVConfig(ctx, request);

        case RequestCode.REGISTER_BROKER:

            Version brokerVersion = MQVersion.value2Version(request.getVersion());

            // 新版本Broker,支援Filter Server

            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

                return this.registerBrokerWithFilterServer(ctx, request);

            }

            // 低版本Broker,不支援Filter Server

            else {

                return this.registerBroker(ctx, request);

            }

        case RequestCode.UNREGISTER_BROKER:

            return this.unregisterBroker(ctx, request);

        case RequestCode.GET_ROUTEINTO_BY_TOPIC:

            return this.getRouteInfoByTopic(ctx, request);

        case RequestCode.GET_BROKER_CLUSTER_INFO:

            return this.getBrokerClusterInfo(ctx, request);

        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:

            return this.wipeWritePermOfBroker(ctx, request);

        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:

            return getAllTopicListFromNameserver(ctx, request);

        case RequestCode.DELETE_TOPIC_IN_NAMESRV:

            return deleteTopicInNamesrv(ctx, request);

        case RequestCode.GET_KV_CONFIG_BY_VALUE:

            return getKVConfigByValue(ctx, request);

        case RequestCode.DELETE_KV_CONFIG_BY_VALUE:

            return deleteKVConfigByValue(ctx, request);

        case RequestCode.GET_KVLIST_BY_NAMESPACE:

            return this.getKVListByNamespace(ctx, request);

        case RequestCode.GET_TOPICS_BY_CLUSTER:

            return this.getTopicsByCluster(ctx, request);

        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:

            return this.getSystemTopicListFromNs(ctx, request);

        case RequestCode.GET_UNIT_TOPIC_LIST:

            return this.getUnitTopicList(ctx, request);

        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:

            return this.getHasUnitSubTopicList(ctx, request);

        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:

            return this.getHasUnitSubUnUnitTopicList(ctx, request);

        default:

            break;

        }

       根據請求code做響應的操作,比如注冊broker資訊

       case RequestCode.REGISTER_BROKER:

            Version brokerVersion = MQVersion.value2Version(request.getVersion());

            // 新版本Broker,支援Filter Server

            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

                return this.registerBrokerWithFilterServer(ctx, request);

            }

            // 低版本Broker,不支援Filter Server

            else {

                return this.registerBroker(ctx, request);

            }

    3.6 final RemotingCommand response =

                RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);

        建立response響應RemotingCommand

        final RegisterBrokerResponseHeader responseHeader =

                (RegisterBrokerResponseHeader) response.readCustomHeader();

        擷取響應請求頭

        final RegisterBrokerRequestHeader requestHeader =

                (RegisterBrokerRequestHeader) request

                    .decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        解碼定制的頭資訊

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if (request.getBody() != null) {

            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);

            解碼請求body包含topic config

        }

        else {

            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion()

                .setCounter(new AtomicLong(0));

            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);

        }

        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(//

            requestHeader.getClusterName(), // 1叢集名稱

            requestHeader.getBrokerAddr(), // 2broker位址

            requestHeader.getBrokerName(), // 3broker名稱

            requestHeader.getBrokerId(), // 4brokerID

            requestHeader.getHaServerAddr(),// 5HA位址

            registerBrokerBody.getTopicConfigSerializeWrapper(), // 6topic config配置

            registerBrokerBody.getFilterServerList(),//擷取過濾服務

            ctx.channel()// 7擷取broker channel

            );

        注冊broker資訊

        responseHeader.setHaServerAddr(result.getHaServerAddr());

        設定HA位址,broker啟動後 端口+1作為HA備用位址

        responseHeader.setMasterAddr(result.getMasterAddr());

        設定主位址

        // 擷取順序消息 topic 清單

        byte[] jsonValue =

                this.namesrvController.getKvConfigManager().getKVListByNamespace(

                    NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);

        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);

        response.setRemark(null);

        return response;

轉載于:https://blog.51cto.com/haqiaolong/1631306