天天看點

詳解rocketMq通信子產品&更新構想

作者:技術聯盟總壇

陳鵬輝(承飛) 大淘寶技術 2023-07-26 16:20 發表于浙江

詳解rocketMq通信子產品&更新構想

本文從開發者的角度深入解析了基于netty的通信子產品, 并通過簡易擴充實作微服務化通信工具雛形, 适合于想要了解netty通信架構的使用案例, 想了解中間件通信子產品設計, 以及微服務通信底層架構的同學。希望此文能給大家帶來通信子產品架構靈感。

詳解rocketMq通信子產品&更新構想

概述

網絡通信是很常見的需求,

對于傳統web網頁工具短連接配接場景,浏覽器和伺服器互動,常見為浏覽器通過http協定請求Tomcat伺服器;

對于長連接配接場景, 比如即時通訊,或中間件等實時性要求高的場景,一般采用tcp協定的長連接配接進行全雙工實時通信;

對于java開發者來說,使用原生socket進行tcp開發,效率是比較低的,穩定性可靠性等也不好保障,一般選擇網絡通信架構netty加快開發效率。

對于上層應用來說,netty的标準使用方式依然比較繁瑣,未能很好的适配一些業務使用場景,比如rocketMq根據netty包裝了一層業務架構:通信子產品remoting。

該子產品可用性高,穩定性好,易擴充,經過了中間件産品長期高并發的品質驗證, 值得信任,并廣泛用于其他點對點(指定ip)通信場景,如dleger(raft的java實作)。

有相關通信需求的同學也都可以參考該通信子產品,相信有很多的靈感,或直接使用該通信子產品,帶來開發效率的提升。

本文從一個普通java開發者的視角,去解析該通信子產品

  1. 如何用 - 常見使用方式
  2. 實作原理 - 資料流轉鍊路
  3. 設計關鍵點 - 為什麼要如此設計
  4. 子產品更新 - 實作簡易的微服務化通信工具

本文代碼版本:

<parent>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-remoting</artifactId>
  <version>5.0.1-PREVIEW-SNAPSHOT</version>
</parent>           
詳解rocketMq通信子產品&amp;更新構想

如何用

編寫簡單易懂的測試demo,實作server client的互動流程。

簡單示例 協定code 為寫死 0 1 5 9,輸入測試資訊,輸出使用sysout。

▐啟動server 注冊服務監聽

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Server {
    public static void main(String[] args) throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 配置端口
        nettyServerConfig.setListenPort(8888);
        // 配置線程數 netty workGroup 線程池 處理io等低耗時
        nettyServerConfig.setServerSelectorThreads(2);
        // 配置線程數 netty eventGroup 線程池 處理自定義hander/長耗時等
        nettyServerConfig.setServerWorkerThreads(8);
        NettyRemotingServer remotingServer = new NettyRemotingServer(nettyServerConfig, null);


        // 支援共用或獨立的業務處理線程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


        // 業務處理器
        NettyRequestProcessor processA = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 0:
                        response.setBody(new String("hello sync 0").getBytes());
                    case 1:
                        response.setBody(new String("hello sync 1").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 業務處理器
        NettyRequestProcessor processB = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 9:
                        response.setBody(new String("hello sync 9").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 注冊 協定 - 對應的處理器, 類似web url 路由到對應的class
        remotingServer.registerProcessor(0, processA, poolA);
        remotingServer.registerProcessor(1, processA, poolA);
        remotingServer.registerProcessor(9, processB, poolB);


        remotingServer.start();


        System.out.println("start ok " + JSON.toJSONString(nettyServerConfig));
        System.in.read();
    }
}           

▐啟動client 發起調用

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Client {
    public static void main(String[] args) throws Exception {
        NettyClientConfig nettyServerConfig = new NettyClientConfig();
        // 配置線程數 netty eventGroup 線程池 處理自定義hander/耗時長等
        nettyServerConfig.setClientWorkerThreads(8);
        NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);


        // 支援共用或獨立的業務處理線程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


        // 監聽服務端發過來的請求
        remotingClient.registerProcessor(5, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("receive from server : " + request.getCode());
                return null;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, poolA);


        remotingClient.start();


        // 主動發起遠端調用 
        {
            // 同步調用
            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
            request.setRemark("sync");
            RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
        }
        {
            // 異步調用
            RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
            request.setRemark("async");
            remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() {
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
                }
            });
        }
        {
            // 單向調用
            RemotingCommand request = RemotingCommand.createRequestCommand(9, null);
            request.setRemark("oneway");
            remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call oneway ok ");
        }
        System.in.read();
    }
}           

