RPC
Remote Procedure Call:遠端過程調用,通訊協定,允許一台計算機調用另一台計算機的程式
Client:服務消費者
Server:服務提供者
調用流程:
1、Client以本地調用方法調用服務
2、ClientStub将請求方法名、參數封裝起來
3、ClientStub通過編碼後發送
4、ServerStub接收到後通過編碼發送給Server調用API
5、Server傳回請求結果給ServerStub進行編碼發送
6、ClientStub接收到請求結果進行編碼後發送給Client
HelloService
/**
* 接口,服務提供方與服務消費方都需要實作
*/
public interface HelloService {
String hello(String msg);
}
HelloServerImpl
public class HelloServerImpl implements HelloService {
/**
* 當消費方調用該方法時,傳回一個結果
*/
private static int count = 0;
@Override
public String hello(String msg) {
System.out.println("收到用戶端消息="+msg);
//根據msg傳回不同結果
if (msg != null) {
return "你好用戶端,我收到了你發來的消息[" + msg + "] 第"+(++count)+"次調用";
} else {
return "你好用戶端,我收到了你發來的消息";
}
}
}
NettyServer
/**
* 編寫一個方法,完成對NettyServer初始化與啟動
*/
public class NettyServer {
/**
* 不把核心方法對外暴露,外包括一層,不能啟動方法傳遞不同參數
*/
public static void startServer(String hostname, int port) {
startServer0(hostname, port);
}
private static void startServer0(String hostname, int port) {
NioEventLoopGroup BossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(BossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new NettyServerHandler());//自定義業務處理器
}
});
ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
System.out.println("服務提供方開始提供服務...");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
BossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//擷取用戶端發送的消息,并調用我們的服務
System.out.println("msg=" + msg);
/**
* 用戶端在調用伺服器的api時,我們需要定義一個協定
* 如:
* 要求每次發消息時,必須以某個字元串開頭“Hello#Cyrus#”
*/
if (msg.toString().startsWith(ClientBootStrap.providerName)) {
String result = new HelloServerImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyClient
public class NettyClient {
//建立一個線程池,大小為本機cpu核數
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client; //自定義Handler
private static int count; //計數器
//建立一個方法使用代理模式,擷取一個代理對象
public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass},((proxy, method, args) -> {
//{}中的代碼,用戶端每次調用hello方法都會進入該代碼中
System.out.println("(proxy, method, args)被進入...."+(++count));
if (client == null) {
initClient();
}
//設定要發給伺服器端的消息 args[0]:傳遞的參數
client.setPara(providerName + args[0]);
/**
* 将client(自定義Handler)交給線程池去執行
* 線程執行client擷取用戶端傳回的結果
*/
return executor.submit(client).get();
}));
}
//初始化用戶端
private static void initClient() {
client = new NettyClientHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(client);
}
});
bootstrap.connect("127.0.0.1", 9999).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;//上下文
private String result;//傳回的結果
private String para;//用戶端調用方法是,傳入的參數
//與伺服器建立連接配接後,就會被調用,該方法是第一個調用的(1)/一旦建立連接配接隻建立一次
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive被調用");
context = ctx; //在其他方法中需要使用ctx
}
//收到伺服器的資料後,調用方法(4)
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead被調用");
result = msg.toString();
//随機喚醒等待的線程
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
//被代理對象調用,真正發送資料給伺服器,->wait等待被喚醒,喚醒後傳回結果(3)(5)
@Override
public synchronized Object call() throws Exception {
System.out.println("call_1被調用");
context.writeAndFlush(para);
//進行wait
wait(); //等待channelRead()方法擷取到伺服器響應的結果後喚醒
System.out.println("call_2被調用");
return result; //服務方傳回的結果
}
//(2)
void setPara(String para) {
System.out.println("setPara");
this.para = para;
}
}
ServerBootStrap
/**
* ServerBootStrap會啟動一個服務提供者(NettyServer)
*/
public class ServerBootStrap {
public static void main(String[] args) {
//代碼代理
NettyServer.startServer("127.0.0.1", 9999);
}
}
ClientBootStrap
public class ClientBootStrap {
public static final String providerName = "Hello#Cyrus#";
public static void main(String[] args) throws InterruptedException {
//建立一個消費者
NettyClient customer = new NettyClient();
//建立代理對象
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
//通過代理對象調用服務提供者的方法(服務API)
for (; ; ) {
Thread.sleep(5 * 1000);
String result = service.hello("你好 dubbo...");
System.out.println("調用的結果 result="+result);
}
}
}