天天看點

手撸RPC架構 - 服務提供者收發消息基礎實作

作者:馬士兵教育CTO
手撸RPC架構 - 服務提供者收發消息基礎實作

目标

在前面的章節中,我們已經實作了自定義注解的以及掃描解析自定義注解的功能。作為一個服務提供者來說。首先就需要能夠接收到服務消費者發送過來的請求,是以這一章我們來實作一下服務提供者收發消息的基礎功能

手撸RPC架構 - 服務提供者收發消息基礎實作

Netty服務端功能實作

RPC請求都是通過網絡傳輸,現在比較流行像Mina,還有Netty,我們使用Netty來實作網絡請求的接收和發送

建立Netty服務啟動與關閉的通用接口 com.xpc.rpc.provider.server.api.Server

csharp複制代碼package com.xpc.rpc.provider.server.api;

public interface Server {

    /**
     * 啟動Netty服務
     */
    void startNettyServer();

    /**
     * 停止Netty服務
     */
    void shutDown();
}
           

建立Netty服務端具體實作類 com.xpc.rpc.provider.server.base.BaseServer

java複制代碼package com.xpc.rpc.provider.server.base;

import com.xpc.rpc.common.utils.RemotingUtil;
import com.xpc.rpc.provider.handler.RpcProviderHandler;
import com.xpc.rpc.provider.server.api.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseServer implements Server {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServer.class);

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    private ServerBootstrap bootstrap;

    @Override
    public void startNettyServer() {
        /**
         * 性能優化
         */
        if(useEpoll()) {
            bossGroup = new EpollEventLoopGroup();
            workerGroup = new EpollEventLoopGroup();
        }else {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
        }
        bootstrap = new ServerBootstrap();

        bootstrap.group(bossGroup,workerGroup)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,128)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //Netty自帶的String類型編解碼器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //我們自己實作的處理器
                        pipeline.addLast(new RpcProviderHandler());
                    }
                });

        try {
            ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();
            LOGGER.info("Netty 服務端啟動成功............");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.error("Netty 服務端啟動失敗:{}",e);
        }finally {
            shutDown();
        }
    }

    @Override
    public void shutDown() {
        if(bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if(workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }
}
           

接下來就是服務端我們自己定義的處理器 com.xpc.rpc.provider.handler.RpcProviderHandler,這裡就是後續服務提供者接收到請求之後具體的處理類,所有的業務邏輯都會在這裡實作

scala複制代碼package com.xpc.rpc.provider.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcProviderHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        LOGGER.info("服務端收到消息:{}",msg);
        //這裡再寫回一條消息給用戶端
        ctx.writeAndFlush("你好,我是服務端........");
    }
}
           

至此服務端的代碼就實作完了,接下來就是服務系消費者發送消息的基礎功能實作,建立一個子工程 xpc-rpc-consumer,pom.xml檔案如下

xml複制代碼<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xpc-rpc</artifactId>
        <groupId>com.xpc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>xpc-rpc-consumer</artifactId>


</project>
           

建立啟動類 com.xpc.rpc.consumer.RpcConsumer

java複制代碼package com.xpc.rpc.consumer;

import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class RpcConsumer {

     private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

     private Bootstrap bootstrap;

     private EventLoopGroup eventLoopGroup;


     public RpcConsumer() {

     }

    /**
     * 啟動服務消費者
     */
    public void start() {
         eventLoopGroup = new NioEventLoopGroup();
         bootstrap = new Bootstrap();

         bootstrap.group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel channel) throws Exception {
                         ChannelPipeline pipeline = channel.pipeline();
                         //String類型的編解碼器
                         pipeline.addLast(new StringDecoder());
                         pipeline.addLast(new StringEncoder());
                         //自定義處理器
                         pipeline.addLast(new RpcConsumerHandler());
                     }
                 });
         try {
             ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync();
             channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                 @Override
                 public void operationComplete(Future<? super Void> future) throws Exception {
                     if(future.isSuccess()) {
                         LOGGER.info("用戶端連接配接成功........");
                     }else {
                         LOGGER.info("用戶端連接配接失敗........");
                     }
                 }
             });
             channelFuture.channel().closeFuture().sync();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }

}
           

建立自定義處理器 com.xpc.rpc.consumer.handler.RpcConsumerHandler

scala複制代碼package com.xpc.rpc.consumer.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> {


    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("開始發送消息..........");
        String msg = "hello rpc";
        ctx.writeAndFlush(msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        LOGGER.info("這是服務端發送過來的消息: {}",s);
    }
}
           

到此服務提供者收發消息的基礎功能已經實作

測試

寫好的功能怎麼能不測試呢

在 xpc-rpc-test 下建立一個服務提供者的測試類 com.xpc.test.netty.ProviderTest

java複制代碼package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {


    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer();
        baseServer.startNettyServer();
    }
}
           

在 xpc-rpc-test 下建立一個服務消費者的測試類 com.xpc.test.netty.ConsumerTest

java複制代碼package com.xpc.test.netty;

import com.xpc.rpc.consumer.RpcConsumer;
import org.junit.Test;

public class ConsumerTest {

    @Test
    public void startConsumer() {
        RpcConsumer rpcConsumer = new RpcConsumer();
        rpcConsumer.start();
    }
}
           

先啟動服務提供者 ProviderTest,然後再啟動服務消費者

服務提供者日志:

手撸RPC架構 - 服務提供者收發消息基礎實作

服務消費者日志:

手撸RPC架構 - 服務提供者收發消息基礎實作

繼續閱讀