前言
這個demo,主要是展示了RPC與Netty在一起所産生的一種奇妙的化學反應,與傳統的,阻塞的通信不同,基于Netty的PRC架構,可以實作,兩個服務之前,異步的方法調用。
其實這篇文章的關鍵詞已經給出來了,即:兩個服務之前,異步的,方法調用。
一、RPC是什麼
如果不清楚RPC是什麼,推薦閱讀我的部落格《PRC原理分析:從一個簡單的DEMO開始》
https://blog.csdn.net/m13797378901/article/details/86538744
當然,我這裡也再一次說明一下RPC是什麼:
RPC,全稱為Remote Procedure Call,即遠端過程調用,它是一個計算機通信協定。它允許像調用本地服務一樣調用遠端服務。它可以有不同的實作方式。如RMI(遠端方法調用)、Hessian、Http invoker等。另外,RPC是與語言無關的。
以上是來自原網上其他部落格的說明,這裡我補充一點:RPC是與語言無關的,他是一種理念,思想。
二、Netty是什麼
Netty是一個高性能、異步事件驅動的NIO架構,它提供了對TCP、UDP和檔案傳輸的支援,作為一個異步NIO架構,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,使用者可以友善的主動擷取或者通過通知機制獲得IO操作結果。作為目前最流行的NIO架構,Netty在網際網路領域、大資料分布式計算領域、遊戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源元件也基于Netty的NIO架構建構。
Netty是一種先進的NIO解決方案,他完善了java中NIO并且對其進行了優化,使之可以更高效的更友善的進行使用。
那麼NIO是什麼呢?
臭不要臉的部落客推薦閱讀自己的部落格《與NIO的第一次親密接觸》 =_=
https://blog.csdn.net/m13797378901/article/details/88977996
三、源碼分析
廢話不多少,我們直接開幹吧。
1、項目結構
服務端
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxSP9EVT3tGRPNTT65UNjpXT4BjMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1kjM1EzN0ETMwEDNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
用戶端
公共契約公共類
上方圖1為服務端架構,圖2位用戶端架構,圖3為公共類以及契約接口
(1)服務端包結構說明
- api:存放契約實作
- netty:網絡通信,以及核心業務實作
- utils:工具包,Static的資料集合
(2)用戶端包結構說明
- interfaces:存放一個回調接口INettyCallBack
- netty:網絡通信,以及核心業務實作
(3)公共包以及契約
- api: 用戶端與伺服器共同遵守的契約,即用戶端負責調用契約,伺服器負責實作契約
- model:公共實體
- utils:公共的工具包,如json相關,字元串相關的處理
2、服務端代碼分析
(1)Starter 類
該類是啟動類,負責聲明端口,注冊中心資料注入,以及netty的啟動。
* @author xuyuanpeng
* @version 1.0
* @date 2019-04-09 21:31
*/
public class Starter {
private static Logger logger = Logger.getLogger("Starter");
public static void main(String [] agres) throws Exception {
//EchoServerImpl類,實作了IEchoServer 接口
IEchoServer echoServer=new EchoServerImpl(8088);
//将契約注冊到注冊中心,契約儲存的Key:契約,Value:契約實作類。
echoServer.register(IUserService.class, UserServiceImpl.class);
//啟動
echoServer.start();
}
}
(2)IEchoServer 接口
* @author xuyuanpeng
* @version 1.0
* @date 2019-04-09 20:23
*/
public interface IEchoServer {
//停止服務
public void stop();
//開始服務
public void start() throws Exception;
//注冊服務,注冊服務,就是講接口,以及對應的實作,放到了一個Map中
public void register(Class serviceInterface, Class impl);
//判斷目前服務是否在運作
public boolean isRunning();
//擷取使用的端口
public int getPort();
}
(3)EchoServerImpl 核心實作類
值得注意的是, StaticData.serviceRegistry是邏輯上的注冊中心,但本質上是一個HashMap。
在start方法中,将EchoServerHandler注入到ChannelPipeline中。
在Netty裡,Channel是通訊的載體,而ChannelHandler負責Channel中的邏輯處理。那麼ChannelPipeline是什麼呢?我覺得可以了解為ChannelHandler的容器:一個Channel包含一個ChannelPipeline,所有ChannelHandler都會注冊到ChannelPipeline中,并按順序組織起來。
在Netty中,ChannelEvent是資料或者狀态的載體,例如傳輸的資料對應MessageEvent,狀态的改變對應ChannelStateEvent。當對Channel進行操作時,會産生一個ChannelEvent,并發送到ChannelPipeline。ChannelPipeline會選擇一個ChannelHandler進行處理。這個ChannelHandler處理之後,可能會産生新的ChannelEvent,并流轉到下一個ChannelHandler。
/**
* @author xuyuanpeng
* @version 1.0
* @date 2019-03-06 14:47
*/
public class EchoServerImpl implements IEchoServer{
private final Integer port;
private static Boolean isRunning = false;
private static Logger logger = Logger.getLogger("EchoServerImpl");
public EchoServerImpl(int port){
this.port=port;
}
@Override
public void start() throws Exception {
logger.info("start");
NioEventLoopGroup group=new NioEventLoopGroup();
try {
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(group)
//指定使用 NIO 的傳輸 Channel
.channel(NioServerSocketChannel.class)
//.設定 socket 位址使用所選的端口
.localAddress(new InetSocketAddress(port))
//添加 EchoServerHandler 到 Channel 的 ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoServerHandler());
}
});
//綁定的伺服器;sync 等待伺服器關閉
ChannelFuture future=bootstrap.bind().sync();
//關閉 channel 和 塊,直到它被關閉
future.channel().closeFuture().sync();
}finally {
//關機的 EventLoopGroup,釋放所有資源。
group.shutdownGracefully().sync();
}
}
@Override
public void register(Class serviceInterface, Class impl) {
StaticData.serviceRegistry.put(serviceInterface.getName(), impl);
}
}
(4)EchoServerHandler 具體業務實作
channelRead方法:接受到用戶端消息後,會進入此方法。
接受到用戶端消息後,将位元組轉為字元串,并且,對字元串進行json解析,得到實體模型,其中包含了類,方法,參數類型,參數值。
再使用java中的invoke,可以直接調用方法,并且擷取傳回值。
擷取傳回值後,轉為ByteBuf類型,再寫入流中,傳回到用戶端。
/**
* @author xuyuanpeng
* @version 1.0
* @date 2019-03-06 14:28
*/
//@Sharable 辨別這類的執行個體之間可以在 channel 裡面共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = Logger.getLogger("EchoServerHandler");
@Override
public void channelRead(ChannelHandlerContext context, Object msg){
logger.info("channelRead");
ByteBuf in= (ByteBuf) msg;
//将所接收的資料傳回給發送者。注意,這裡還沒有沖刷資料
/**
* 解析資料,處理資料,調用invoke
* 将invoke資料傳回
*/
String resultStr="";
try {
String read= in.toString(CharsetUtil.UTF_8);
RPCNetty entity = (RPCNetty) JsonMapper.fromJsonString(read,RPCNetty.class);
Class clz= StaticData.serviceRegistry.get(entity.getClzName());
Method method=clz.getMethod(entity.getMethodName(),entity.getParamTypes());
Object resultObj=method.invoke(clz.newInstance(),entity.getArguments());
resultStr=JsonMapper.toJsonString(resultObj);
}catch (Exception e){
logger.info("Exception>"+e.getMessage());
}
ByteBuf resultIn= Unpooled.buffer();
resultIn.writeBytes(resultStr.getBytes());
//必須寫入ByteBuf位元組流
context.write(resultIn);
}
@Override
public void channelReadComplete(ChannelHandlerContext context) throws Exception{
//沖刷所有待審消息到遠端節點。關閉通道後,操作完成
logger.info("channelReadComplete");
context.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3、用戶端代碼分析
(1)Starter 啟動類
聲明端口,通過代理聲明IUserService ,代理調用接口的方法。
由于是異步通信,這裡采用回調的方式來接收資料。
* @author xuyuanpeng
* @version 1.0
* @date 2019-04-09 21:31
*/
public class Starter {
private static java.util.logging.Logger logger = Logger.getLogger("Starter");
public static void main(String [] agres){
final String host = "127.0.0.1";
final int port = 8088;
InetSocketAddress inetSocketAddress=new InetSocketAddress(host, port);
IUserService userService=RPCFactory.getRemoteProxyObj(IUserService.class,inetSocketAddress,
new INettyCallBack() {
@Override
public Object calllBack(Object obj) {
logger.info("obj>"+ JsonMapper.toJsonString(obj));
return null;
}
});
userService.getUser("1");
}
}
(2)IUserService 契約
//契約接口
public interface IUserService {
String getUser(String var1);
}
//契約實作
@Service
public class UserServiceImpl implements IUserService {
@Override
public String getUser(String id) {
return "I am Service, Hi Client "+id +" , nice to meet you :)";
}
}
(3)RPCFactory 代理實作類
傳遞網絡連接配接需要的host以及port。
将相關的對象,來封裝實體對象傳遞到Netty處理類中,進行進一步的處理
private static java.util.logging.Logger logger = Logger.getLogger("EchoClientHandler");
public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr, final INettyCallBack nettyCallBack) {
// 1.将本地的接口調用轉換成JDK的動态代理,在動态代理中實作接口的遠端調用
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
new Class<?>[]{serviceInterface},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
logger.info("invoke");
EchoClient echoClient=new EchoClient(addr);
RPCNetty rpcNetty=new RPCNetty();
rpcNetty.setClzName(serviceInterface.getName());
rpcNetty.setMethodName(method.getName());
rpcNetty.setParamTypes(method.getParameterTypes());
rpcNetty.setArguments(args);
echoClient.setRpcNetty(rpcNetty);
echoClient.setNettyCallBack(nettyCallBack);
echoClient.start();
return null;
}
});
}
}
(4)EchoClient 中
與服務端中的start方法類似,
都是綁定對應的Channel,聲明遠端端口,在回調方法 中,聲明Handler對象,将相關對象,設定到此對象中,最後,将Handler綁定到Channel的管道上。
* @author xuyuanpeng
* @version 1.0
* @date 2019-03-06 17:52
*/
public class EchoClient {
private String host;
private int port;
private InetSocketAddress inetSocketAddress;
private INettyCallBack nettyCallBack;
private RPCNetty rpcNetty;
private static java.util.logging.Logger logger = java.util.logging.Logger.getLogger("EchoClient");
public void start() throws Exception{
EventLoopGroup group=new NioEventLoopGroup();
try {
Bootstrap b=new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.remoteAddress(inetSocketAddress)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
EchoClientHandler echoClientHandler=new EchoClientHandler();
echoClientHandler.setRpcNetty(rpcNetty);
echoClientHandler.setNettyCallBack(nettyCallBack);;
socketChannel.pipeline().addLast(echoClientHandler);
}
});
ChannelFuture future=b.connect().sync();
future.channel().closeFuture().sync();
}finally {
group.shutdownGracefully().sync();
}
}
......
}
(5)EchoClientHandler 具體業務處理方法
當Handler被處理,激活以後,會觸發channelActive方法,在此方法中,可以将由代理方法中傳輸的對象,傳遞到伺服器中,具體傳輸方式,同于服務端接受資料,即,将實體模型轉為Json字元串,再将字元串轉為位元組流寫入到伺服器中。
伺服器如果接受到資料,會對其中的參數,即,接口類,方法,方法參數,方法參數類型,進行處理,即對其進行代理,找到伺服器中具體的實作類,實作此方法。
當伺服器處理完畢,得到了傳回值,再将值轉為json字元串位元組,傳遞到用戶端。
channelRead0方法,就是用戶端接受資料之後的回調。
由于是異步進行處理,是以我們再其中同樣也進行溢出接口調用,在其他位置,隻要實作此接口,就可以擷取到伺服器傳輸的資料。
/**
* @author xuyuanpeng
* @version 1.0
* @date 2019-03-06 17:40
*/
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static java.util.logging.Logger logger = Logger.getLogger("EchoClientHandler");
private RPCNetty rpcNetty;
private INettyCallBack nettyCallBack;
......
/**
* 請求資料
*/
@Override
public void channelActive(ChannelHandlerContext context){
logger.info("client channelActive >>"+ JsonMapper.toJsonString(rpcNetty));
context.writeAndFlush(Unpooled.copiedBuffer(JsonMapper.toJsonString(rpcNetty),
CharsetUtil.UTF_8));
}
/**
* 伺服器傳回資料
*/
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
logger.info("client channelRead0 >>"+byteBuf.toString(CharsetUtil.UTF_8));
nettyCallBack.calllBack(byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3、公共服務
(1)RPCNetty 通信實體
* @author xuyuanpeng
* @version 1.0
* @date 2019-04-08 18:57
*/
public class RPCNetty implements Serializable {
private String clzName;
private Class clz;
private String methodName;
private Class<?>[] paramTypes;
private Object[] arguments;
}
4、運作結果
(1)啟動服務端
四月 10, 2019 5:57:54 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerImpl start
資訊: start
(2)啟動用戶端
四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcclient.netty.RPCFactory$1 invoke
資訊: invoke
四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcclient.netty.EchoClientHandler channelActive
資訊: client channelActive >>{“clzName”:“com.xyp.iodemo.nettyrpccommon.api.IUserService”,“methodName”:“getUser”,“paramTypes”:[“java.lang.String”],“arguments”:[“1”]}
四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcclient.netty.EchoClientHandler channelRead0
資訊: client channelRead0 >>“I am Service, Hi Client 1 , nice to meet you ?”
四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcclient.Starter$1 calllBack
資訊: obj>"“I am Service, Hi Client 1 , nice to meet you ?”"
(3)伺服器響應
四月 10, 2019 5:57:54 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerImpl start
資訊: start
四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead
資訊: channelRead
四月 10, 2019 6:24:15 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead
資訊: read>>>>{“clzName”:“com.xyp.iodemo.nettyrpccommon.api.IUserService”,“methodName”:“getUser”,“paramTypes”:[“java.lang.String”],“arguments”:[“1”]}
四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead
資訊: resultStr>>>>“I am Service, Hi Client 1 , nice to meet you ?”
四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelRead
資訊: context.write>>>>“I am Service, Hi Client 1 , nice to meet you ?”
四月 10, 2019 6:24:16 下午 com.xyp.iodemo.nettyrpcserver.netty.EchoServerHandler channelReadComplete
資訊: channelReadComplete
四、總結
項目的git位址:https://gitee.com/xyp_YF/SNetttRpc.git
說明
Netty穩定且高效,易用,是進行NIO通信的不錯之選。
我們采用了Netty的通信方式,來處理RPC的底層的資料通信,這樣穩定且高效。
參考
《Netty權威指南》書籍
《與NIO的第一次親密接觸》 https://blog.csdn.net/m13797378901/article/details/86538744