該點對點調用,是需要手動指定目标伺服器的ip和端口的,不同于hsf擁有注冊中心進行協調撮合提供目标ip。

▐日志輸出

Connected to the target VM, address: '127.0.0.1:57381', transport: 'socket'
start ok {"listenPort":8888,"serverAsyncSemaphoreValue":64,"serverCallbackExecutorThreads":0,"serverChannelMaxIdleTimeSeconds":120,"serverOnewaySemaphoreValue":256,"serverPooledByteBufAllocatorEnable":true,"serverSelectorThreads":2,"serverSocketRcvBufSize":65535,"serverSocketSndBufSize":65535,"serverWorkerThreads":8,"useEpollNativeSelector":false}
received from client, remark:sync, coe:0
received from client, remark:async, coe:1
received from client, remark:oneway, coe:9

           
Connected to the target VM, address: '127.0.0.1:57385', transport: 'socket'
call sync ok remark:server body:hello sync 1
call oneway ok 
call async ok remark:server body:hello sync 1           
詳解rocketMq通信子產品&amp;更新構想

實作原理

關于netty如何封裝java基礎nio socket不做展開。

這裡分析通信子產品是如何封裝netty,擴充調用協定規範的部分,重點描述其中關鍵的設計要點。

▐ server 啟動 監聽請求

作為服務端,需綁定端口,監聽請求,這裡采用标準netty服務端模式。

remotingServer.start();
@Override
    public void start() {
        ...
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
    encoder,
    new NettyDecoder(),
    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
    connectionManageHandler,
    serverHandler
);
                    }
                });
        ...
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        ...
    }           

關注涉及幾個線程池的地方:

  1. bossGroup -> eventLoopGroupBoss 固定線程數1
  2. workerGroup -> eventLoopGroupSelector 若linux采用epoll實作 否則使用nio實作, 線程數可配置
  3. eventGroup -> defaultEventExecutorGroup 普通實作的 handler 工作線程池, 線程數可配置

另外就是傳統藝能:心跳, 解碼器 NettyEncoder,編碼器 NettyDecoder,連接配接管理器 connectionManageHandler,和最終的業務處理器 serverHandler

▐ server 注冊業務處理器

業務線程池配置

請求協定code關聯業務處理器

// 支援共用或獨立的業務處理線程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


        // 業務處理器
        NettyRequestProcessor processA = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 0:
                        response.setBody(new String("hello sync 0").getBytes());
                    case 1:
                        response.setBody(new String("hello sync 1").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 業務處理器
        NettyRequestProcessor processB = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 9:
                        response.setBody(new String("hello sync 9").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 注冊 協定 - 對應的處理器, 類似web url 路由到對應的class
        remotingServer.registerProcessor(0, processA, poolA);
        remotingServer.registerProcessor(1, processA, poolA);
        remotingServer.registerProcessor(9, processB, poolB);           

不同業務獨立線程池的必要性

在複雜業務場景中,比如商品管理鍊路,訂單交易鍊路,将所有的請求堆積在一個線程池中,快請求和慢請求公用一個賽道,無法避免資源配置設定不均問題

通信子產品設計為可手動配置每個業務的處理線程池

注冊路由和線程池關系

@Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }


        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
        this.processorTable.put(requestCode, pair);
    }           

