天天看點

Dubbo筆記 ① : 入門篇一、前言二、Dubbo的服務調用三、泛化調用四、本地服務Mock五、 服務降級

文章目錄

  • 一、前言
    • 1. Dubbo 架構
  • 二、Dubbo的服務調用
    • 1. SDK 定義
    • 2. 同步釋出和調用服務
      • 2.1. 服務提供者
      • 2.2 服務消費者
      • 2.3. 運作結果
    • 3. 異步釋出和調用服務
      • 3.1 服務提供者
      • 3.2 服務消費者
      • 3.3 運作結果
  • 三、泛化調用
  • 四、本地服務Mock
  • 五、 服務降級

一、前言

本系列為個人Dubbo學習筆記,内容基于《深度剖析Apache Dubbo 核心技術内幕》, 過程參考官方源碼分析文章,僅用于個人筆記記錄。本文分析基于Dubbo2.7.0版本,由于個人了解的局限性,若文中不免出現錯誤,感謝指正。

系列文章位址:Dubbo源碼分析:全集整理

Dubbo是阿裡巴巴公司開源的一個高性能優秀的服務架構,使得應用可通過高性能的 RPC 實作服務的輸出和輸入功能,可以和 Spring架構無縫內建。Dubbo是一款高性能、輕量級的開源Java RPC架構,它提供了三大核心能力:面向接口的遠端方法調用,智能容錯和負載均衡,以及服務自動注冊和發現。

1. Dubbo 架構

Dubbo筆記 ① : 入門篇一、前言二、Dubbo的服務調用三、泛化調用四、本地服務Mock五、 服務降級
節點 角色說明
Provider 暴露服務的服務提供方
Consumer 調用遠端服務的服務消費方
Registry 服務注冊與發現的注冊中心
Monitor 統計服務的調用次數和調用時間的監控中心
Container 服務運作容器

調用關系說明

  1. 服務容器負責啟動,加載,運作服務提供者。
  2. 服務提供者在啟動時,向注冊中心注冊自己提供的服務。
  3. 服務消費者在啟動時,向注冊中心訂閱自己所需的服務。
  4. 注冊中心傳回服務提供者位址清單給消費者,如果有變更,注冊中心将基于長連接配接推送變更資料給消費者。
  5. 服務消費者,從提供者位址清單中,基于軟負載均衡算法,選一台提供者進行調用,如果調用失敗,再選另一台調用。
  6. 服務消費者和提供者,在記憶體中累計調用次數和調用時間,定時每分鐘發送一次統計資料到監控中心。

更加詳細的劃分:

Dubbo筆記 ① : 入門篇一、前言二、Dubbo的服務調用三、泛化調用四、本地服務Mock五、 服務降級

這部分内容詳參:

https://www.cnblogs.com/barrywxx/p/8528849.html

https://dubbo.apache.org/zh/docs/v2.7/user/preface/architecture/

二、Dubbo的服務調用

關于Dubbo 在 Spring 中的使用請移步 :https://blog.csdn.net/qq_36882793/article/details/103324692。

下面的調用引用基于Main 方法實作。

1. SDK 定義

public interface GreetingService {
    /**
     * 同步調用sayHello
     * @param name
     * @return
     */
    String sayHello(String name);

    /**
     * 異步調用 sayHello : 通過 CompletableFuture 方式
     * @param name
     * @return
     */
    CompletableFuture<String> sayHelloForAsync(String name);

    /**
     * 異步調用 sayHello : 通過 AsyncContext 方式
     * @param name
     * @return
     */
    String sayHelloForAsyncByContext(String name);

}

// 實作類

public class GreetingServiceImpl implements GreetingService {
    // 為異步調用提供單獨的線程池,避免使用JDK公共線程池(ForkJoinPool,commonPool())
    private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(8, 15, 1,
            TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory("thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    public String sayHello(String name) {
        String attribute = RpcContext.getContext().getAttachment("attribute");
        return sayHello(name, attribute);
    }

    private String sayHello(String name, String attribute) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "GreetingServiceImpl :attribute = " + attribute + " name = " + name;
    }

    @Override
    public CompletableFuture<String> sayHelloForAsync(String name) {
        // 這裡如果需要使用上下文需要在這裡(Dubbo 線程)擷取到上下文
        String attribute = RpcContext.getContext().getAttachment("attribute");
        return CompletableFuture.supplyAsync(() -> sayHello(name, attribute), POOL);
    }


    /**
     *  1. RpcContext.startAsync(); 開啟異步執行,傳回一個 asyncContext
     *  2. 随後把服務處理任務送出到線程池後sayHelloForAsyncByContext 就直接傳回null
     *  3. asyncContext.signalContextSwitch(); 切換成Dubbo上下文
     *  4. 代碼執行結束後 asyncContext.write(sayHello(name, attribute)); 将執行結果寫回異步上下文中。
     * @param name
     * @return
     */
    @Override
    public String sayHelloForAsyncByContext(String name) {
    	// 1. 開啟異步
        AsyncContext asyncContext = RpcContext.startAsync();
        // 這裡由于處理業務的線程是 POOL 提供,是以Dubbo 線程不會被阻塞
        POOL.execute(() -> {
            // 如果要使用上下文,則必須要放在第一句執行
            asyncContext.signalContextSwitch();
            String attribute = RpcContext.getContext().getAttachment("attribute");
            // 寫回響應資訊
            asyncContext.write(sayHello(name, attribute));
        });
        return null;
    }
}
           

對于 sayHello 方法:是Dubbo同步調用時方法。其作用是在睡眠2s後将name 拼接傳回。同時通過 RpcContext.getContext().getAttachment(“attribute”); 擷取消費者在調用時隐式傳遞的參數(attribute)。

對于 sayHelloForAsync 方法 :是Dubbo 異步調用時的方法。基于定義CompletableFuture 簽名的接口實作異步執行需要接口傳回值為 CompletableFuture,并且方法内部使用 CompletableFuture.supplyAsync讓本該由Dubbo 内部線程池中的線程處理的任務,轉換由業務自定義的線程池中的線程來處理, CompletableFuture.supplyAsync 方法會立刻傳回一個CompletableFuture 對象,是以Dubbo内部線程池中的線程會得到及時釋放,傳遞的業務函數則由業務線程池POOL 來進行處理。需要注意的是,調用 sayHelloForAsync 方法的是 Dubbo線程模型線程池的線程,但是其業務處理CompletableFuture.supplyAsync 中的業務邏輯(sayHello(name)) 則是傳遞在 POOL 線程池中處理。也就是說,1處和2處的線程并非同一個線程。

對于 sayHelloForAsyncByContext 方法 :通過 .RpcContext.startAsync(); 開啟異步執行,随後将任務送出給使用者線程池執行後直接傳回,當使用者線程執行時,首先切換上下文,待執行結束後将結果寫回 異步上下文。

個人了解 : 第一種是将業務邏輯第二種和第三種相比,處理業務邏輯使用的是Dubbo 線程,而後面兩種則是使用的使用者自定義的線程池中的線程,可以更快的釋放Dubbo線程

2. 同步釋出和調用服務

2.1. 服務提供者

public class ApiProvider {
    public static void main(String[] args) throws IOException {

        ServiceConfig<GreetingService> serviceServiceConfig = new ServiceConfig<>();
        // 設定服務名稱
        serviceServiceConfig.setApplication(new ApplicationConfig("Api-provider"));
        // 設定注冊中心位址
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
        serviceServiceConfig.setRegistry(registryConfig);
        // 設定暴露接口
        serviceServiceConfig.setInterface(GreetingService.class);
        serviceServiceConfig.setRef(new GreetingServiceImpl());

        // 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
        serviceServiceConfig.setVersion("1.0.0");
        serviceServiceConfig.setGroup("dubbo");

        // 設定線程池政策
//        HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
//        objectObjectHashMap.put("threadpool", "mythreadpool");
//        serviceServiceConfig.setParameters(objectObjectHashMap);
        // 暴露服務
        serviceServiceConfig.export();
        // 挂起線程
        System.out.println("service is start");
        System.in.read();
    }
}
           

2.2 服務消費者

public class ApiConsumer {
    public static void main(String[] args) {
        ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();

        // 設定服務名稱
        referenceConfig.setApplication(new ApplicationConfig("Api-consumer"));
        // 設定注冊中心位址
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
        referenceConfig.setRegistry(registryConfig);
        // 設定暴露接口
        referenceConfig.setInterface(GreetingService.class);
        referenceConfig.setTimeout(5000);
        // 設定自定義負載均衡政策和叢集容錯政策
//        referenceConfig.setCluster("");
        referenceConfig.setLoadbalance("");
        // 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
        referenceConfig.setVersion("1.0.0");
        referenceConfig.setGroup("dubbo");

        // 設定線程池政策
//        HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
//        objectObjectHashMap.put("threadpool", "mythreadpool");
//        serviceServiceConfig.setParameters(objectObjectHashMap);
        // 暴露服務
        GreetingService greetingService = referenceConfig.get();
        // 設定隐式參數
        RpcContext.getContext().setAttachment("attribute", "ApiConsumer");
        String hello = greetingService.sayHello("hello");
        System.out.println("hello = " + hello);
    }
}
           

2.3. 運作結果

啟動服務,運作結果如下,可以看到消費者正常列印了結果。

Dubbo筆記 ① : 入門篇一、前言二、Dubbo的服務調用三、泛化調用四、本地服務Mock五、 服務降級

3. 異步釋出和調用服務

這裡我們調用 GreetingService#sayHelloForAsync 和 GreetingService#sayHelloForAsyncByContext方法來完成異步調用。

3.1 服務提供者

異步釋出流程和同步并沒有任何改變。

