代碼位址如下:
http://www.demodashi.com/demo/13448.html
可以給你提供思路
也可以讓你學到Netty相關的知識
當然,這隻是一種實作方式
需求
看下圖,其實這個項目就是為了做這樣一件事。
有一個公共服務ServerA,它提供了一個名為getUserName的服務。
現在有多個類似ServerB的Web應用伺服器。
當客戶想通過ServerB要請求getUserName服務時,由于ServerB服務中因為沒有UserService的實作類,導緻不能正常提供服務的問題。

預期結果
可以看到,在Client項目中,UserService沒有實作類,但是傳回了正常的結果。
項目結構
整個項目分為三個部分,Server端、Client端以及一個公共jar。
下圖正是整個項目的目錄結構圖。
公共部分
公共部分的存在是因為我将伺服器端和用戶端寫在了一個項目中,為了不讓代碼重複警告,是以提出來的一個公共子產品。主要是幾個實體類和一些序列化工具類。
- MsgPackDecoder
- 該類是一個MsgPack編碼器,主要作用将Object對象序列化成byte數組。
- MsgPackEncoder
- 該類是一個MsgPack解碼器,主要作用将byte數組反序列化成Object對象。
- ObjectCodec
- 該類是繼承了MessageToMessageCodec<ByteBuf, Object>。是自定義序列化類主要有兩個方法,encode和decode
@Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) { // 調用工具類的序列化方法 byte[] data = ObjectSerializerUtils.serilizer(msg); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(data); out.add(buf); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { // 調用工具類的反序列化方法 byte[] bytes = new byte[msg.readableBytes()]; msg.readBytes(bytes); Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes); out.add(deSerilizer); }
- 該類是繼承了MessageToMessageCodec<ByteBuf, Object>。是自定義序列化類主要有兩個方法,encode和decode
- ObjectSerializerUtils
- 序列化工具類。序列化方法可以自己實作
- MethodInvokeMeta
- 重點類。這個類是一個實體類。用于在網絡中傳輸的類。主要有5個字段分别記錄了一個接口的類對象,調用接口的方法名,方法的參數清單(包含參數類型,和參數清單),方法的傳回值類型。
- 在用戶端中,這個類将調用方所要調用的方法封裝。
- 在服務端中,這個類主要用于伺服器反射調用方法。
- 當然,也可以用String來記錄這些元資訊。
- NullWritable
- 這個類主要用于在網絡中傳輸null。當傳回值為null時,服務端會傳回NullWritable對象。用戶端接收到NullWritable時進行null處理。
- User
- 實體對象。測試用例
用戶端
上面的目錄結構圖也有提到,用戶端中隻有UserService接口,是以用戶端中如果不做處理是不能正常運作的。
用戶端中核心類有以下7個,其中與Netty相關的核心類與服務端一樣有3個
- ClientChannelHandlerAdapter
- CustomChannelInitializerClient
- NettyClient
- RpcProxyFactoryBean
- NettyBeanScanner
- PackageClassUtils
- WrapMethodUtils
NettyClient
這個類是Netty用戶端的啟動類,這個類中與Netty服務端進行通信
CustomChannelInitializerClient
這個類是用于初始化管道事件的類。主要添加了TCP粘包問題解決方案和自定義編解碼器工具
ClientChannelHandlerAdapter
這個類是Netty用戶端的處理類,主要通過這個類将調用資訊寫給Netty服務端。
NettyBeanScanner
這個類實作了BeanFactoryPostProcessor接口。BeanFactoryPostProcessor是Spring初始化Bean時對外暴露的擴充點。
Spring IoC容器允許BeanFactoryPostProcessor在容器執行個體化任何bean之前讀取bean的定義(配置中繼資料),并可以修改它。同時可以定義多個BeanFactoryPostProcessor,通過設定'order'屬性來确定各個BeanFactoryPostProcessor執行順序。
在postProcessBeanFactory方法中,調用PackageClassUtils.resolver方法,将UserService.class注冊到SpringBean工廠。
/**
* 注冊Bean到Spring的bean工廠
*/
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
this.beanFactory = (DefaultListableBeanFactory) beanFactory;
// 加載遠端服務的接口
List<String> resolverClass = PackageClassUtils.resolver(basePackage);
for (String clazz : resolverClass) {
String simpleName;
if (clazz.lastIndexOf('.') != -1) {
simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
} else {
simpleName = clazz;
}
BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RpcProxyFactoryBean.class);
gd.addPropertyValue("interfaceClass", clazz);
gd.addPropertyReference("nettyClient", clientName);
this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
}
}
RpcProxyFactoryBean
重點類,這個類繼承了AbstractFactoryBean
使用jdk動态代理的方式建立代理對象。
在invoke方法中,
調用WrapMethodUtils工具類中的方法,将代理對象方法封裝成MethodInvokeMeta對象。
然後通過NettyClient傳輸給NettyServer端,進行RPC調用,并将結果傳回。
至此,用戶端的核心類介紹完畢。
服務端
服務端主要的核心類有三個
- ServerChannelHandlerAdapter
- RequestDispatcher
- NettyServer
NettyServer
這個類主要有兩個方法,一個是啟動Netty服務的方法,一個是關閉伺服器的方法。核心代碼如下:
/**
* 啟動netty服務的方法
*/
public void start() {
// 伺服器監聽端口号
int port = serverConfig.getPort();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// BACKLOG用于構造服務端套接字ServerSocket對象,
// 辨別當伺服器請求處理線程全滿時,用于臨時存放已完成三次握手的請求的隊列的最大長度。
// 如果未設定或所設定的值小于1,Java将使用預設值50
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
// 設定事件處理
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 自定義長度解碼器解決TCP黏包問題
// maxFrameLength 最大包位元組大小,超出報異常
// lengthFieldOffset 長度字段的偏差
// lengthFieldLength 長度字段占的位元組數
// lengthAdjustment 添加到長度字段的補償值
// initialBytesToStrip 從解碼幀中第一次去除的位元組數
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535
, 0, 2, 0, 2));
// LengthFieldPrepender編碼器,它可以計算目前待發送消息的二進制位元組長度,将該長度添加到ByteBuf的緩沖區頭中
pipeline.addLast(new LengthFieldPrepender(2));
// 序列化工具
pipeline.addLast(new ObjectCodec());
pipeline.addLast(channelHandlerAdapter);
}
});
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
ServerChannelHandlerAdapter
這個類主要重寫了ChannelRead方法,在ChannelRead方法中調用了RequestDispatcher類中的dispatcher方法來處理消息。
RequestDispatcher
這個類的作用為,将ChannelRead方法中讀到的資料(也可以說指令),通過反射來調用,執行對應方法,将執行後的結果寫回通道,供用戶端使用。
/**
* 分發指令
*
* @param channelHandlerContext channelHandlerContext
* @param invokeMeta invokeMeta
*/
public void dispatcher(final ChannelHandlerContext channelHandlerContext, final MethodInvokeMeta invokeMeta) {
ChannelFuture f = null;
try {
// 擷取用戶端準備調用的接口類
Class<?> interfaceClass = invokeMeta.getInterfaceClass();
// 擷取準備調用的方法名稱
String name = invokeMeta.getMethodName();
// 擷取方法對應的參數清單
Object[] args = invokeMeta.getArgs();
// 擷取參數類型
Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
// 通過Spring擷取對象
Object targetObject = app.getBean(interfaceClass);
// 擷取待調用方法
Method method = targetObject.getClass().getMethod(name, parameterTypes);
// 執行方法
Object obj = method.invoke(targetObject, args);
if (obj == null) {
// 如果方法結果為空,将NULL結果寫給用戶端
f = channelHandlerContext.writeAndFlush(NullWritable.nullWritable());
} else {
// 寫給用戶端結果
f = channelHandlerContext.writeAndFlush(obj);
}
// 釋放通道,不是關閉連接配接
f.addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
// 出現異常後的處理
f = channelHandlerContext.writeAndFlush(e.getMessage());
} finally {
if (f != null) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
使用方法
- 啟動ServerApplication
- 啟動ClientApplication
- 打開Chrome浏覽器,輸入:http://localhost:8080/client/get/name 或者 http://localhost:8080/client/get/info 即可看到結果。
如果想整合到現有項目中,請直接留言或者聯系作者,此次并沒有提供內建版本,但如果此篇文章已了解,那麼自己可以手動的去內建到自己的項目中。基于Netty的RPC簡易實作
代碼位址如下:
http://www.demodashi.com/demo/13448.html
注:本文著作權歸作者,由demo大師代發,拒絕轉載,轉載需要作者授權