建立 code - processor - pool 的三者映射關系,在後續收到請求後,可查找注冊關系進行路由喚起processor

▐ client 啟動 發起請求

NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);


        remotingClient.start();


// 主動發起遠端調用
        {
            // 同步調用
            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
            request.setRemark("sync");
            RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
        }
        {
            // 異步調用
            RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
            request.setRemark("async");
            remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() {
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
                }
            });
        }
        {
            // 單向調用
            RemotingCommand request = RemotingCommand.createRequestCommand(9, null);
            request.setRemark("oneway");
            remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call oneway ok ");
        }           

啟動用戶端client後,即處于長連接配接狀态,雙向通信及時性有保障

三種調用模式

作為通信元件,需要适配多種調用場景,同步異步調用已是基本操作,oneway用于不關心是否傳回的場景。

試想一下,在全雙工雙向異步通信的背景下,如何能像http一樣實作同步調用,發出一個請求,收到一個請求後怎麼跟前面發出的請求關聯起來,又如何實作異步等待轉為同步響應。

詳解rocketMq通信子產品&amp;更新構想
  • 同步調用

發起請求

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // 唯一id
        final int opaque = request.getOpaque(); 
    ...
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 把目前請求記錄到待響應table中
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    //标記為寫入成功
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 寫入異常結果 并喚起wait的線程
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                public void putResponse(final RemotingCommand responseCommand) {
                    this.responseCommand = responseCommand;
                    this.countDownLatch.countDown();
                }
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 同步等待結果
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.responseCommand;
        }
      ...
    }           

關鍵設計點:每一個請求request,都配置設定了一個 client唯一自增的id (request.getOpaque(); requestId.getAndIncrement())。

把id和上下文存儲到請求待響應table中:發送請求後(寫入channel),線程等待結果響應 responseFuture.waitResponse,利用countDownLatch等待結果。

  • 異步調用

發起請求

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)
        // 唯一id
    final int opaque = request.getOpaque();
    ... 
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        // 把目前請求記錄到待響應table中
        this.responseTable.put(opaque, responseFuture);
        ...
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    //标記為寫入成功
                    responseFuture.setSendRequestOK(true);
                    return;
                }
                requestFail(opaque);
                log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
            }
        }); 
        ...
    }           

關鍵設計點:每一個請求request,都配置設定了一個 client唯一自增的id (request.getOpaque(); requestId.getAndIncrement())。

把id和上下文存儲到請求待響應table中:發送請求後,将callback傳遞給responseFuture,等待callback被調用。

  • 單向調用oneway

發起請求

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        ...
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                once.release();
                if (!f.isSuccess()) {
                    log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            }
        });
        ...
    }           

無需監聽結果

關鍵設計點:使用信号量Semaphore控制并發數

是通道瞬間并發度,不同于流控qps

oneway模式:不同于同步調用 異步調用 這裡不關心傳回值 是以無需記錄id到待響應table

▐ server受理請求 路由

監聽請求

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // 來自client的請求
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 來自client的響應
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
     public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 路由關系 線程池配置 查詢 
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
    ...
        Runnable run = new Runnable() {
            @Override
            public void run() {
        ...
                final RemotingResponseCallback callback = new RemotingResponseCallback() {
                    @Override
                    public void callback(RemotingCommand response) {
                        ...  
                        // 非oneway模式 才需要回寫response
                        if (!cmd.isOnewayRPC()) {
                            ...
                            ctx.writeAndFlush(response); 
                            ...
                        }
                    }
                };
                ...
                // 使用指定的業務處理器processor處理業務
                NettyRequestProcessor processor = pair.getObject1();
                RemotingCommand response = processor.processRequest(ctx, cmd);
                callback.callback(response); 
                ...
            }
        };
        ...
        // 包裝為線程任務 放到配置的線程池中執行
        final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
        pair.getObject2().submit(requestTask);
        ...
    }           

