文章目錄
- 一、前言
-
- 1. Dubbo 架構
- 二、Dubbo的服務調用
-
- 1. SDK 定義
- 2. 同步釋出和調用服務
-
- 2.1. 服務提供者
- 2.2 服務消費者
- 2.3. 運作結果
- 3. 異步釋出和調用服務
-
- 3.1 服務提供者
- 3.2 服務消費者
- 3.3 運作結果
- 三、泛化調用
- 四、本地服務Mock
- 五、 服務降級
一、前言
本系列為個人Dubbo學習筆記,内容基于《深度剖析Apache Dubbo 核心技術内幕》, 過程參考官方源碼分析文章,僅用于個人筆記記錄。本文分析基于Dubbo2.7.0版本,由于個人了解的局限性,若文中不免出現錯誤,感謝指正。
系列文章位址:Dubbo源碼分析:全集整理
Dubbo是阿裡巴巴公司開源的一個高性能優秀的服務架構,使得應用可通過高性能的 RPC 實作服務的輸出和輸入功能,可以和 Spring架構無縫內建。Dubbo是一款高性能、輕量級的開源Java RPC架構,它提供了三大核心能力:面向接口的遠端方法調用,智能容錯和負載均衡,以及服務自動注冊和發現。
1. Dubbo 架構

