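Goal
In the previous chapters we implemented the custom annotations and the logic that scans and parses them. A service provider must first be able to receive the requests sent by service consumers, so in this chapter we implement the provider's basic functionality for receiving and sending messages.
Netty server implementation
RPC requests travel over the network. Popular networking frameworks for this include Mina and Netty; we use Netty to receive and send network requests.
Create a common interface for starting and stopping the Netty server: com.xpc.rpc.provider.server.api.Server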
package com.xpc.rpc.provider.server.api;

public interface Server {

    /**
     * Start the Netty server
     */
    void startNettyServer();

    /**
     * Stop the Netty server
     */
    void shutDown();
}
Create the concrete Netty server implementation: com.xpc.rpc.provider.server.base.BaseServer
package com.xpc.rpc.provider.server.base;

import com.xpc.rpc.common.utils.RemotingUtil;
import com.xpc.rpc.provider.handler.RpcProviderHandler;
import com.xpc.rpc.provider.server.api.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseServer implements Server {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServer.class);

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap bootstrap;

    @Override
    public void startNettyServer() {
        // Performance optimization: use the native epoll transport on Linux when it is available
        if (useEpoll()) {
            bossGroup = new EpollEventLoopGroup();
            workerGroup = new EpollEventLoopGroup();
        } else {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
        }
        bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // Netty's built-in String decoder and encoder
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // Our own handler
                        pipeline.addLast(new RpcProviderHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();
            LOGGER.info("Netty server started successfully............");
            // Block the calling thread until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; pass the exception as the last argument so the stack trace is logged
            Thread.currentThread().interrupt();
            LOGGER.error("Netty server failed to start", e);
        } finally {
            shutDown();
        }
    }

    @Override
    public void shutDown() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }
}
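The useEpoll() check relies on RemotingUtil.isLinuxPlatform(), which is not shown in this chapter. As a rough illustration only, the sketch below assumes such a helper inspects the os.name system property; the class name PlatformCheckSketch and both method names are hypothetical and are not the project's actual code.

```java
import java.util.Locale;

public class PlatformCheckSketch {

    // Hypothetical stand-in for RemotingUtil.isLinuxPlatform():
    // treats any os.name value containing "linux" (case-insensitive) as Linux.
    public static boolean isLinux(String osName) {
        return osName != null && osName.toLowerCase(Locale.ROOT).contains("linux");
    }

    // Applies the check to the JVM's own os.name property.
    public static boolean isLinuxPlatform() {
        return isLinux(System.getProperty("os.name"));
    }

    public static void main(String[] args) {
        System.out.println("os.name = " + System.getProperty("os.name"));
        System.out.println("linux   = " + isLinuxPlatform());
    }
}
```

The OS check alone is not enough, which is why BaseServer also calls Epoll.isAvailable(): that call reports whether Netty's native epoll transport can actually be loaded in the running JVM.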
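Next comes the server-side handler that we define ourselves, com.xpc.rpc.provider.handler.RpcProviderHandler. This is the class that will process each request once the service provider receives it; all the business logic will be implemented here.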
package com.xpc.rpc.provider.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcProviderHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        LOGGER.info("Server received message: {}", msg);
        // Write a reply back to the client
        ctx.writeAndFlush("Hello, this is the server........");
    }
}
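The handler receives a String because the StringDecoder placed before it in the pipeline has already turned the incoming bytes into text, and the StringEncoder does the reverse for the reply. The sketch below imitates that conversion with the JDK only (it is not Netty's implementation, and StringCodecSketch is a hypothetical name) to show why both ends must agree on the charset.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StringCodecSketch {

    // Roughly what an outbound String encoder does: text -> bytes.
    public static byte[] encode(String msg, Charset charset) {
        return msg.getBytes(charset);
    }

    // Roughly what an inbound String decoder does: bytes -> text.
    public static String decode(byte[] bytes, Charset charset) {
        return new String(bytes, charset);
    }

    public static void main(String[] args) {
        // Two Chinese characters (written as Unicode escapes) followed by ASCII text
        String msg = "\u4f60\u597d, rpc";
        byte[] wire = encode(msg, StandardCharsets.UTF_8);

        // Same charset on both ends: the text survives the round trip.
        System.out.println(msg.equals(decode(wire, StandardCharsets.UTF_8)));      // true

        // Different charsets: the non-ASCII characters come out garbled.
        System.out.println(msg.equals(decode(wire, StandardCharsets.ISO_8859_1))); // false
    }
}
```

The no-argument StringDecoder and StringEncoder constructors use the platform default charset. If the provider and consumer may run on machines with different defaults, passing an explicit charset, for example new StringDecoder(CharsetUtil.UTF_8) and new StringEncoder(CharsetUtil.UTF_8), avoids this kind of mismatch.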
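That completes the server-side code. Next we implement the basic message-sending functionality of the service consumer. Create a submodule named xpc-rpc-consumer with the following pom.xml: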
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xpc-rpc</artifactId>
        <groupId>com.xpc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>xpc-rpc-consumer</artifactId>
</project>
Create the startup class com.xpc.rpc.consumer.RpcConsumer
package com.xpc.rpc.consumer;

import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

    private Bootstrap bootstrap;
    private EventLoopGroup eventLoopGroup;

    public RpcConsumer() {
    }

    /**
     * Start the service consumer
     */
    public void start() {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // String decoder and encoder
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // Custom handler
                        pipeline.addLast(new RpcConsumerHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778);
            // Register the listener before waiting, so that a failed connect is logged as well
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        LOGGER.info("Client connected successfully........");
                    } else {
                        LOGGER.error("Client failed to connect........", future.cause());
                    }
                }
            });
            channelFuture.sync();
            // Block the calling thread until the connection is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and log through the logger instead of printStackTrace()
            Thread.currentThread().interrupt();
            LOGGER.error("Client was interrupted", e);
        } finally {
            // Release the event loop threads once the connection has ended
            eventLoopGroup.shutdownGracefully();
        }
    }
}
Create the custom handler com.xpc.rpc.consumer.handler.RpcConsumerHandler
package com.xpc.rpc.consumer.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);

    // Called once the connection is established: send the first message
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("Start sending message..........");
        String msg = "hello rpc";
        ctx.writeAndFlush(msg);
    }

    // Called for every message decoded from the server
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        LOGGER.info("Message sent by the server: {}", s);
    }
}
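At this point the service provider's basic functionality for receiving and sending messages is implemented.
Testing
A finished feature still has to be tested.
Under xpc-rpc-test, create a test class for the service provider: com.xpc.test.netty.ProviderTest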
package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {

    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer();
        baseServer.startNettyServer();
    }
}
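Because startNettyServer() blocks on closeFuture().sync(), this test method does not return until the server channel is closed. If a caller needs to keep working while the server runs, one option is to run the blocking call on a background thread. The sketch below shows that pattern with a hypothetical stub (BlockingStubServer) in place of BaseServer so that it runs without Netty; it assumes that calling shutDown() makes the blocked start call return.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BackgroundServerSketch {

    // Same shape as the Server interface from this chapter.
    public interface Server {
        void startNettyServer();
        void shutDown();
    }

    // Hypothetical stub standing in for BaseServer: start blocks until shutDown is called.
    public static class BlockingStubServer implements Server {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch closed = new CountDownLatch(1);

        @Override
        public void startNettyServer() {
            started.countDown();
            try {
                closed.await(); // plays the role of closeFuture().sync()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void shutDown() {
            closed.countDown();
        }

        // Waits until startNettyServer() has been entered, or the timeout expires.
        public boolean awaitStarted(long timeoutMillis) throws InterruptedException {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Runs the blocking start call on a daemon thread and returns that thread.
    public static Thread startInBackground(Server server) {
        Thread thread = new Thread(server::startNettyServer, "rpc-server");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    // Starts the stub in the background, stops it, and reports whether the thread ended.
    public static boolean demo() {
        BlockingStubServer server = new BlockingStubServer();
        Thread thread = startInBackground(server);
        try {
            boolean up = server.awaitStarted(1000);
            server.shutDown();
            thread.join(1000);
            return up && !thread.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("server stopped cleanly: " + demo());
    }
}
```

For the manual test in this chapter the blocking behaviour is convenient: the provider test simply stays alive while the consumer test is started in a second run.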
Under xpc-rpc-test, create a test class for the service consumer: com.xpc.test.netty.ConsumerTest
package com.xpc.test.netty;

import com.xpc.rpc.consumer.RpcConsumer;
import org.junit.Test;

public class ConsumerTest {

    @Test
    public void startConsumer() {
        RpcConsumer rpcConsumer = new RpcConsumer();
        rpcConsumer.start();
    }
}
Start the service provider test ProviderTest first, then start the service consumer test ConsumerTest.
Service provider log:
Service consumer log: