天天看點

這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

作者:冰河

部落格位址:https://binghe001.github.io

大家好,我是冰河~~

沒錯,這次冰河又要搞事情了,這次準備下手的是

RPC架構項目

。為什麼要對

RPC架構項目

下手呢,因為在如今分布式、微服務乃至雲原生不斷發展的過程中,RPC作為底層必不可少的通信元件,被廣泛應用在分布式、微服務和雲原生項目中。

為啥要開發RPC架構

事情是這樣的,在開發這個RPC架構之前,我花費了不少時間算是對Dubbo架構徹底研究透徹了。

冰河在撸透了Dubbo2.x和Dubbo3.x的源碼之後,本來想給大家寫一個Dubbo源碼解析的專欄。為此,我其實私下準備了一個多月:畫流程圖、分析源碼、寫測試Demo,自己在看Dubbo源碼時,也為Dubbo源碼添加了非常詳細的注釋。這裡,就包含Dubbo2.x和Dubbo3.x的源碼。

這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

當我就這麼熬夜肝文一個多月後,突然發現一個問題:Dubbo經過多年不斷的疊代開發,它的源碼已經非常多了,如果以文章的形式将Dubbo的源碼面面俱到的分析到位,那還不知道要寫到何年何月去了。當我寫文章分析Dubbo的最新版本3.x時,可能寫到專欄的中後期Dubbo已經更新到4.x、5.x,設定有可能是6.x、7.x了。

與其這麼費勁吧咧的分析源碼,還不如從零開始帶着大家一起手撸一個能夠在實際生産環境使用的、分布式、高性能、可擴充的RPC架構。這樣,大家也能夠直覺的感受到一個能夠在實際場景使用的RPC架構是如何一步步開發出來的。

相信大家在學完《RPC手撸專欄》後,自己再去看Dubbo源碼的話,就相對來說簡單多了。你說是不是這樣的呢?

你能學到什麼?

既然是整個專欄的開篇嘛,肯定是要告訴你在這個專欄中能夠學習到哪些實用的技術的。這裡,我就畫一張圖來直覺的告訴你在《RPC手撸專欄》能夠學到哪些技術吧。

《RPC手撸專欄》整體架構技術全貌如圖所示,加入星球後與冰河一起從零實作它,搞定它,當你緊跟冰河節奏搞定這個RPC架構後,你會發現:什麼Dubbo、什麼gRPC、什麼BRPC、什麼Hessian、什麼Tars、什麼Thrift、什麼motan、什麼hprose等等等等,市面上主流的RPC架構,對你來說就都不叫事兒了,跟緊冰河的節奏,你可以的。

這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

相信小夥伴們看到《RPC手撸專欄》涉及到的知識點,應該能夠了解到咱們這個從零開始的《RPC手撸專欄》還是比較硬核的吧?

另外,咱這RPC項目支援同步調用、異步調用、回調和單向調用。

  • 同步調用
    這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構
  • 異步調用
這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構
  • 回調
這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構
  • 單向調用
這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

對,沒錯,咱們《RPC手撸專欄》最終實作的RPC架構的定位就是盡量可以在實際環境使用。通過這個專欄的學習,讓大家深入了解到能夠在實際場景使用的RPC架構是如何一步步開發出來的。

代碼結構

我将這個

bhrpc項目

的定位為可在實際場景使用的、分布式、高性能、可擴充的RPC架構,目前總體上已經開發并完善的功能達到60+個子項目,大家看圖吧。

這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構
這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

項目大量使用了對标Dubbo的自定義SPI技術實作高度可擴充性,各位小夥伴可以根據自己的需要,按照SPI的設計要求添加自己實作的自定義插件。

這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

示範效果

說了那麼多,咱們一起來看看這個RPC架構的使用效果吧,因為咱們這個RPC架構支援的調用方式有:原生RPC調用、整合Spring(XML/注解)、整合SpringBoot、整合SpringCloud、整合SpringCloud Alibaba,整合Docker和整合K8S七種使用方式。

這裡,咱們就以 整合Spring注解的方式 來給大家示範下這個RPC架構。

RPC核心注解說明

