天天看點

100 行代碼搞定了 RPC 原理,大家随便問。。

作者:孫浩

引言

本文主要論述的是“RPC 實作原理”,那麼首先明确一個問題什麼是 RPC 呢?RPC 是 Remote Procedure Call 的縮寫,即,遠端過程調用。RPC 是一個計算機通信協定。該協定允許運作于一台計算機的程式調用另一台計算機的子程式,而開發人員無需額外地為這個互動程式設計。

值得注意是,兩個或多個應用程式都分布在不同的伺服器上,它們之間的調用都像是本地方法調用一樣。接下來我們便來分析一下一次 RPC 調用發生了些什麼?

一次基本的 RPC 調用會涉及到什麼?

現在業界内比較流行的一些 RPC 架構,例如 Dubbo 提供的是​

​基于接口的遠端方法調用​

​​,即用戶端隻需要知道接口的定義即可調用遠端服務。在 Java 中接口并不能直接調用執行個體方法,必須通過其實作類對象來完成此操作,這意味着用戶端必須為這些接口生成​

​代理對象​

​​,對此 Java 提供了 ​

​Proxy​

​​、​

​InvocationHandler​

​​ 生成動态代理的支援;生成了代理對象,那麼每個具體的發方法是怎麼調用的呢?jdk 動态代理生成的代理對象調用指定方法時實際會執行 ​

​InvocationHandler​

​​ 中定義的 ​

​#invoke​

​ 方法,在該方法中完成遠端方法調用并擷取結果。

抛開用戶端,回過頭來看 RPC 是兩台計算機間的調用,實質上是兩台主機間的​

​網絡通信​

​​,涉及到網絡通信又必然會有​

​序列化、反序列化​

​​,​

​編解碼​

​​等一些必須要考慮的問題;同時實際上現在大多系統都是叢集部署的,多台主機/容器對外提供相同的服務,如果叢集的節點數量很大的話,那麼管理服務位址也将是一件十分繁瑣的事情,常見的做法是各個服務節點将自己的位址和提供的服務清單注冊到一個 ​

​注冊中心​

​​,由 ​

​注冊中心​

​​ 來統一管理服務清單;這樣的做法解決了一些問題同時為用戶端增加了一項新的工作——那就是​

​服務發現​

​,通俗來說就是從注冊中心中找到遠端方法對應的服務清單并通過某種政策從中選取一個服務位址來完成網絡通信。

聊了用戶端和 ​

​注冊中心​

​​,另外一個重要的角色自然是服務端,服務端最重要的任務便是提供服務接口的真正實作并在某個端口上監聽網絡請求,監聽到請求後從網絡請求中擷取到對應的參數(比如服務接口、方法、請求參數等),再根據這些參數通過​

​反射​

​的方式調用接口的真正實作擷取結果并将其寫入對應的響應流中。

綜上所述,一次基本的 RPC 調用流程大緻如下:

100 行代碼搞定了 RPC 原理,大家随便問。。

基本實作

服務端(生産者)

  • 服務接口

在 RPC 中,生産者和消費者有一個共同的服務接口 API。如下,定義一個 HelloService 接口。

/**
 * @author 孫浩
 * @Descrption  服務接口
 ***/
public interface HelloService {
    String sayHello(String somebody);
}      
  • 服務實作

生産者要提供服務接口的實作,建立 HelloServiceImpl 實作類。

/**
 * @author 孫浩
 * @Descrption 服務實作
 ***/
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String somebody) {
        return "hello " + somebody + "!";
    }
}      
  • 服務注冊

本例使用 Spring 來管理 bean,采用自定義 xml 和解析器的方式來将服務實作類載入容器(當然也可以采用自定義注解的方式,此處不過多論述)并将服務接口資訊注冊到注冊中心。

首先自定義​​

​xsd​

​,