public class ApiAsyncProvider {
    /**
     * 釋出服務相同,沒有改變
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        ServiceConfig<GreetingService> serviceServiceConfig = new ServiceConfig<>();
        // 設定服務名稱
        serviceServiceConfig.setApplication(new ApplicationConfig("Api-provider"));
        // 設定注冊中心位址
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
        serviceServiceConfig.setRegistry(registryConfig);
        // 設定暴露接口
        serviceServiceConfig.setInterface(GreetingService.class);
        serviceServiceConfig.setRef(new GreetingServiceImpl());

        // 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
        serviceServiceConfig.setVersion("1.0.0");
        serviceServiceConfig.setGroup("dubbo");

        // 設定線程池政策
//        HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
//        objectObjectHashMap.put("threadpool", "mythreadpool");
//        serviceServiceConfig.setParameters(objectObjectHashMap);
        // 暴露服務
        serviceServiceConfig.export();
        // 挂起線程
        System.out.println("service is start");
        System.in.read();
    }
}
           

3.2 服務消費者

異步調用主要有三種方式:

  1. 直接異步調用,從上下文中擷取 CompletableFuture 來擷取結果
  2. 基于 CompletableFuture 的接口實作異步調用
  3. 基于 AsyncContext 實作異步執行。
public class ApiAsyncConsumer {
    public static void main(String[] args) throws InterruptedException {
        ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();

        // 設定服務名稱
        referenceConfig.setApplication(new ApplicationConfig("Api-consumer"));
        // 設定注冊中心位址
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
        referenceConfig.setRegistry(registryConfig);
        // 設定暴露接口
        referenceConfig.setInterface(GreetingService.class);
        referenceConfig.setTimeout(5000);
        // 設定自定義負載均衡政策和叢集容錯政策
//        referenceConfig.setCluster("");
//        referenceConfig.setLoadbalance("");
        // 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
        referenceConfig.setVersion("1.0.0");
        referenceConfig.setGroup("dubbo");

        GreetingService greetingService = referenceConfig.get();
        // 異步調用有三種方式
        // 方式一:直接異步調用,等待回調
        asyncOne(referenceConfig, greetingService);
        // 方式二 :通過 CompletableFuture 異步調用
        asyncTwo(greetingService);
        // 方式三 :通過 AsyncContext 異步調用
        asyncThree(greetingService);
        Thread.currentThread().join();
    }

    private static void asyncThree(GreetingService greetingService) {
        String name = greetingService.sayHelloForAsyncByContext("方式3");
        System.out.println("name = " + name);
        System.out.println("over\n\n");
    }

    private static void asyncTwo(GreetingService greetingService) {
        CompletableFuture<String> future = greetingService.sayHelloForAsync("方式2");
        future.whenComplete((s, throwable) -> {
            System.out.println("s = " + s);
            System.out.println("throwable = " + throwable);
        });
    }

    private static void asyncOne(ReferenceConfig<GreetingService> referenceConfig, GreetingService greetingService) {
        // 設定為異步調用,其餘兩種方式不需要設定為true
        referenceConfig.setAsync(true);
        RpcContext.getContext().setAttachment("attribute", "good");
        String hello = greetingService.sayHello("方式1");
        System.out.println("hello = " + hello);
        // 擷取異步調用的結果
        CompletableFuture<String> completableFuture = RpcContext.getContext().getCompletableFuture();
        completableFuture.whenComplete((s, throwable) -> {
            System.out.println("s = " + s);
            System.out.println("throwable = " + throwable);
        });
    }
}
           

個人了解 : 第一種是将業務邏輯第二種和第三種相比,處理業務邏輯使用的是Dubbo 線程,而後面兩種則是使用的使用者自定義的線程池中的線程,可以更快的釋放Dubbo線程

3.3 運作結果

Dubbo筆記 ① : 入門篇一、前言二、Dubbo的服務調用三、泛化調用四、本地服務Mock五、 服務降級

三、泛化調用

泛化接口調用主要在服務消費端沒有API 接口類以及對應的模型類元(比如入參出參都是Pojo類)的情況下,其參數和傳回值中沒有對應的POJO 類,是以所有POJO參數均轉化為Map表示,使用泛化調用時,消費端不需要再引入二方包。

在Dubbo 中,根據序列化方式的不同,分為三種傳回調用,分别為 true、bean 和 nativejava。

public class ApiGenericConsumer {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // 1. 設定泛型參數為 GenericService
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setApplication(new ApplicationConfig("generic-consumer"));
        referenceConfig.setRegistry(new RegistryConfig("zookeeper://localhost:2181"));
        referenceConfig.setTimeout(5000);
        referenceConfig.setVersion("1.0.0");
        referenceConfig.setGroup("dubbo");
//        genericTrue(referenceConfig);
//        genericBean(referenceConfig);
        genericNativejava(referenceConfig);
    }

    /**
     * generic = true 的方式
     * 輸出:
     * sayHello = GenericServiceImpl :attribute = null name = generic
     * genericResult = {data={"id":"no.1","name":"張三"}, class=pojo.Result}
     *
     * @param referenceConfig
     */
    private static void genericTrue(ReferenceConfig<GenericService> referenceConfig) {
        // 設定為泛化引用,類型為 true。 傳回調用的接口為 api.GenericService
        referenceConfig.setInterface("api.GenericService");
        referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_DEFAULT);
        // 使用 GenericService 代替所有接口引用
        GenericService genericService = referenceConfig.get();
        // 簡單的泛型調用 : 基本屬性類型以及Date、List、Map 等不需要轉換,直接調用,如果傳回值為 POJO,則會自動轉化為 Map
        Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{"generic"});
        System.out.println("sayHello = " + sayHello);

        // 入參傳回值都為 POJO 的調用
        Map<String, Object> map = Maps.newHashMap();
        map.put("class", "pojo.Pojo");
        map.put("id", "no.1");
        map.put("name", "張三");
        Object genericResult = genericService.$invoke("seyHelloByGeneric", new String[]{"pojo.Pojo"}, new Object[]{map});
        System.out.println("genericResult = " + genericResult);
    }

    /**
     * generic = bean 的方式
     * 輸出:
     * sayHello = GenericServiceImpl :attribute = null name = genericBean
     * pojo 類型好像無法調用
     *
     * @param referenceConfig
     */
    private static void genericBean(ReferenceConfig<GenericService> referenceConfig) {
        // 設定為泛化引用,類型為 true。 傳回調用的接口為 api.GenericService
        referenceConfig.setInterface("api.GenericService");
        referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_BEAN);
        // 使用 GenericService 代替所有接口引用
        GenericService genericService = referenceConfig.get();
        // 簡單的泛型調用 : 基本屬性類型以及Date、List、Map 等不需要轉換,直接調用,如果傳回值為 POJO,則會自動轉化為 Map
        JavaBeanDescriptor genericBean = JavaBeanSerializeUtil.serialize("genericBean");
        Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{genericBean});
        // 因為服務提供方會對傳回結果進行序列化, 是以對結果進行反序列化。
        System.out.println("sayHello = " + JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) sayHello));
    }

    /**
     * generic = nativejava 方式
     * 輸出
     * sayHello = GenericServiceImpl :attribute = null name = genericNativejava
     *
     * @param referenceConfig
     */
    private static void genericNativejava(ReferenceConfig<GenericService> referenceConfig) throws IOException, ClassNotFoundException {
        // 設定為泛化引用,類型為 true。 傳回調用的接口為 api.GenericService
        referenceConfig.setInterface("api.GenericService");
        referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA);
        // 使用 GenericService 代替所有接口引用
        GenericService genericService = referenceConfig.get();
        UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
        // 泛型調用,需要把參數使用Java序列化為二進制資料
        ExtensionLoader.getExtensionLoader(Serialization.class)
                .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                .serialize(null, out).writeObject("genericNativejava");
        // 簡單的泛型調用 : 基本屬性類型以及Date、List、Map 等不需要轉換,直接調用,如果傳回值為 POJO,則會自動轉化為 Map
        JavaBeanDescriptor genericBean = JavaBeanSerializeUtil.serialize("genericBean");
        Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{out.toByteArray()});
        UnsafeByteArrayInputStream in = new UnsafeByteArrayInputStream((byte[]) sayHello);
        Object result = ExtensionLoader.getExtensionLoader(Serialization.class)
                .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                .deserialize(null, in).readObject();
        System.out.println("sayHello = " + result);
    }
}
           