為了讓大家更好的了解這個RPC架構,我先給大家看下RPC架構的兩個核心注解,一個是RPC的服務提供者注解

@RpcService

,一個是RPC的服務調用者注解

@RpcReference

(1)服務提供者注解

@RpcService

的核心源碼如下所示。

/**
 * @author binghe
 * @version 1.0.0
 * @description bhrpc服務提供者注解
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    /**
     * 接口的Class
     */
    Class<?> interfaceClass() default void.class;

    /**
     * 接口的ClassName
     */
    String interfaceClassName() default "";

    /**
     * 版本号
     */
    String version() default "1.0.0";

    /**
     * 服務分組,預設為空
     */
    String group() default "";

    /**
     * 延遲釋出,預留
     */
    int delay() default 0;

    /**
     * 是否導出rpc服務,預留
     */
    boolean export() default true;
}
           

(2)服務調用者注解

@RpcReference

的核心源碼如下所示。

/**
 * @author binghe
 * @version 1.0.0
 * @description bhrpc服務消費者
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {

    /**
     * 版本号
     */
    String version() default "1.0.0";

    /**
     * 注冊中心類型, 目前的類型包含:zookeeper、nacos、etcd、consul
     */
    String registryType() default "zookeeper";

    /**
     * 注冊位址
     */
    String registryAddress() default "127.0.0.1:2181";

    /**
     * 負載均衡類型,預設基于ZK的一緻性Hash
     */
    String loadBalanceType() default "zkconsistenthash";

    /**
     * 序列化類型,目前的類型包含:protostuff、kryo、json、jdk、hessian2、fst
     */
    String serializationType() default "protostuff";

    /**
     * 逾時時間,預設5s
     */
    long timeout() default 5000;

    /**
     * 是否異步執行
     */
    boolean async() default false;

    /**
     * 是否單向調用
     */
    boolean oneway() default false;

    /**
     * 代理的類型,jdk:jdk代理, javassist: javassist代理, cglib: cglib代理
     */
    String proxy() default "jdk";

    /**
     * 服務分組,預設為空
     */
    String group() default "";
}
           

這裡,我隻列出了服務提供者注解

@RpcService

和服務調用者注解

@RpcReference

的部分源碼,後續在RPC架構不斷完善的過程中,大家就可以慢慢看到源碼的全貌和其每個注解實作的功能。這裡,我就不詳細介紹了。

當然啦,在這個RPC架構實作的原生調用方式中,可以不用這些注解就能夠實作遠端調用。

效果示範

接口定義

定義兩個接口,分别為HelloService和HelloPersonService,源碼如下所示。

  • HelloService接口源碼
public interface HelloService {
    String hello(String name);
    String hello(Person person);
}
           
  • HelloPersonService接口源碼
public interface HelloPersonService {
    List<Person> getTestPerson(String name,int num);
}
           

實作服務提供者demo

(1)建立HelloService接口和HelloPersonService接口的實作類HelloServiceImpl和HelloPersonServiceImpl,如下所示。

  • HelloServiceImpl類源碼
@RpcService(interfaceClass = HelloService.class, version = "1.0.0")
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }

    @Override
    public String hello(Person person) {
        return "Hello! " + person.getFirstName() + " " + person.getLastName();
    }
}
           

可以看到,在HelloServiceImpl類上添加了RPC服務提供者注解

@RpcService

,表示将其釋出為一個RPC服務。

  • HelloPersonServiceImpl類源碼
@RpcService(interfaceClass = HelloPersonService.class, version = "1.0.0")
public class HelloPersonServiceImpl implements HelloPersonService {
    @Override
    public List<Person> getTestPerson(String name, int num) {
        List<Person> persons = new ArrayList<>(num);
        for (int i = 0; i < num; ++i) {
            persons.add(new Person(Integer.toString(i), name));
        }
        return persons;
    }
}
           

可以看到,在HelloPersonServiceImpl類上添加了RPC服務提供者注解

@RpcService

,表示将其釋出為一個RPC服務。

(2)建立服務提供者demo的配置類ServerConfig,在ServerConfig類中注入RegistryService注冊中心接口的實作類,以及RPC服務提供者的核心類RpcServer,如下所示。

