天天看點

真的夠可以的,基于Netty實作了RPC架構

RPC全稱Remote Procedure Call,即遠端過程調用,對于調用者無感覺這是一個遠端調用功能。目前流行的開源RPC 架構有阿裡的Dubbo、Google 的 gRPC、Twitter 的Finagle 等。本次RPC架構的設計主要參考的是阿裡的Dubbo,這裡Netty 基本上是作為架構的技術底層而存在的,主要完成高性能的網絡通信,進而實作高效的遠端調用。

Dubbo的架構與Spring

其實在之前的文章中《談談京東的服務架構》,探讨過Dubbo的組成和架構。

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

另外使用Dubbo最友善的地方在于它可以和Spring非常友善的內建,Dubbo對于配置的優化也是随着Spring一脈相承的,從最早的XML形式到後來的注解方式以及自動裝配,都是在不斷地簡化開發過程來提高開發效率。

Dubbo在Spring架構中的工作流程:

1、Spring的IOC容器啟動

2、把服務注冊到注冊中心(zookeeper軟體)中

3、消費者啟動時會把它需要用到的服務從注冊中心拉取下來

4、提供者的位址發生改變時,注冊中心會馬上通知消費者

5、根據注冊中心中的服務位址直接就可以調用提供者了,如果調用了提供者,就會把提供者的位址主動緩存起來

6、監控消費者調用提供者的次數

RPC實作的關鍵

1、序列化與反序列化

在遠端過程調用時,用戶端跟服務端是不同的程序,甚至有時候用戶端用Java,服務端用C++。這時候就需要用戶端把參數先轉成一個位元組流,傳給服務端後,再把位元組流轉成自己能讀取的格式,這個過程叫序列化和反序列化,同理,從服務端傳回的值也需要序列化反序列化的過程。在序列化的時候,我們選擇Netty自身的對象序列化器。

2、資料網絡傳輸

解決了序列化的問題,那麼剩下的就是如何把資料參數傳到生産者,網絡傳輸層需要把序列化後的參數位元組流傳給服務端,然後再把序列化後的調用結果傳回用戶端,雖然大部分RPC架構都采用了TCP作為傳輸協定,其實UDP也可以作為傳輸協定的,基于TCP和UDP我們可以自定義任意規則的協定,加之我們要使用NIO通信方式作為高性能網絡服務的前提,于是Netty似乎更符合我們Java程式員的口味,Netty真香!

3、告訴注冊中心我要調誰

現在調用參數的序列化和網絡傳輸都已經具備,但是還有個問題,那就是消費者要調用誰的問題,一個函數或者方法,我們可以了解為一個服務,這些服務注冊在注冊中心上面,隻有當消費者告訴注冊中心要調用誰,才可以進行遠端調用。是以不但要把将要調用的服務的參數傳過去,也要把要調用的服務資訊傳過去。

簡易RPC架構的架構

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

Dubbo 核心子產品主要有四個:Registry 注冊中心、Provider 服務提供者、Consumer 服務消費者、Monitor監控,為了友善直接砍掉了監控子產品,同時把服務提供者子產品與注冊中心子產品寫在一起,通過實作自己的簡易IOC容器,完成對服務提供者的執行個體化。

關于使用Netty進行Socket程式設計的部分可以參考Netty的官網 或者我之前的部落格《Netty編碼實戰與Channel生命周期》,在這裡Netty的編碼技巧和方式不作為本文的重點。

RPC架構編碼實作

首先需要引入的依賴如下(Netty + Lombok):

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.8</version> </dependency>

1、Registry與Provider

目錄結構如下:

───src └─main ├─java │ └─edu │ └─xpu │ └─rpc │ ├─api │ │ IRpcCalc.java │ │ IRpcHello.java │ │ │ ├─core │ │ InvokerMessage.java │ │ │ ├─provider │ │ RpcCalcProvider.java │ │ RpcHelloProvider.java │ │ │ └─registry │ MyRegistryHandler.java │ RpcRegistry.java │ └─resources ───pom.xml

IRpcCalc.java與IRpcHello.java是兩個Service接口。IRpcCalc.java内容如下,完成模拟業務加、減、乘、除運算