關鍵設計點

抽象複用:

client 和 server的 網絡通信讀子產品是高度一緻的,是以抽象出來共有的部分,複用代碼,繼承結構:

詳解rocketMq通信子產品&amp;更新構想

是一個很标準的抽象複用案例, 但需注意在兩個角色(client server)中同一份代碼是有不一樣的解讀鍊路

路由實作:

利用code - processor - pool 的三者映射關系友善的拿到對應業務的處理器及其獨立的線程池,進行任務投遞

設計理念類似觀察者模式,添加觀察者-業務處理器(這裡僅單個觀察者),當事件來了(socket消息讀取)後,通知到所有觀察者進行具體業務處理。

▐ client 監聽響應

  • 監聽 同步調用結果
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {        @Override        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // 來自server的請求
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 來自server的響應
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
     public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
         // 從待響應table中找到響應對應的請求
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);


            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                // 異步調用 回調callback
                executeInvokeCallback(responseFuture);
            } else {
                // 同步調用
                // 寫入正常結果 并喚起wait的線程
                responseFuture.putResponse(cmd);
                public void putResponse(final RemotingCommand responseCommand) {
                    this.responseCommand = responseCommand;
                    this.countDownLatch.countDown();
                }
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }           

關鍵設計點

異步協調 && 同步等待 && 喚起機制

讀取到來自server響應資料的線程 -> 通過待響應table查找目前響應歸屬的請求 -> 操作其countDownLatch定向喚起等待結果的請求線程

同步結果喚起條件:寫入異常 || 等待逾時 || 讀取到來自server的對應id的響應

// 同步等待結果

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

  • 監聽 異步調用結果
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // 來自server的請求
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 來自server的響應
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
     public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
         // 從待響應table中找到響應對應的請求
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);


            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                // 異步調用
                executeInvokeCallback(responseFuture);
            } else {
                // 同步調用
                // 寫入結果 并喚起wait的線程
                responseFuture.putResponse(cmd);
                   public void putResponse(final RemotingCommand responseCommand) {
                this.responseCommand = responseCommand;
                this.countDownLatch.countDown();
            }
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }


  private void executeInvokeCallback(final ResponseFuture responseFuture) {
        ExecutorService executor = this.getCallbackExecutor();
    ...
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    responseFuture.executeInvokeCallback();
                } catch (Throwable e) {
                    log.warn("execute callback in executor exception, and callback throw", e);
                } finally {
                    responseFuture.release();
                }
            }
        });
        ...  
    }           

關鍵設計點

  • 異步協調 && callback機制

讀取到來自server響應資料的線程 -> 通過待響應table查找目前響應歸屬的請求 -> 回調callback

異步結果回調callback條件:寫入異常 || 等待逾時 || 讀取到來自server的對應id的響應

另外callback執行采用了cas機制限制僅執行一次

詳解rocketMq通信子產品&amp;更新構想

子產品更新-微服務化通信工具

why?

從業務視角開發來看,通信子產品依然是比較基礎的,對于普通開發者,希望能夠像hsf一樣,簡單的定制協定service,契合java接口實作多态機制,不希望每次都去根據code或其他url之類的手動去分發路由,顯得過于原始。

how?

參考hsf系列的遠端調用方式,使用動态代理規範化協定傳輸,使用泛化反射機制便捷調用。

封裝程度跟靈活程度往往是成反比的,注意不要過度設計,盡可能保留原始通信子產品的靈活。

▐ 使用方式

  • 定義接口 和 實作
public interface ServiceHello {
    String sayHello(String a, String b);
    Integer sayHelloInteger(Integer a, Integer b);
}           
import com.uext.remote.rf.service.ServiceHello;
public class ServiceHelloImpl implements ServiceHello {
    @Override
    public String sayHello(String a, String b) {
        return "hello " + a + " " + b;
    }
    @Override
    public Integer sayHelloInteger(Integer a, Integer b) {
        return 1000 + a + b;
    }
}           