/**
 * @author binghe
 * @version 1.0.0
 * @description 基于注解的配置類
 */
@Configuration
@ComponentScan(value = {"io.binghe.rpc.demo"})
@PropertySource(value = {"classpath:rpc.properties"})
public class SpringAnnotationProviderConfig {

    @Value("${registry.address}")
    private String registryAddress;

    @Value("${registry.type}")
    private String registryType;

    @Value("${registry.loadbalance.type}")
    private String registryLoadbalanceType;

    @Value("${server.address}")
    private String serverAddress;

    @Value("${reflect.type}")
    private String reflectType;

    @Bean
    public RpcSpringServer rpcSpringServer(){
        return new RpcSpringServer(serverAddress, registryAddress, registryType, registryLoadbalanceType, reflectType);
    }
}
           

(3)建立服務提供者demo的啟動類ServerTest,如下所示。

/**
 * @author binghe
 * @version 1.0.0
 * @description RPC整合Spring注解,服務提供者demo啟動類
 */
public class ServerTest {
    public static void main(String[] args){
        new AnnotationConfigApplicationContext(ServerConfig.class);
    }
}
           

實作服務調用者demo

(1)建立測試服務調用者的TestService接口,如下所示。

public interface TestService {
    void printResult();
}
           

(2)建立TestService接口的實作類TestServiceImpl,在TestServiceImpl類上标注Spring的

@Service

注解,并在TestServiceImpl類中通過

@RpcReference

注解注入HelloService接口的實作類和HelloPersonService接口的實作類,并實作TestService接口的printResult()方法,源碼如下所示。

/**
 * @author binghe
 * @version 1.0.0
 * @description 測試RPC服務調用者
 */
@Service
public class TestServiceImpl implements TestService {

    @RpcReference(version = "1.0.0", timeout = 3000, proxy = "javassist", isAsync = true)
    private HelloService helloService;
    
    @RpcReference(proxy = "cglib")
    private HelloPersonService helloPersonService;

    @Override
    public void printResult() {
        String result = helloService.hello("binghe");
        System.out.println(result);
        result = helloService.hello(new Person("binghe001", "binghe002"));
        System.out.println(result);
        System.out.println("=================================");
        List<Person> personList = helloPersonService.getTestPerson("binghe", 2);
        personList.stream().forEach(System.out::println);
    }
}
           

通過TestServiceImpl類的源碼我們可以看到,遠端調用HelloService接口的方法時使用的是javassist動态代理,遠端調用HelloPersonService接口時,使用的是cglib動态代理。

(3)建立服務調用者demo的配置類ClientConfig,如下所示。

@Configuration
@ComponentScan(value = {"io.binghe.rpc.*"})
@PropertySource(value = {"classpath:rpc.properties"})
public class ClientConfig {
}
           

(4)建立服務調用者demo的啟動類ClientTest,如下所示。

public class ClientTest {

    public static void main(String[] args){
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ClientConfig.class);
        TestService testService = context.getBean(TestService.class);
        testService.printResult();
        context.close();
    }
}
           

啟動服務測試

(1)啟動Zookeeper,這裡,為了示範簡單,就直接在我本機啟動單機Zookeeper好了,啟動後的效果如下圖所示。

這次我設計了一款TPS百萬級别的分布式、高性能、可擴充的RPC架構

(2)啟動服務提供者ServerTest類,啟動後輸出的日志資訊如下所示。

13:43:36,876  INFO ConnectionStateManager:228 - State change: CONNECTED
13:43:36,905  INFO RpcClient:79 - use cglib dynamic proxy...
13:43:36,942  INFO CuratorFrameworkImpl:235 - Starting
13:43:36,943  INFO ZooKeeper:868 - Initiating client connection, connectString=127.0.0.1:2181 
           

可以看到,服務提供者已經将釋出的服務注冊到了Zookeeper中。

(3)登入Zookeeper用戶端檢視Zookeeper中注冊的服務,如下所示。

  • 檢視HelloService接口釋出的服務資訊