<xsd:element name="service">
    <xsd:complexType>
        <xsd:complexContent>
            <xsd:extension base="beans:identifiedType">
                <xsd:attribute name="interface" type="xsd:string" use="required"/>
                <xsd:attribute name="timeout" type="xsd:int" use="required"/>
                <xsd:attribute name="serverPort" type="xsd:int" use="required"/>
                <xsd:attribute name="ref" type="xsd:string" use="required"/>
                <xsd:attribute name="weight" type="xsd:int" use="optional"/>
                <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/>
                <xsd:attribute name="appKey" type="xsd:string" use="required"/>
                <xsd:attribute name="groupName" type="xsd:string" use="optional"/>
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>
</xsd:element>      

分别指定 schema 和 xmd,schema 和對應 handler 的映射:

schema
http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd
handler
http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler      

将編寫好的檔案放入 ​

​classpath​

​​ 下的 ​

​META-INF​

​ 目錄下:

100 行代碼搞定了 RPC 原理,大家随便問。。

在 Spring 配置檔案中配置服務類:

<!-- 釋出遠端服務 -->
 <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
 <storm:service id="helloServiceRegister"
                     interface="com.hsunfkqm.storm.framework.test.HelloService"
                     ref="helloService"
                     groupName="default"
                     weight="2"
                     appKey="ares"
                     workerThreads="100"
                     serverPort="8081"
                     timeout="600"/>      

編寫對應的 Handler 和 Parser:​

​StormServiceNamespaceHandler​

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

/**
 * @author 孫浩
 * @Descrption 服務釋出自定義标簽
 ***/
public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());
    }
}      

​ProviderFactoryBeanDefinitionParser​

​:

protected Class getBeanClass(Element element) {
        return ProviderFactoryBean.class;
    }

    protected void doParse(Element element, BeanDefinitionBuilder bean) {

        try {
            String serviceItf = element.getAttribute("interface");
            String serverPort = element.getAttribute("serverPort");
            String ref = element.getAttribute("ref");
            // ....
            bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
            bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
            bean.addPropertyReference("serviceObject", ref);
            //...
            if (NumberUtils.isNumber(weight)) {
                bean.addPropertyValue("weight", Integer.parseInt(weight));
            }
            //...
       } catch (Exception e) {
            // ...
      }
    }      

​ProviderFactoryBean​

​:

/**
 * @author 孫浩
 * @Descrption 服務釋出
 ***/
public class ProviderFactoryBean implements FactoryBean, InitializingBean {

    //服務接口
    private Class<?> serviceItf;
    //服務實作
    private Object serviceObject;
    //服務端口
    private String serverPort;
    //服務逾時時間
    private long timeout;
    //服務代理對象,暫時沒有用到
    private Object serviceProxyObject;
    //服務提供者唯一辨別
    private String appKey;
    //服務分組組名
    private String groupName = "default";
    //服務提供者權重,預設為 1 , 範圍為 [1-100]
    private int weight = 1;
    //服務端線程數,預設 10 個線程
    private int workerThreads = 10;

    @Override
    public Object getObject() throws Exception {
        return serviceProxyObject;
    }

    @Override
    public Class<?> getObjectType() {
        return serviceItf;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //啟動 Netty 服務端
        NettyServer.singleton().start(Integer.parseInt(serverPort));
        //注冊到 zk, 中繼資料注冊中心
        List<ProviderService> providerServiceList = buildProviderServiceInfos();
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        registerCenter4Provider.registerProvider(providerServiceList);
    }
}