四、本地服務Mock

當遠端服務不可用時,可以通過模拟服務提供者讓消費者測試自己的功能,而不需要發起遠端調用。

要實作Mock功能,首先需要消費端先實作服務端的mock 實作類,需要注意的是Mock實作類必須要符合

接口包名.類名Mock格式

。需要注意的是,在執行Mock服務實作類mock() 方法前,會先發起遠端調用,當遠端服務調用失敗時,才會降級執行mock功能。

開啟mock 功能需要設定 :

// 設定啟動時候不檢查服務是否可用
    referenceConfig.setCheck(false);
    // 設定啟用Mock
    referenceConfig.setMock(true);
           

五、 服務降級

Dubbo 提供了兩種服務降級政策:

  • force:return 政策:該種方式不會再調用服務提供者,而是直接傳回用戶端mock值。
  • fail:return 政策:該種方式會先去嘗試調用服務提供者,若調用失敗,再傳回mock值。

服務消費者需要開啟 mock 功能。

/**
     * 降級政策
     * @param type force 或者 fail
     */
    private static void doDemotion(String type) {
        // 擷取zk 的服務注冊中心工廠
        RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
        // 根據zk位址,擷取具體的zk注冊中心用戶端執行個體
        Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://localhost:2181"));
        // 将降級方案注冊到zk
        registry.register(URL.valueOf("override://0.0.0.0/api.GreetingService?category=configurators&dynamic=false&" +
                "application=Api-consumer&" +
                // 指定降級政策,return+null,表示mock傳回為null
                "mock=" + type + ":return+null" +
                "&group=dubbo&version=1.0.0"));
        // 取消降級方案
//        registry.unregister(URL.valueOf("override://0.0.0.0/api.GreetingService?category=configurators&dynamic=false&" +
//                "application=Api-consumer&" +
//                "mock=" + type + ":return+null" +
//                "&group=dubbo&version=1.0.0"));
    }
           

以上:内容部分參考

《深度剖析Apache Dubbo 核心技術内幕》

https://blog.csdn.net/qq_36882793/article/details/103324692

如有侵擾,聯系删除。 内容僅用于自我記錄學習使用。如有錯誤,歡迎指正