[zk: localhost:2181(CONNECTED) 5] get /binghe_rpc/io.binghe.rpc.test.client.HelloService#1.0.0/65eb0d7f-4bf7-4a0a-bafc-1b7e0e030353

{"name":"io.binghe.rpc.test.client.HelloService#1.0.0","id":"65eb0d7f-4bf7-4a0a-bafc-1b7e0e030353","address":"127.0.0.1","port":18866,"sslPort":null,"payload":{"@class":"io.binghe.rpc.center.meta.ServiceMeta","serviceName":"io.binghe.rpc.test.client.HelloService","serviceVersion":"1.0.0","serviceAddr":"127.0.0.1","servicePort":18866},"registrationTimeUTC":1656135817627,"serviceType":"DYNAMIC","uriSpec":null,"enabled":true}
           
  • 檢視HelloPersonService接口釋出的服務資訊
[zk: localhost:2181(CONNECTED) 7] get /binghe_rpc/io.binghe.rpc.test.client.HelloPersonService#1.0.0/882a5cdb-f581-4a83-8d56-800a8f14e831

{"name":"io.binghe.rpc.test.client.HelloPersonService#1.0.0","id":"882a5cdb-f581-4a83-8d56-800a8f14e831","address":"127.0.0.1","port":18866,"sslPort":null,"payload":{"@class":"io.binghe.rpc.center.meta.ServiceMeta","serviceName":"io.binghe.rpc.test.client.HelloPersonService","serviceVersion":"1.0.0","serviceAddr":"127.0.0.1","servicePort":18866},"registrationTimeUTC":1656135817274,"serviceType":"DYNAMIC","uriSpec":null,"enabled":true}
           

通過Zookeeper用戶端可以看出,HelloService接口和HelloPersonService接口釋出的服務都已經被注冊到Zookeeper了。

(4)啟動服務提供者ClientTest類,實作RPC調用,輸出的日志資訊如下所示。

13:56:47,391  INFO ConnectionStateManager:228 - State change: CONNECTED
13:56:47,488  INFO RpcClient:76 - use javassist dynamic proxy...
13:56:47,518  INFO ConnectionStateManager:228 - State change: CONNECTED
13:56:47,545  INFO RpcClient:79 - use cglib dynamic proxy...
13:56:48,253  INFO RpcConsumer:85 - connect rpc server 127.0.0.1 on port 18866 success.
Hello! binghe
Hello! binghe001 binghe002
=================================
0 binghe
1 binghe
           

可以看到,在ClientTest類的指令行輸出了遠端調用的結果資訊。并輸出了調用HelloService接口的遠端方法使用的是javassist動态代理。調用HelloPersonService接口的遠端方法使用的是cglib動态代理。

咱們一起手撸的RPC架構其實還有很多非常強大的功能,這裡,就不一一示範了,後面咱們都會一起手撸來實作它。

一點點建議

咱們這個專欄屬于實戰類型比較強的專欄,加上咱們一起從零開始手撸的RPC架構會涉及衆多的知識點。正所謂紙上得來終覺淺,絕知此事要躬行。冰河希望大家在學習這個專欄的時候勤動手,跟着專欄一起實作代碼。期間要多動腦,多總結,這樣才能夠加深對各項知識點的了解。切忌眼高手低,學了半天卻最終啥也沒學會。

好了,今天的開篇文章就到這兒吧,如果文章對你有點幫助,記得給冰河一鍵三連哦,歡迎将文章轉發給更多的小夥伴,冰河将不勝感激~~

一起出發

我會将《RPC手撸專欄》的源碼擷取方式放到知識星球中,同時在微信上會建立專門的知識星球群,冰河會在知識星球上和星球群裡解答球友的提問,關注冰河技術公号 回複 星球 即可擷取優惠券。

歡迎大家将文章或者星球轉發到群裡或者朋友圈,這些内容冰河将用下班、周末、假期的時間不斷完善。通過視訊+文章+知識小冊+直播+作業的形式與你一起學習、提升和進步,最終的目的就是提升你的技術實力,讓你在職場走的更遠,順便多賺些錢。

好了,今天就到這兒吧,我是冰河,我們下期見~~