同hsf,接口interface可打包後提供給消費者,實作類隐藏于提供者代碼中

  • 啟動provider 注冊服務監聽
import com.alibaba.fastjson.JSON;
import com.uext.remote.rf.provider.ServiceHelloImpl;
import com.uext.remote.rf.provider.ServiceWorldImpl;
import com.uext.remote.rf.service.ServiceHello;
import com.uext.remote.rf.service.ServiceWorld;


public class TestServer {
    public static void main(String[] args) throws Exception {
        ApiProviderBean apiProviderBean = new ApiProviderBean();
        apiProviderBean.setPort(8888);
        apiProviderBean.init();
        apiProviderBean.register(ServiceHello.class, new ServiceHelloImpl());
        apiProviderBean.register(ServiceWorld.class, new ServiceWorldImpl());


        System.out.println("start ok " + JSON.toJSONString(apiProviderBean));
        System.in.read();
    }
}           

啟動服務端,注冊一些需要暴露的服務,通過接口和接口的實作類的執行個體進行綁定

  • 啟動consumer 發起調用
import com.uext.remote.rf.service.ServiceHello;
import com.uext.remote.rf.service.ServiceWorld;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
public class TestClient {
    public static void main(String[] args) throws Exception {
        // 初始化一個連接配接用戶端
        NettyClientConfig nettyServerConfig = new NettyClientConfig();
        NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);
        remotingClient.start();
        ApiConsumerBean apiConsumerBean = new ApiConsumerBean();
        apiConsumerBean.setRemotingClient(remotingClient);
        apiConsumerBean.setInterfac(ServiceHello.class);
        apiConsumerBean.setTimeOut(30000L);
        apiConsumerBean.setAddr("127.0.0.1:8888");


        ServiceHello serviceHello = apiConsumerBean.getProxy();
        ApiConsumerBean apiConsumerBean2 = new ApiConsumerBean();
        apiConsumerBean2.setRemotingClient(remotingClient);
        apiConsumerBean2.setInterfac(ServiceWorld.class);
        apiConsumerBean2.setTimeOut(30000L);
        apiConsumerBean2.setAddr("127.0.0.1:8888");
        ServiceWorld serviceWorld = apiConsumerBean2.getProxy();


        System.out.println(serviceHello.sayHello("a", "b"));
        System.out.println(serviceHello.sayHelloInteger(1, 2));
        serviceWorld.sayWorld("aa", "bb");


        System.in.read();
    }
}           

初始化一個長連接配接用戶端, 擷取接口遠端實作執行個體, 發起調用。

  • 日志輸出
Connected to the target VM, address: '127.0.0.1:49830', transport: 'socket'
start ok {"index":{"com.uext.remote.hsf.service.ServiceWorld":{"public abstract void com.uext.remote.hsf.service.ServiceWorld.sayWorld(java.lang.String,java.lang.String)":{}},"com.uext.remote.hsf.service.ServiceHello":{"public abstract java.lang.Integer com.uext.remote.hsf.service.ServiceHello.sayHelloInteger(java.lang.Integer,java.lang.Integer)":{},"public abstract java.lang.String com.uext.remote.hsf.service.ServiceHello.sayHello(java.lang.String,java.lang.String)":{}}},"port":8888,"remotingServer":{"callbackExecutor":{"activeCount":0,"completedTaskCount":0,"corePoolSize":4,"largestPoolSize":0,"maximumPoolSize":4,"poolSize":0,"queue":[],"rejectedExecutionHandler":{},"shutdown":false,"taskCount":0,"terminated":false,"terminating":false,"threadFactory":{}},"rPCHooks":[]}}
world aa bb           
Connected to the target VM, address: '127.0.0.1:53211', transport: 'socket'
hello a b
1003           

▐ 實作方式

  • 請求頭 參數協定