//================RegisterCenter#registerProvider======================
@Override
public void registerProvider(final List<ProviderService> serviceMetaData) {
    if (CollectionUtils.isEmpty(serviceMetaData)) {
        return;
    }

    //連接配接 zk, 注冊服務
    synchronized (RegisterCenter.class) {
        for (ProviderService provider : serviceMetaData) {
            String serviceItfKey = provider.getServiceItf().getName();

            List<ProviderService> providers = providerServiceMap.get(serviceItfKey);
            if (providers == null) {
                providers = Lists.newArrayList();
            }
            providers.add(provider);
            providerServiceMap.put(serviceItfKey, providers);
        }

        if (zkClient == null) {
            zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
        }

        //建立 ZK 命名空間/目前部署應用 APP 命名空間/
        String APP_KEY = serviceMetaData.get(0).getAppKey();
        String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
        boolean exist = zkClient.exists(ZK_PATH);
        if (!exist) {
            zkClient.createPersistent(ZK_PATH, true);
        }

        for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) {
            //服務分組
            String groupName = entry.getValue().get(0).getGroupName();
            //建立服務提供者
            String serviceNode = entry.getKey();
            String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE;
            exist = zkClient.exists(servicePath);
            if (!exist) {
                zkClient.createPersistent(servicePath, true);
            }

            //建立目前伺服器節點
            int serverPort = entry.getValue().get(0).getServerPort();//服務端口
            int weight = entry.getValue().get(0).getWeight();//服務權重
            int workerThreads = entry.getValue().get(0).getWorkerThreads();//服務工作線程
            String localIp = IPHelper.localIp();
            String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName;
            exist = zkClient.exists(currentServiceIpNode);
            if (!exist) {
                //注意,這裡建立的是臨時節點
                zkClient.createEphemeral(currentServiceIpNode);
            }
            //監聽注冊服務的變化,同時更新資料到本地緩存
            zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    if (currentChilds == null) {
                        currentChilds = Lists.newArrayList();
                    }
                    //存活的服務 IP 清單
                    List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() {
                        @Override
                        public String apply(String input) {
                            return StringUtils.split(input, "|")[0];
                        }
                    }));
                    refreshActivityService(activityServiceIpList);
                }
            });

        }
    }
}      

至此服務實作類已被載入 Spring 容器中,且服務接口資訊也注冊到了注冊中心。

  • 網絡通信

作為生産者對外提供 RPC 服務,必須有一個網絡程式來來監聽請求和做出響應。在 Java 領域 Netty 是一款高性能的 NIO 通信架構,很多的架構的通信都是采用 Netty 來實作的,本例中也采用它當做通信伺服器。

建構并啟動 Netty 服務監聽指定端口:

public void start(final int port) {
    synchronized (NettyServer.class) {
        if (bossGroup != null || workerGroup != null) {
            return;
        }

        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //注冊解碼器 NettyDecoderHandler
                        ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType));
                        //注冊編碼器 NettyEncoderHandler
                        ch.pipeline().addLast(new NettyEncoderHandler(serializeType));
                        //注冊服務端業務邏輯處理器 NettyServerInvokeHandler
                        ch.pipeline().addLast(new NettyServerInvokeHandler());
                    }
                });
        try {
            channel = serverBootstrap.bind(port).sync().channel();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}      

上面的代碼中向 Netty 服務的 pipeline 中添加了編解碼和業務處理器,當接收到請求時,經過編解碼後,真正處理業務的是業務處理器,即​

​NettyServerInvokeHandler​

​​, 該處理器繼承自​

​SimpleChannelInboundHandler​

​​, 當資料讀取完成将觸發一個事件,并調用​

​NettyServerInvokeHandler#channelRead0​

​方法來處理請求。