public interface IRpcCalc { // 加 int add(int a, int b); // 減 int sub(int a, int b); // 乘 int mul(int a, int b); // 除 int div(int a, int b); }

IRpcHello.java,測試服務是否可用:

public interface IRpcHello { String hello(String name); }

至此API 子產品就定義完成了,非常簡單的兩個接口。接下來,我們要确定傳輸規則,也就是傳輸協定,協定内容當然要自定義,才能展現出Netty 的優勢。

設計一個InvokerMessage類,裡面包含了服務名稱、調用方法、參數清單、參數值,這就是我們自定義協定的協定包:

@Data public class InvokerMessage implements Serializable { private String className; // 服務名稱 private String methodName; // 調用哪個方法 private Class<?>[] params; // 參數清單 private Object[] values; // 參數值 }

通過定義這樣的協定類,就能知道我們需要調用哪個服務,服務中的哪個方法,方法需要傳遞的參數清單(參數類型+參數值),這些資訊正确傳遞過去了才能拿到正确的調用傳回值。

接下來建立這兩個服務的具體實作類,IRpcHello的實作類如下:

public class RpcHelloProvider implements IRpcHello { public String hello(String name) { return "Hello, " + name + "!"; } }

IRpcCalc的實作類如下:

public class RpcCalcProvider implements IRpcCalc { @Override public int add(int a, int b) { return a + b; } @Override public int sub(int a, int b) { return a - b; } @Override public int mul(int a, int b) { return a * b; } @Override public int div(int a, int b) { return a / b; } }

Registry 注冊中心主要功能就是負責将所有Provider的服務名稱和服務引用位址注冊到一個容器中(這裡為了友善直接使用接口類名作為服務名稱,前提是假定我們每個服務隻有一個實作類),并對外釋出。Registry 應該要啟動一個對外的服務,很顯然應該作為服務端,并提供一個對外可以通路的端口。先啟動一個Netty服務,建立RpcRegistry 類,RpcRegistry.java的具體代碼如下:

public class RpcRegistry { private final int port; public RpcRegistry(int port){ this.port = port; } public void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 處理拆包、粘包的編解碼器 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); // 處理序列化的編解碼器 pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); // 自己的業務邏輯 pipeline.addLast(new MyRegistryHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 設定長連接配接 ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync(); System.out.println("RPC Registry start listen at " + this.port); channelFuture.channel().closeFuture().sync(); } catch (Exception e){ e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { new RpcRegistry(8080).start(); } }

接下來隻需要實作我們自己的Handler即可,建立MyRegistryHandler.java,内容如下:

public class MyRegistryHandler extends ChannelInboundHandlerAdapter { // 在注冊中心注冊服務需要有容器存放 public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>(); // 類名的緩存位置 private static final List<String> classCache = new ArrayList<>(); // 約定,隻要是寫在provider下所有的類都認為是一個可以對完提供服務的實作類 // edu.xpu.rpc.provider public MyRegistryHandler(){ scanClass("edu.xpu.rpc.provider"); doRegister(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Object result = new Object(); // 用戶端傳過來的調用資訊 InvokerMessage request = (InvokerMessage)msg; // 先判斷有沒有這個服務 String serverClassName = request.getClassName(); if(registryMap.containsKey(serverClassName)){ // 擷取服務對象 Object clazz = registryMap.get(serverClassName); Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams()); result = method.invoke(clazz, request.getValues()); System.out.println("request=" + request); System.out.println("result=" + result); } ctx.writeAndFlush(result); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } // 實作簡易IOC容器 // 掃描出包裡面所有的Class private void scanClass(String packageName){ ClassLoader classLoader = this.getClass().getClassLoader(); URL url = classLoader.getResource(packageName.replaceAll("\\.", "/")); File dir = new File(url.getFile()); File[] files = dir.listFiles(); for (File file: files){ if(file.isDirectory()){ scanClass(packageName + "." + file.getName()); }else{ // 拿出類名 String className = packageName + "." + file.getName().replace(".class", "").trim(); classCache.add(className); } } } // 把掃描到的Class執行個體化,放到Map中 // 注冊的服務名稱就叫做接口的名字 [約定優于配置] private void doRegister(){ if(classCache.size() == 0) return; for (String className: classCache){ try { Class<?> clazz = Class.forName(className); // 服務名稱 Class<?> anInterface = clazz.getInterfaces()[0]; registryMap.put(anInterface.getName(), clazz.newInstance()); } catch (Exception e) { e.printStackTrace(); } } } }

在這裡還通過反射實作了簡易的IOC容器,先遞歸掃描provider包底下的類,把這些類的對象作為服務對象放到IOC容器中進行管理,由于IOC是一個Map實作的,是以将類名作為服務名稱,也就是Key,服務對象作為Value。根據消費者傳過來的服務名稱,就可以找到對應的服務,到此,Registry和Provider已經全部寫完了。

2、consumer

目錄結構如下:

└─src ├─main │ ├─java │ │ └─edu │ │ └─xpu │ │ └─rpc │ │ ├─api │ │ │ IRpcCalc.java │ │ │ IRpcHello.java │ │ │ │ │ ├─consumer │ │ │ │ RpcConsumer.java │ │ │ │ │ │ │ └─proxy │ │ │ RpcProxy.java │ │ │ RpcProxyHandler.java │ │ │ │ │ └─core │ │ InvokerMessage.java │ │ │ └─resources └─test └─java └─ pom.xml

在看用戶端的實作之前,先梳理一下RPC流程。API 子產品中的接口隻在服務端實作了。是以,用戶端調用API 中定義的某一個接口方法時,實際上是要發起一次網絡請求去調用服務端的某一個服務。而這個網絡請求首先被注冊中心接收,由注冊中心先确定需要調用的服務的位置,再将請求轉發至真實的服務實作,最終調用服務端代碼,将傳回值通過網絡傳輸給用戶端。整個過程對于用戶端而言是完全無感覺的,就像調用本地方法一樣,是以必定要對用戶端的API接口做代理,隐藏網絡請求的細節。

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

由上圖的流程圖可知,要讓使用者調用無感覺,必須建立出代理類來完成網絡請求的操作。

RpcProxy.java如下:

public class RpcProxy { public static <T> T create(Class<?> clazz) { //clazz傳進來本身就是interface MethodProxy proxy = new MethodProxy(clazz); T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy); return result; } private static class MethodProxy implements InvocationHandler { private Class<?> clazz; public MethodProxy(Class<?> clazz) { this.clazz = clazz; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 如果傳進來是一個已實作的具體類 if (Object.class.equals(method.getDeclaringClass())) { try { return method.invoke(this, args); } catch (Throwable t) { t.printStackTrace(); } // 如果傳進來的是一個接口(核心) } else { return rpcInvoke(method, args); } return null; } // 實作接口的核心方法 public Object rpcInvoke(Method method, Object[] args) { // 傳輸協定封裝 InvokerMessage invokerMessage = new InvokerMessage(); invokerMessage.setClassName(this.clazz.getName()); invokerMessage.setMethodName(method.getName()); invokerMessage.setValues(args); invokerMessage.setParams(method.getParameterTypes()); final RpcProxyHandler consumerHandler = new RpcProxyHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //自定義協定編碼器 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //對象參數類型編碼器 pipeline.addLast("encoder", new ObjectEncoder()); //對象參數類型解碼器 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast("handler", consumerHandler); } }); ChannelFuture future = bootstrap.connect("localhost", 8080).sync(); future.channel().writeAndFlush(invokerMessage).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } return consumerHandler.getResponse(); } } }

我們通過傳進來的接口對象,獲得了要調用的服務名,服務方法名,參數類型清單,參數清單,這樣就把自定義的RPC協定包封裝好了,隻需要把協定包發出去等待結果傳回即可,是以為了接收傳回值資料還需要自定義一個接收用的Handler,RpcProxyHandlerdiamante如下:

public class RpcProxyHandler extends ChannelInboundHandlerAdapter { private Object result; public Object getResponse() { return result; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception is general"); } }

這樣就算是完成了整個流程,下面開始測試一下吧,測試的RpcConsumer.java代碼如下:

public class RpcConsumer { public static void main(String[] args) { // 本機之間的正常調用 // IRpcHello iRpcHello = new RpcHelloProvider(); // iRpcHello.hello("Tom"); // 肯定是用動态代理來實作的 // 傳給它接口,傳回一個接口的執行個體,僞代理 IRpcHello rpcHello = RpcProxy.create(IRpcHello.class); System.out.println(rpcHello.hello("ZouChangLin")); int a = 10; int b = 5; IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class); System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b))); System.out.println(String.format("%d - %d = %d ", a, b, iRpcCalc.sub(a, b))); System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b))); System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b))); } }

3、效果測試

先開啟Registry,運作端口是8080:

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

開啟consumer開始調用

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

調用完成後可以看到調用結果正确,并且在Registry這邊也看到了日志:

真的夠可以的,基于Netty實作了RPC架構

編輯切換為居中

添加圖檔注釋,不超過 140 字(可選)

可以發現,簡易RPC架構順利完工!

                                   資源擷取:

大家點贊、收藏、關注、評論啦 、檢視👇🏻👇🏻👇🏻微信公衆号擷取聯系方式👇🏻👇🏻👇🏻

 精彩專欄推薦訂閱:在下方專欄👇🏻👇🏻👇🏻👇🏻

每天學四小時:Java+Spring+JVM+分布式高并發,架構師指日可待