import lombok.Data;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@Data
public class CommonHeader implements CommandCustomHeader{
    /**
     * com.uext.remote.hsf.service.ServiceHello
     */
    String interfaceName;
    /**
     * public abstract java.lang.String com.uext.remote.hsf.service.ServiceHello.sayHello(java.lang.String,java.lang.String)
     */
    String methodName;


    String argsJsonJson;


    @Override
    public void checkFields() throws RemotingCommandException {
    }
}           

使用接口interface package url 和 方法 method的作為識别碼,用以路由選擇。

其中動态參數問題,需要考慮如何解決解碼為方法參數對應的不同類型,本文采用簡易實作(json)。

  • provider實作代碼
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data;
import org.apache.rocketmq.remoting.netty.*;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.*;
@Data
public class ApiProviderBean {
    private int port = 8888;
    // 長連接配接執行個體
    private NettyRemotingServer remotingServer;
    public void init() throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(port);
        remotingServer = new NettyRemotingServer(nettyServerConfig, null);
        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                // 請求資料解析
                CommonHeader commonHeader = (CommonHeader) request.decodeCommandCustomHeader(CommonHeader.class);


                // 路由查找
                Map<String/*method*/, Call> map = index.get(commonHeader.getInterfaceName());
                Call call = Objects.requireNonNull(map, "interface not exists " + commonHeader.getInterfaceName()).get(commonHeader.getMethodName());
                if(call == null){
                    throw new RuntimeException("method not exists " + commonHeader.getMethodName());
                }
                // 參數解碼 todo 優化解碼編碼
                Parameter[] ts = call.method.getParameters();
                List<Object> args = new ArrayList<>();
                List<String> argsJson = JSON.parseObject(commonHeader.argsJsonJson, new TypeReference<List<String>>(){});
                for (int i = 0; i < argsJson.size(); i++) {
                    // 根據method規範 逐一反序列化
                    args.add(JSON.parseObject(argsJson.get(i), ts[i].getType()));
                }
                // 反射調用
                Object res = call.method.invoke(call.instance, args.toArray(new Object[0]));
                // 結果編碼 回傳 todo 優化解碼編碼
                RemotingCommand response = RemotingCommand.createResponseCommand(0, null);
                if(res != null) {
                    response.setBody(JSON.toJSONBytes(res));
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, null);
        remotingServer.start();
    }
    private static class Call{
        Object instance;
        Method method;
    }
    private Map<String/*interface*/, Map<String/*method*/, Call>> index = new HashMap<>();
    /**
     * @param interfac 接口 協定
     * @param impl 實作類的執行個體
     */
    public synchronized <T> void register(Class<T> interfac, T impl){
        // 建立 接口-實作類-方法 路由關系
        String iname = interfac.getName();
        Map<String/*method*/, Call> map = index.get(iname);
        if(map == null){
            map = new LinkedHashMap<>();
            index.put(iname, map);
        }
        for (Method declaredMethod : interfac.getDeclaredMethods()) {
            Call call = new Call();
            call.instance = impl;
            call.method = declaredMethod;
            map.put(declaredMethod.toString(), call);
        }
    }
}           

關鍵在于 注冊協定(interface)和實作類, 維護映射路由關系。

收到channel請求的資料後,解碼,根據映射路由關系進行反射調用拿到結果,編碼結果,回寫到channel