@Override
protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception {
    if (ctx.channel().isWritable()) {
        //從服務調用對象裡擷取服務提供者資訊
        ProviderService metaDataModel = request.getProviderService();
        long consumeTimeOut = request.getInvokeTimeout();
        final String methodName = request.getInvokedMethodName();

        //根據方法名稱定位到具體某一個服務提供者
        String serviceKey = metaDataModel.getServiceItf().getName();
        //擷取限流工具類
        int workerThread = metaDataModel.getWorkerThreads();
        Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey);
        if (semaphore == null) {
            synchronized (serviceKeySemaphoreMap) {
                semaphore = serviceKeySemaphoreMap.get(serviceKey);
                if (semaphore == null) {
                    semaphore = new Semaphore(workerThread);
                    serviceKeySemaphoreMap.put(serviceKey, semaphore);
                }
            }
        }

        //擷取注冊中心服務
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey);

        Object result = null;
        boolean acquire = false;

        try {
            ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() {
                @Override
                public boolean apply(ProviderService input) {
                    return StringUtils.equals(input.getServiceMethod().getName(), methodName);
                }
            }).iterator().next();
            Object serviceObject = localProviderCache.getServiceObject();

            //利用反射發起服務調用
            Method method = localProviderCache.getServiceMethod();
            //利用 semaphore 實作限流
            acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS);
            if (acquire) {
                result = method.invoke(serviceObject, request.getArgs());
                //System.out.println("---------------"+result);
            }
        } catch (Exception e) {
            System.out.println(JSON.toJSONString(localProviderCaches) + "  " + methodName+" "+e.getMessage());
            result = e;
        } finally {
            if (acquire) {
                semaphore.release();
            }
        }
        //根據服務調用結果組裝調用傳回對象
        StormResponse response = new StormResponse();
        response.setInvokeTimeout(consumeTimeOut);
        response.setUniqueKey(request.getUniqueKey());
        response.setResult(result);
        //将服務調用傳回對象回寫到消費端
        ctx.writeAndFlush(response);
    } else {
        logger.error("------------channel closed!---------------");
    }
}      

此處還有部分細節如自定義的編解碼器等,篇幅所限不在此詳述,繼承 ​

​MessageToByteEncoder​

​​ 和 ​

​ByteToMessageDecoder​

​​ 覆寫對應的 ​

​encode​

​​ 和 ​

​decode​

​ 方法即可自定義編解碼器,使用到的序列化工具如 Hessian/Proto 等可參考對應的官方文檔。

  • 請求和響應包裝

    為便于封裝請求和響應,定義兩個 bean 來表示請求和響應。

請求:

/**
 * @author 孫浩
 * @Descrption
 ***/
public class StormRequest implements Serializable {

    private static final long serialVersionUID = -5196465012408804755L;
    //UUID, 唯一辨別一次傳回值
    private String uniqueKey;
    //服務提供者資訊
    private ProviderService providerService;
    //調用的方法名稱
    private String invokedMethodName;
    //傳遞參數
    private Object[] args;
    //消費端應用名
    private String appName;
    //消費請求逾時時長
    private long invokeTimeout;
    // getter/setter
}      

響應:

/**
 * @author 孫浩
 * @Descrption
 ***/
public class StormResponse implements Serializable {
    private static final long serialVersionUID = 5785265307118147202L;
    //UUID, 唯一辨別一次傳回值
    private String uniqueKey;
    //用戶端指定的服務逾時時間
    private long invokeTimeout;
    //接口調用傳回的結果對象
    private Object result;
    //getter/setter
}      

用戶端(消費者)

用戶端(消費者)在 RPC 調用中主要是生成服務接口的代理對象,并從注冊中心擷取對應的服務清單發起網絡請求。

用戶端和服務端一樣采用 Spring 來管理 bean 解析 xml 配置等不再贅述,重點看下以下幾點:

  • 通過 jdk 動态代理來生成引入服務接口的代理對象
public Object getProxy() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this);
}      
  • 從注冊中心擷取服務清單并依據某種政策選取其中一個服務節點
//服務接口名稱
String serviceKey = targetInterface.getName();
//擷取某個接口的服務提供者清單
IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton();
List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
//根據軟負載政策,從服務提供者清單選取本次調用的服務提供者
ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy);
ProviderService providerService = clusterStrategyService.select(providerServices);      
  • 通過 Netty 建立連接配接,發起網絡請求
