天天看點

十六.Netty實作RPC架構

1.RPC是什麼?

RPC,遠端過程調用,可以做到像本地調用一樣調用遠端服務,是一種程序間的通信方式,概念想必大家都很清楚,可以換一種思考方式去了解RPC,也就是從本地調用出發,進而去推導RPC調用

十六.Netty實作RPC架構

1.本地函數調用

本地函數是我們經常碰到的,比如下面示例:

public String sayHello(String name) {
    return "hello, " + name;
}
           

我們隻需要傳入一個參數,調用sayHello方法就可以得到一個輸出,也就是輸入參數——>方法體——>輸出,入參、出參以及方法體都在同一個程序空間中,這就是本地函數調用

2. Socket通信

那有沒有辦法實作不同程序之間通信呢?調用方在程序A,需要調用方法A,但是方法A在程序B中

十六.Netty實作RPC架構

最容易想到的方式就是使用Socket通信,使用Socket可以完成跨程序調用,我們需要約定一個程序通信協定,來進行傳參,調用函數,出參。寫過Socket應該都知道,Socket是比較原始的方式,我們需要更多的去關注一些細節問題,比如參數和函數需要轉換成位元組流進行網絡傳輸,也就是序列化操作,然後出參時需要反序列化;使用socket進行底層通訊,代碼程式設計也比較容易出錯。

如果一個調用方需要關注這麼多問題,那無疑是個災難,是以有沒有什麼簡單方法,讓我們的調用方不需要關注細節問題,讓調用方像調用本地函數一樣,隻要傳入參數,調用方法,然後坐等傳回結果就可以了呢?

3. RPC架構

RPC架構就是用來解決上面的問題的,它能夠讓調用方像調用本地函數一樣調用遠端服務,底層通訊細節對調用方是透明的,将各種複雜性都給屏蔽掉,給予調用方極緻體驗。

十六.Netty實作RPC架構

RPC 核心功能

前面說了這麼多,再次總結下一個RPC架構需要重點關注哪幾個點:

  • 代理 (動态代理)
  • 通訊協定
  • 序列化
  • 網絡傳輸
十六.Netty實作RPC架構

一個 RPC 的核心功能主要有 5 個部分組成,分别是:用戶端、用戶端 Stub、網絡傳輸子產品、服務端 Stub、服務端等

十六.Netty實作RPC架構

PRC調用流程說明

  1. 服務消費方(client)以本地調用方式調用服務
  2. client stub 接收到調用後負責将方法、參數等封裝成能夠進行網絡傳輸的消息體
  3. client stub 将消息進行編碼并發送到服務端
  4. server stub 收到消息後進行解碼
  5. server stub根據解碼結果調用本地的服務
  6. 本地服務執行并将結果傳回給 server stub
  7. server stub将傳回導入結果進行編碼并發送至消費方
  8. client stub 接收到消息并進行解碼
  9. 服務消費方(client)得到結果

2.基于Netty實作dubbo RPC

需求說明

  1. dubbo 底層使用了 Netty 作為網絡通訊架構,要求用 Netty 實作一個簡單的 RPC 架構
  2. 模仿dubbo,消費者和提供者約定接口和協定,消費者遠端調用提供者的服務,提供者傳回一個字元串,消費者列印提供者傳回的資料。底層網絡通信使用Netty 4.1.20

公共接口

HelloService

public interface HelloService {

    String hello(String msg);
}
           

HelloServiceImpl

public class HelloServiceImpl implements HelloService {

  //當有消費方調用該方法時, 就傳回一個結果
  @Override
  public String hello(String msg) {
    System.out.println("收到用戶端消息=" + msg);
    //根據mes 傳回不同的結果
    if (msg!= null) {
      return "你好用戶端, 我已經收到你的消息 [" + msg+ "]";
    } else {
      return "你好用戶端, 我已經收到你的消息 ";
    }
  }
}
           

服務端

NettyServer

public class NettyServer {

  public static void startServer(String hostName, int port) {
    startServer0(hostName, port);
  }

  //編寫一個方法,完成對NettyServer的初始化和啟動

  private static void startServer0(String hostname, int port) {

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {

      ServerBootstrap serverBootstrap = new ServerBootstrap();

      serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
          .childHandler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler()); //業務處理器

                          }
                        }

          );

      ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
      System.out.println("服務提供方開始提供服務~~");
      channelFuture.channel().closeFuture().sync();

    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }

  }
}
           

NettyServerHandler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //擷取用戶端發送的消息,并調用服務
    System.out.println("msg=" + msg);
    //用戶端在調用伺服器的api 時,我們需要定義一個協定
    //比如我們要求 每次發消息是都必須以某個字元串開頭 "HelloService#hello#你好"
    if (msg.toString().startsWith(ClientBootstrap.providerName)) {

      String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
      ctx.writeAndFlush(result);
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
  }
}
           

ServerBootstrap

//ServerBootstrap 會啟動一個服務提供者,就是 NettyServer
public class ServerBootstrap {
  public static void main(String[] args) {

    NettyServer.startServer("127.0.0.1", 7000);
  }
           

用戶端

NettyClient

public class NettyClient {

  //建立線程池
  private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  private static NettyClientHandler client;

  //編寫方法使用代理模式,擷取一個代理對象

  public Object getBean(final Class<?> serviceClass, final String providerName) {

    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { serviceClass },
        new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //{}  部分的代碼,用戶端每調用一次 hello, 就會進入到該代碼
            if (client == null) {
              initClient();
            }

            //設定要發給伺服器端的資訊
            //providerName 協定頭 args[0] 就是用戶端調用api hello(???), 參數
            client.setPara(providerName + args[0]);

            //
            return executor.submit(client).get();

          }
        });
  }

  //初始化用戶端
  private static void initClient() {
    client = new NettyClientHandler();
    //建立EventLoopGroup
    NioEventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new StringDecoder());
            pipeline.addLast(new StringEncoder());
            pipeline.addLast(client);
          }
        });

    try {
      bootstrap.connect("127.0.0.1", 7000).sync();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}
           

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //傳回的結果
    private String para; //用戶端調用方法時,傳入的參數


    //與伺服器的連接配接建立後,就會被調用, 這個方法是第一個被調用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被調用  ");
        context = ctx; //因為我們在其它方法會使用到 ctx
    }

    //收到伺服器的資料後,調用方法 (4)
    //
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被調用  ");
        result = msg.toString();
        notify(); //喚醒等待的線程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理對象調用, 發送資料給伺服器,-> wait -> 等待被喚醒(channelRead) -> 傳回結果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被調用  ");
        context.writeAndFlush(para);
        //進行wait
        wait(); //等待channelRead 方法擷取到伺服器的結果後,喚醒
        System.out.println(" call2 被調用  ");
        return  result; //服務方傳回的結果

    }
    //(2)
    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}
           

ClientBootstrap

public class ClientBootstrap {

  //這裡定義協定頭
  public static final String providerName = "HelloService#hello#";

  public static void main(String[] args) throws Exception {

    //建立一個消費者
    NettyClient customer = new NettyClient();

    //建立代理對象
    HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

    for (; ; ) {
      Thread.sleep(2 * 1000);
      //通過代理對象調用服務提供者的方法(服務)
      String res = service.hello("你好 dubbo~");
      System.out.println("調用的結果 res= " + res);
    }
  }
}