由于通道code 定義為int,但為了靈活配置接口及實作,不想寫死,是以丢失了自定義不同業務線程池的特性,如果有需要可以重構通道code為string,然後把相關路由協定序列化到通道code中。

  • consumer實作代碼
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@Data
public class ApiConsumerBean implements InvocationHandler {
    /**
     * 逾時時間
     */
    private Long timeOut = 3000L;
    /**
     * 目标 ip:port
     */
    private String addr = "127.0.0.1:8888";
    /**
     * 實作類
     */
    private Class<?> interfac;
    /**
     * 長連接配接執行個體
     */
    private NettyRemotingClient remotingClient;
    /**
     * 擷取協定 代理執行個體
     */
    public <T> T getProxy() throws IllegalArgumentException {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{interfac}, this);
    }
    /**
     * 規範編碼協定
     */
    @Override
    public Object invoke(Object target, Method method, Object[] args) throws Throwable {
        if (Object.class.equals(method.getDeclaringClass())) {
            return method.invoke(this, args);
        }
        // 協定編碼入參
        CommonHeader header = new CommonHeader();
        header.setInterfaceName(interfac.getName());
        header.setMethodName(method.toString());
        // todo 優化解碼編碼
        List<String> argJson = new ArrayList<>();
        for (Object arg : args) {
            argJson.add(JSON.toJSONString(arg));
        }
        header.setArgsJsonJson(JSON.toJSONString(argJson));
        // 定義通道code 0 為 遠端使用
        RemotingCommand request = RemotingCommand.createRequestCommand(0, header);
        Object res = null;
        if(method.getReturnType() != null) {
            RemotingCommand response = remotingClient.invokeSync(addr, request, timeOut);
            // 協定解碼結果 todo 優化解碼編碼
            if(response.getBody() != null && response.getBody().length > 0) {
                res = JSON.parseObject(new String(response.getBody(), StandardCharsets.UTF_8), method.getReturnType());
            }
        }else{
            remotingClient.invokeOneway(addr, request, timeOut);
        }
        return res;
    }
}           

關鍵在于 委托接口(interface)的調用實作, 動态代理為: 根據協定編碼, 包裝request之後寫入channel

同步等待, 是以采用了同步調用模式

收到channel響應的結果後, 解碼, 傳回結果

其中無傳回值的接口, 不關心響應結果, 可使用oneway方式調用

▐ 更進一步 注冊中心 ip自動選擇

引入注冊中心 zk 或 namesrv,通過中心化協調,讓某一些consumer自動選擇某一台provider,并同時可以支援配置中心化下放,實作服務治理,越來越像微服務(dubbo)架構了哈。

當然,在跟多業務場景中,是無法引入其他中間件的,能少依賴就少依賴,降低複雜度。

在内網環境中,絕大部分項目采用Axxx一站式釋出部署,配套Nxxxxxxx叢集雲資源管理,是支援按應用名動态擷取目前叢集ip清單的。

curl http://xxxxx.xxxx
{
  "num": 164,
  "result": [
    {
      "dns_ip": "13.23.xx.xxx",  
      "state": "working_online"
    },
    ...
  ],
  "start": 0,
  "total": 164
}           

那麼我們是否可以依賴該 ip清單,用來做本地hash ip自動選擇呢?

當然可以,配合可用性心跳探測,每台機器節點自己維護一份可用性提供者消費者清單緩存,通過一緻性hash等算法選擇機器比對機器。

那麼就得到了一個簡易版的低依賴,去中心化,高可用的微服務通信架構。

詳解rocketMq通信子產品&amp;更新構想

團隊介紹

大淘寶技術開放平台,是淘寶天貓與外部生态互聯互通的重要開放途徑,通過開放的産品技術把一系列基礎服務像水、電、煤一樣輸送給我們的商家、開發者、社群媒體以及其他合作夥伴,推動行業的定制、創新、進化,并最終促成新商業文明生态圈。

我們是一支技術能力雄厚,有着光榮曆史傳統的技術團隊。在曆年雙十一戰場上,團隊都表現着優異的成績。這裡承載着每秒百萬級的業務處理,90%的訂單通過訂單推送服務實時地推送到商家的ERP系統完成電商作業,通過奇門開放的ERP-WMS場景已經成為倉儲行業标準。随着新零售業務的持續探索與快速發展,我們渴求各路高手加入,參與核心系統架構設計、性能調優,開放模式創新等富有技術挑戰的工作。

繼續閱讀