節點 | 角色說明 |
---|---|
Provider | 暴露服務的服務提供方 |
Consumer | 調用遠端服務的服務消費方 |
Registry | 服務注冊與發現的注冊中心 |
Monitor | 統計服務的調用次數和調用時間的監控中心 |
Container | 服務運作容器 |
調用關系說明
- 服務容器負責啟動,加載,運作服務提供者。
- 服務提供者在啟動時,向注冊中心注冊自己提供的服務。
- 服務消費者在啟動時,向注冊中心訂閱自己所需的服務。
- 注冊中心傳回服務提供者位址清單給消費者,如果有變更,注冊中心将基于長連接配接推送變更資料給消費者。
- 服務消費者,從提供者位址清單中,基于軟負載均衡算法,選一台提供者進行調用,如果調用失敗,再選另一台調用。
- 服務消費者和提供者,在記憶體中累計調用次數和調用時間,定時每分鐘發送一次統計資料到監控中心。
更加詳細的劃分:
這部分内容詳參:
https://www.cnblogs.com/barrywxx/p/8528849.html
https://dubbo.apache.org/zh/docs/v2.7/user/preface/architecture/
二、Dubbo的服務調用
關于Dubbo 在 Spring 中的使用請移步 :https://blog.csdn.net/qq_36882793/article/details/103324692。
下面的調用引用基于Main 方法實作。
1. SDK 定義
public interface GreetingService {
/**
* 同步調用sayHello
* @param name
* @return
*/
String sayHello(String name);
/**
* 異步調用 sayHello : 通過 CompletableFuture 方式
* @param name
* @return
*/
CompletableFuture<String> sayHelloForAsync(String name);
/**
* 異步調用 sayHello : 通過 AsyncContext 方式
* @param name
* @return
*/
String sayHelloForAsyncByContext(String name);
}
// 實作類
public class GreetingServiceImpl implements GreetingService {
// 為異步調用提供單獨的線程池,避免使用JDK公共線程池(ForkJoinPool,commonPool())
private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(8, 15, 1,
TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory("thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public String sayHello(String name) {
String attribute = RpcContext.getContext().getAttachment("attribute");
return sayHello(name, attribute);
}
private String sayHello(String name, String attribute) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "GreetingServiceImpl :attribute = " + attribute + " name = " + name;
}
@Override
public CompletableFuture<String> sayHelloForAsync(String name) {
// 這裡如果需要使用上下文需要在這裡(Dubbo 線程)擷取到上下文
String attribute = RpcContext.getContext().getAttachment("attribute");
return CompletableFuture.supplyAsync(() -> sayHello(name, attribute), POOL);
}
/**
* 1. RpcContext.startAsync(); 開啟異步執行,傳回一個 asyncContext
* 2. 随後把服務處理任務送出到線程池後sayHelloForAsyncByContext 就直接傳回null
* 3. asyncContext.signalContextSwitch(); 切換成Dubbo上下文
* 4. 代碼執行結束後 asyncContext.write(sayHello(name, attribute)); 将執行結果寫回異步上下文中。
* @param name
* @return
*/
@Override
public String sayHelloForAsyncByContext(String name) {
// 1. 開啟異步
AsyncContext asyncContext = RpcContext.startAsync();
// 這裡由于處理業務的線程是 POOL 提供,是以Dubbo 線程不會被阻塞
POOL.execute(() -> {
// 如果要使用上下文,則必須要放在第一句執行
asyncContext.signalContextSwitch();
String attribute = RpcContext.getContext().getAttachment("attribute");
// 寫回響應資訊
asyncContext.write(sayHello(name, attribute));
});
return null;
}
}
對于 sayHello 方法:是Dubbo同步調用時方法。其作用是在睡眠2s後将name 拼接傳回。同時通過 RpcContext.getContext().getAttachment(“attribute”); 擷取消費者在調用時隐式傳遞的參數(attribute)。
對于 sayHelloForAsync 方法 :是Dubbo 異步調用時的方法。基于定義CompletableFuture 簽名的接口實作異步執行需要接口傳回值為 CompletableFuture,并且方法内部使用 CompletableFuture.supplyAsync讓本該由Dubbo 内部線程池中的線程處理的任務,轉換由業務自定義的線程池中的線程來處理, CompletableFuture.supplyAsync 方法會立刻傳回一個CompletableFuture 對象,是以Dubbo内部線程池中的線程會得到及時釋放,傳遞的業務函數則由業務線程池POOL 來進行處理。需要注意的是,調用 sayHelloForAsync 方法的是 Dubbo線程模型線程池的線程,但是其業務處理CompletableFuture.supplyAsync 中的業務邏輯(sayHello(name)) 則是傳遞在 POOL 線程池中處理。也就是說,1處和2處的線程并非同一個線程。
對于 sayHelloForAsyncByContext 方法 :通過 .RpcContext.startAsync(); 開啟異步執行,随後将任務送出給使用者線程池執行後直接傳回,當使用者線程執行時,首先切換上下文,待執行結束後将結果寫回 異步上下文。
個人了解 : 第一種是将業務邏輯第二種和第三種相比,處理業務邏輯使用的是Dubbo 線程,而後面兩種則是使用的使用者自定義的線程池中的線程,可以更快的釋放Dubbo線程
2. 同步釋出和調用服務
2.1. 服務提供者
public class ApiProvider {
public static void main(String[] args) throws IOException {
ServiceConfig<GreetingService> serviceServiceConfig = new ServiceConfig<>();
// 設定服務名稱
serviceServiceConfig.setApplication(new ApplicationConfig("Api-provider"));
// 設定注冊中心位址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
serviceServiceConfig.setRegistry(registryConfig);
// 設定暴露接口
serviceServiceConfig.setInterface(GreetingService.class);
serviceServiceConfig.setRef(new GreetingServiceImpl());
// 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
serviceServiceConfig.setVersion("1.0.0");
serviceServiceConfig.setGroup("dubbo");
// 設定線程池政策
// HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
// objectObjectHashMap.put("threadpool", "mythreadpool");
// serviceServiceConfig.setParameters(objectObjectHashMap);
// 暴露服務
serviceServiceConfig.export();
// 挂起線程
System.out.println("service is start");
System.in.read();
}
}
2.2 服務消費者
public class ApiConsumer {
public static void main(String[] args) {
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
// 設定服務名稱
referenceConfig.setApplication(new ApplicationConfig("Api-consumer"));
// 設定注冊中心位址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
referenceConfig.setRegistry(registryConfig);
// 設定暴露接口
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setTimeout(5000);
// 設定自定義負載均衡政策和叢集容錯政策
// referenceConfig.setCluster("");
referenceConfig.setLoadbalance("");
// 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
// 設定線程池政策
// HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
// objectObjectHashMap.put("threadpool", "mythreadpool");
// serviceServiceConfig.setParameters(objectObjectHashMap);
// 暴露服務
GreetingService greetingService = referenceConfig.get();
// 設定隐式參數
RpcContext.getContext().setAttachment("attribute", "ApiConsumer");
String hello = greetingService.sayHello("hello");
System.out.println("hello = " + hello);
}
}
2.3. 運作結果
啟動服務,運作結果如下,可以看到消費者正常列印了結果。
3. 異步釋出和調用服務
這裡我們調用 GreetingService#sayHelloForAsync 和 GreetingService#sayHelloForAsyncByContext方法來完成異步調用。
3.1 服務提供者
異步釋出流程和同步并沒有任何改變。
public class ApiAsyncProvider {
/**
* 釋出服務相同,沒有改變
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
ServiceConfig<GreetingService> serviceServiceConfig = new ServiceConfig<>();
// 設定服務名稱
serviceServiceConfig.setApplication(new ApplicationConfig("Api-provider"));
// 設定注冊中心位址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
serviceServiceConfig.setRegistry(registryConfig);
// 設定暴露接口
serviceServiceConfig.setInterface(GreetingService.class);
serviceServiceConfig.setRef(new GreetingServiceImpl());
// 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
serviceServiceConfig.setVersion("1.0.0");
serviceServiceConfig.setGroup("dubbo");
// 設定線程池政策
// HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
// objectObjectHashMap.put("threadpool", "mythreadpool");
// serviceServiceConfig.setParameters(objectObjectHashMap);
// 暴露服務
serviceServiceConfig.export();
// 挂起線程
System.out.println("service is start");
System.in.read();
}
}
3.2 服務消費者
異步調用主要有三種方式:
- 直接異步調用,從上下文中擷取 CompletableFuture 來擷取結果
- 基于 CompletableFuture 的接口實作異步調用
- 基于 AsyncContext 實作異步執行。
public class ApiAsyncConsumer {
public static void main(String[] args) throws InterruptedException {
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
// 設定服務名稱
referenceConfig.setApplication(new ApplicationConfig("Api-consumer"));
// 設定注冊中心位址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
referenceConfig.setRegistry(registryConfig);
// 設定暴露接口
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setTimeout(5000);
// 設定自定義負載均衡政策和叢集容錯政策
// referenceConfig.setCluster("");
// referenceConfig.setLoadbalance("");
// 設定版本号和分組 服務接口 + 服務分組 + 服務版本号确定唯一服務
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
GreetingService greetingService = referenceConfig.get();
// 異步調用有三種方式
// 方式一:直接異步調用,等待回調
asyncOne(referenceConfig, greetingService);
// 方式二 :通過 CompletableFuture 異步調用
asyncTwo(greetingService);
// 方式三 :通過 AsyncContext 異步調用
asyncThree(greetingService);
Thread.currentThread().join();
}
private static void asyncThree(GreetingService greetingService) {
String name = greetingService.sayHelloForAsyncByContext("方式3");
System.out.println("name = " + name);
System.out.println("over\n\n");
}
private static void asyncTwo(GreetingService greetingService) {
CompletableFuture<String> future = greetingService.sayHelloForAsync("方式2");
future.whenComplete((s, throwable) -> {
System.out.println("s = " + s);
System.out.println("throwable = " + throwable);
});
}
private static void asyncOne(ReferenceConfig<GreetingService> referenceConfig, GreetingService greetingService) {
// 設定為異步調用,其餘兩種方式不需要設定為true
referenceConfig.setAsync(true);
RpcContext.getContext().setAttachment("attribute", "good");
String hello = greetingService.sayHello("方式1");
System.out.println("hello = " + hello);
// 擷取異步調用的結果
CompletableFuture<String> completableFuture = RpcContext.getContext().getCompletableFuture();
completableFuture.whenComplete((s, throwable) -> {
System.out.println("s = " + s);
System.out.println("throwable = " + throwable);
});
}
}
個人了解 : 第一種是将業務邏輯第二種和第三種相比,處理業務邏輯使用的是Dubbo 線程,而後面兩種則是使用的使用者自定義的線程池中的線程,可以更快的釋放Dubbo線程
3.3 運作結果
三、泛化調用
泛化接口調用主要在服務消費端沒有API 接口類以及對應的模型類元(比如入參出參都是Pojo類)的情況下,其參數和傳回值中沒有對應的POJO 類,是以所有POJO參數均轉化為Map表示,使用泛化調用時,消費端不需要再引入二方包。
在Dubbo 中,根據序列化方式的不同,分為三種傳回調用,分别為 true、bean 和 nativejava。
public class ApiGenericConsumer {
public static void main(String[] args) throws IOException, ClassNotFoundException {
// 1. 設定泛型參數為 GenericService
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("generic-consumer"));
referenceConfig.setRegistry(new RegistryConfig("zookeeper://localhost:2181"));
referenceConfig.setTimeout(5000);
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
// genericTrue(referenceConfig);
// genericBean(referenceConfig);
genericNativejava(referenceConfig);
}
/**
* generic = true 的方式
* 輸出:
* sayHello = GenericServiceImpl :attribute = null name = generic
* genericResult = {data={"id":"no.1","name":"張三"}, class=pojo.Result}
*
* @param referenceConfig
*/
private static void genericTrue(ReferenceConfig<GenericService> referenceConfig) {
// 設定為泛化引用,類型為 true。 傳回調用的接口為 api.GenericService
referenceConfig.setInterface("api.GenericService");
referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_DEFAULT);
// 使用 GenericService 代替所有接口引用
GenericService genericService = referenceConfig.get();
// 簡單的泛型調用 : 基本屬性類型以及Date、List、Map 等不需要轉換,直接調用,如果傳回值為 POJO,則會自動轉化為 Map
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{"generic"});
System.out.println("sayHello = " + sayHello);
// 入參傳回值都為 POJO 的調用
Map<String, Object> map = Maps.newHashMap();
map.put("class", "pojo.Pojo");
map.put("id", "no.1");
map.put("name", "張三");
Object genericResult = genericService.$invoke("seyHelloByGeneric", new String[]{"pojo.Pojo"}, new Object[]{map});
System.out.println("genericResult = " + genericResult);
}
/**
* generic = bean 的方式
* 輸出:
* sayHello = GenericServiceImpl :attribute = null name = genericBean
* pojo 類型好像無法調用
*
* @param referenceConfig
*/
private static void genericBean(ReferenceConfig<GenericService> referenceConfig) {
// 設定為泛化引用,類型為 true。 傳回調用的接口為 api.GenericService
referenceConfig.setInterface("api.GenericService");
referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_BEAN);
// 使用 GenericService 代替所有接口引用
GenericService genericService = referenceConfig.get();
// 簡單的泛型調用 : 基本屬性類型以及Date、List、Map 等不需要轉換,直接調用,如果傳回值為 POJO,則會自動轉化為 Map
JavaBeanDescriptor genericBean = JavaBeanSerializeUtil.serialize("genericBean");
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{genericBean});
// 因為服務提供方會對傳回結果進行序列化, 是以對結果進行反序列化。
System.out.println("sayHello = " + JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) sayHello));
}
/**
* generic = nativejava 方式
* 輸出
* sayHello = GenericServiceImpl :attribute = null name = genericNativejava
*
* @param referenceConfig
*/
private static void genericNativejava(ReferenceConfig<GenericService> referenceConfig) throws IOException, ClassNotFoundException {
// 設定為泛化引用,類型為 true。 傳回調用的接口為 api.GenericService
referenceConfig.setInterface("api.GenericService");
referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA);
// 使用 GenericService 代替所有接口引用
GenericService genericService = referenceConfig.get();
UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
// 泛型調用,需要把參數使用Java序列化為二進制資料
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null, out).writeObject("genericNativejava");
// 簡單的泛型調用 : 基本屬性類型以及Date、List、Map 等不需要轉換,直接調用,如果傳回值為 POJO,則會自動轉化為 Map
JavaBeanDescriptor genericBean = JavaBeanSerializeUtil.serialize("genericBean");
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{out.toByteArray()});
UnsafeByteArrayInputStream in = new UnsafeByteArrayInputStream((byte[]) sayHello);
Object result = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, in).readObject();
System.out.println("sayHello = " + result);
}
}
四、本地服務Mock
當遠端服務不可用時,可以通過模拟服務提供者讓消費者測試自己的功能,而不需要發起遠端調用。
要實作Mock功能,首先需要消費端先實作服務端的mock 實作類,需要注意的是Mock實作類必須要符合
接口包名.類名Mock格式
。需要注意的是,在執行Mock服務實作類mock() 方法前,會先發起遠端調用,當遠端服務調用失敗時,才會降級執行mock功能。
開啟mock 功能需要設定 :
// 設定啟動時候不檢查服務是否可用
referenceConfig.setCheck(false);
// 設定啟用Mock
referenceConfig.setMock(true);
五、 服務降級
Dubbo 提供了兩種服務降級政策:
- force:return 政策:該種方式不會再調用服務提供者,而是直接傳回用戶端mock值。
- fail:return 政策:該種方式會先去嘗試調用服務提供者,若調用失敗,再傳回mock值。
服務消費者需要開啟 mock 功能。
/**
* 降級政策
* @param type force 或者 fail
*/
private static void doDemotion(String type) {
// 擷取zk 的服務注冊中心工廠
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
// 根據zk位址,擷取具體的zk注冊中心用戶端執行個體
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://localhost:2181"));
// 将降級方案注冊到zk
registry.register(URL.valueOf("override://0.0.0.0/api.GreetingService?category=configurators&dynamic=false&" +
"application=Api-consumer&" +
// 指定降級政策,return+null,表示mock傳回為null
"mock=" + type + ":return+null" +
"&group=dubbo&version=1.0.0"));
// 取消降級方案
// registry.unregister(URL.valueOf("override://0.0.0.0/api.GreetingService?category=configurators&dynamic=false&" +
// "application=Api-consumer&" +
// "mock=" + type + ":return+null" +
// "&group=dubbo&version=1.0.0"));
}
以上:内容部分參考
《深度剖析Apache Dubbo 核心技術内幕》
https://blog.csdn.net/qq_36882793/article/details/103324692
如有侵擾,聯系删除。 内容僅用于自我記錄學習使用。如有錯誤,歡迎指正