/**
 * @author 孫浩
 * @Descrption Netty 消費端 bean 代理工廠
 ***/
public class RevokerProxyBeanFactory implements InvocationHandler {
    private ExecutorService fixedThreadPool = null;
    //服務接口
    private Class<?> targetInterface;
    //逾時時間
    private int consumeTimeout;
    //調用者線程數
    private static int threadWorkerNumber = 10;
    //負載均衡政策
    private String clusterStrategy;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        ...

        //複制一份服務提供者資訊
        ProviderService newProvider = providerService.copy();
        //設定本次調用服務的方法以及接口
        newProvider.setServiceMethod(method);
        newProvider.setServiceItf(targetInterface);

        //聲明調用 AresRequest 對象,AresRequest 表示發起一次調用所包含的資訊
        final StormRequest request = new StormRequest();
        //設定本次調用的唯一辨別
        request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId());
        //設定本次調用的服務提供者資訊
        request.setProviderService(newProvider);
        //設定本次調用的方法名稱
        request.setInvokedMethodName(method.getName());
        //設定本次調用的方法參數資訊
        request.setArgs(args);

        try {
            //建構用來發起調用的線程池
            if (fixedThreadPool == null) {
                synchronized (RevokerProxyBeanFactory.class) {
                    if (null == fixedThreadPool) {
                        fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber);
                    }
                }
            }
            //根據服務提供者的 ip,port, 建構 InetSocketAddress 對象,辨別服務提供者位址
            String serverIp = request.getProviderService().getServerIp();
            int serverPort = request.getProviderService().getServerPort();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            //送出本次調用資訊到線程池 fixedThreadPool, 發起調用
            Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
            //擷取調用的傳回結果
            StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
            if (response != null) {
                return response.getResult();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }
    //  ...
}      

Netty 的響應是異步的,為了在方法調用傳回前擷取到響應結果,需要将異步的結果同步化。

  • Netty 異步傳回的結果存入阻塞隊列
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception {
    //将 Netty 異步傳回的結果存入阻塞隊列,以便調用端同步擷取
    RevokerResponseHolder.putResultValue(response);
}      
  • 請求發出後同步擷取結果
//送出本次調用資訊到線程池 fixedThreadPool, 發起調用
Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
//擷取調用的傳回結果
StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
    return response.getResult();
}

//===================================================
//從傳回結果容器中擷取傳回結果,同時設定等待逾時時間為 invokeTimeout
long invokeTimeout = request.getInvokeTimeout();
StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);      

測試

​Server​

​:

/**
 * @author 孫浩
 * @Descrption
 ***/
public class MainServer {
    public static void main(String[] args) throws Exception {
        //釋出服務
        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml");
        System.out.println(" 服務釋出完成");
    }
}      

​Client​

​:

public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {

        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml");
        final HelloService helloService = (HelloService) context.getBean("helloService");
        String result = helloService.sayHello("World");
        System.out.println(result);
        for (;;) {

        }
    }
}      

結果

生産者:

100 行代碼搞定了 RPC 原理,大家随便問。。

消費者:

100 行代碼搞定了 RPC 原理,大家随便問。。

注冊中心

100 行代碼搞定了 RPC 原理,大家随便問。。

總結

本文簡單介紹了 RPC 的整個流程,并實作了一個簡單的 RPC 調用。希望閱讀完本文之後,能加深你對 RPC 的一些認識。

  • 生産者端流程:
  • 加載服務接口,并緩存
  • 服務注冊,将服務接口以及服務主機資訊寫入注冊中心(本例使用的是 zookeeper)
  • 啟動網絡伺服器并監聽
  • 反射,本地調用
  • 消費者端流程:
  • 代理服務接口生成代理對象
  • 服務發現(連接配接 zookeeper,拿到服務位址清單,通過用戶端負載政策擷取合适的服務位址)
  • 遠端方法調用(本例通過 Netty,發送消息,并擷取響應結果)