文章目录
- 一、前言
-
- 1. Dubbo 架构
- 二、Dubbo的服务调用
-
- 1. SDK 定义
- 2. 同步发布和调用服务
-
- 2.1. 服务提供者
- 2.2 服务消费者
- 2.3. 运行结果
- 3. 异步发布和调用服务
-
- 3.1 服务提供者
- 3.2 服务消费者
- 3.3 运行结果
- 三、泛化调用
- 四、本地服务Mock
- 五、 服务降级
一、前言
本系列为个人Dubbo学习笔记,内容基于《深度剖析Apache Dubbo 核心技术内幕》, 过程参考官方源码分析文章,仅用于个人笔记记录。本文分析基于Dubbo2.7.0版本,由于个人理解的局限性,若文中不免出现错误,感谢指正。
系列文章地址:Dubbo源码分析:全集整理
Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。
1. Dubbo 架构

节点 | 角色说明 |
---|---|
Provider | 暴露服务的服务提供方 |
Consumer | 调用远程服务的服务消费方 |
Registry | 服务注册与发现的注册中心 |
Monitor | 统计服务的调用次数和调用时间的监控中心 |
Container | 服务运行容器 |
调用关系说明
- 服务容器负责启动,加载,运行服务提供者。
- 服务提供者在启动时,向注册中心注册自己提供的服务。
- 服务消费者在启动时,向注册中心订阅自己所需的服务。
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
- 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
更加详细的划分:
这部分内容详参:
https://www.cnblogs.com/barrywxx/p/8528849.html
https://dubbo.apache.org/zh/docs/v2.7/user/preface/architecture/
二、Dubbo的服务调用
关于Dubbo 在 Spring 中的使用请移步 :https://blog.csdn.net/qq_36882793/article/details/103324692。
下面的调用引用基于Main 方法实现。
1. SDK 定义
public interface GreetingService {
/**
* 同步调用sayHello
* @param name
* @return
*/
String sayHello(String name);
/**
* 异步调用 sayHello : 通过 CompletableFuture 方式
* @param name
* @return
*/
CompletableFuture<String> sayHelloForAsync(String name);
/**
* 异步调用 sayHello : 通过 AsyncContext 方式
* @param name
* @return
*/
String sayHelloForAsyncByContext(String name);
}
// 实现类
public class GreetingServiceImpl implements GreetingService {
// 为异步调用提供单独的线程池,避免使用JDK公共线程池(ForkJoinPool,commonPool())
private static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(8, 15, 1,
TimeUnit.MINUTES, new SynchronousQueue<>(), new NamedThreadFactory("thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public String sayHello(String name) {
String attribute = RpcContext.getContext().getAttachment("attribute");
return sayHello(name, attribute);
}
private String sayHello(String name, String attribute) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "GreetingServiceImpl :attribute = " + attribute + " name = " + name;
}
@Override
public CompletableFuture<String> sayHelloForAsync(String name) {
// 这里如果需要使用上下文需要在这里(Dubbo 线程)获取到上下文
String attribute = RpcContext.getContext().getAttachment("attribute");
return CompletableFuture.supplyAsync(() -> sayHello(name, attribute), POOL);
}
/**
* 1. RpcContext.startAsync(); 开启异步执行,返回一个 asyncContext
* 2. 随后把服务处理任务提交到线程池后sayHelloForAsyncByContext 就直接返回null
* 3. asyncContext.signalContextSwitch(); 切换成Dubbo上下文
* 4. 代码执行结束后 asyncContext.write(sayHello(name, attribute)); 将执行结果写回异步上下文中。
* @param name
* @return
*/
@Override
public String sayHelloForAsyncByContext(String name) {
// 1. 开启异步
AsyncContext asyncContext = RpcContext.startAsync();
// 这里由于处理业务的线程是 POOL 提供,所以Dubbo 线程不会被阻塞
POOL.execute(() -> {
// 如果要使用上下文,则必须要放在第一句执行
asyncContext.signalContextSwitch();
String attribute = RpcContext.getContext().getAttachment("attribute");
// 写回响应信息
asyncContext.write(sayHello(name, attribute));
});
return null;
}
}
对于 sayHello 方法:是Dubbo同步调用时方法。其作用是在睡眠2s后将name 拼接返回。同时通过 RpcContext.getContext().getAttachment(“attribute”); 获取消费者在调用时隐式传递的参数(attribute)。
对于 sayHelloForAsync 方法 :是Dubbo 异步调用时的方法。基于定义CompletableFuture 签名的接口实现异步执行需要接口返回值为 CompletableFuture,并且方法内部使用 CompletableFuture.supplyAsync让本该由Dubbo 内部线程池中的线程处理的任务,转换由业务自定义的线程池中的线程来处理, CompletableFuture.supplyAsync 方法会立刻返回一个CompletableFuture 对象,所以Dubbo内部线程池中的线程会得到及时释放,传递的业务函数则由业务线程池POOL 来进行处理。需要注意的是,调用 sayHelloForAsync 方法的是 Dubbo线程模型线程池的线程,但是其业务处理CompletableFuture.supplyAsync 中的业务逻辑(sayHello(name)) 则是交付在 POOL 线程池中处理。也就是说,1处和2处的线程并非同一个线程。
对于 sayHelloForAsyncByContext 方法 :通过 .RpcContext.startAsync(); 开启异步执行,随后将任务提交给用户线程池执行后直接返回,当用户线程执行时,首先切换上下文,待执行结束后将结果写回 异步上下文。
个人理解 : 第一种是将业务逻辑第二种和第三种相比,处理业务逻辑使用的是Dubbo 线程,而后面两种则是使用的用户自定义的线程池中的线程,可以更快的释放Dubbo线程
2. 同步发布和调用服务
2.1. 服务提供者
public class ApiProvider {
public static void main(String[] args) throws IOException {
ServiceConfig<GreetingService> serviceServiceConfig = new ServiceConfig<>();
// 设置服务名称
serviceServiceConfig.setApplication(new ApplicationConfig("Api-provider"));
// 设置注册中心地址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
serviceServiceConfig.setRegistry(registryConfig);
// 设置暴露接口
serviceServiceConfig.setInterface(GreetingService.class);
serviceServiceConfig.setRef(new GreetingServiceImpl());
// 设置版本号和分组 服务接口 + 服务分组 + 服务版本号确定唯一服务
serviceServiceConfig.setVersion("1.0.0");
serviceServiceConfig.setGroup("dubbo");
// 设置线程池策略
// HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
// objectObjectHashMap.put("threadpool", "mythreadpool");
// serviceServiceConfig.setParameters(objectObjectHashMap);
// 暴露服务
serviceServiceConfig.export();
// 挂起线程
System.out.println("service is start");
System.in.read();
}
}
2.2 服务消费者
public class ApiConsumer {
public static void main(String[] args) {
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
// 设置服务名称
referenceConfig.setApplication(new ApplicationConfig("Api-consumer"));
// 设置注册中心地址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
referenceConfig.setRegistry(registryConfig);
// 设置暴露接口
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setTimeout(5000);
// 设置自定义负载均衡策略和集群容错策略
// referenceConfig.setCluster("");
referenceConfig.setLoadbalance("");
// 设置版本号和分组 服务接口 + 服务分组 + 服务版本号确定唯一服务
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
// 设置线程池策略
// HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
// objectObjectHashMap.put("threadpool", "mythreadpool");
// serviceServiceConfig.setParameters(objectObjectHashMap);
// 暴露服务
GreetingService greetingService = referenceConfig.get();
// 设置隐式参数
RpcContext.getContext().setAttachment("attribute", "ApiConsumer");
String hello = greetingService.sayHello("hello");
System.out.println("hello = " + hello);
}
}
2.3. 运行结果
启动服务,运行结果如下,可以看到消费者正常打印了结果。
3. 异步发布和调用服务
这里我们调用 GreetingService#sayHelloForAsync 和 GreetingService#sayHelloForAsyncByContext方法来完成异步调用。
3.1 服务提供者
异步发布流程和同步并没有任何改变。
public class ApiAsyncProvider {
/**
* 发布服务相同,没有改变
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
ServiceConfig<GreetingService> serviceServiceConfig = new ServiceConfig<>();
// 设置服务名称
serviceServiceConfig.setApplication(new ApplicationConfig("Api-provider"));
// 设置注册中心地址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
serviceServiceConfig.setRegistry(registryConfig);
// 设置暴露接口
serviceServiceConfig.setInterface(GreetingService.class);
serviceServiceConfig.setRef(new GreetingServiceImpl());
// 设置版本号和分组 服务接口 + 服务分组 + 服务版本号确定唯一服务
serviceServiceConfig.setVersion("1.0.0");
serviceServiceConfig.setGroup("dubbo");
// 设置线程池策略
// HashMap<String, String> objectObjectHashMap = Maps.newHashMap();
// objectObjectHashMap.put("threadpool", "mythreadpool");
// serviceServiceConfig.setParameters(objectObjectHashMap);
// 暴露服务
serviceServiceConfig.export();
// 挂起线程
System.out.println("service is start");
System.in.read();
}
}
3.2 服务消费者
异步调用主要有三种方式:
- 直接异步调用,从上下文中获取 CompletableFuture 来获取结果
- 基于 CompletableFuture 的接口实现异步调用
- 基于 AsyncContext 实现异步执行。
public class ApiAsyncConsumer {
public static void main(String[] args) throws InterruptedException {
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
// 设置服务名称
referenceConfig.setApplication(new ApplicationConfig("Api-consumer"));
// 设置注册中心地址
RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
referenceConfig.setRegistry(registryConfig);
// 设置暴露接口
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setTimeout(5000);
// 设置自定义负载均衡策略和集群容错策略
// referenceConfig.setCluster("");
// referenceConfig.setLoadbalance("");
// 设置版本号和分组 服务接口 + 服务分组 + 服务版本号确定唯一服务
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
GreetingService greetingService = referenceConfig.get();
// 异步调用有三种方式
// 方式一:直接异步调用,等待回调
asyncOne(referenceConfig, greetingService);
// 方式二 :通过 CompletableFuture 异步调用
asyncTwo(greetingService);
// 方式三 :通过 AsyncContext 异步调用
asyncThree(greetingService);
Thread.currentThread().join();
}
private static void asyncThree(GreetingService greetingService) {
String name = greetingService.sayHelloForAsyncByContext("方式3");
System.out.println("name = " + name);
System.out.println("over\n\n");
}
private static void asyncTwo(GreetingService greetingService) {
CompletableFuture<String> future = greetingService.sayHelloForAsync("方式2");
future.whenComplete((s, throwable) -> {
System.out.println("s = " + s);
System.out.println("throwable = " + throwable);
});
}
private static void asyncOne(ReferenceConfig<GreetingService> referenceConfig, GreetingService greetingService) {
// 设置为异步调用,其余两种方式不需要设置为true
referenceConfig.setAsync(true);
RpcContext.getContext().setAttachment("attribute", "good");
String hello = greetingService.sayHello("方式1");
System.out.println("hello = " + hello);
// 获取异步调用的结果
CompletableFuture<String> completableFuture = RpcContext.getContext().getCompletableFuture();
completableFuture.whenComplete((s, throwable) -> {
System.out.println("s = " + s);
System.out.println("throwable = " + throwable);
});
}
}
个人理解 : 第一种是将业务逻辑第二种和第三种相比,处理业务逻辑使用的是Dubbo 线程,而后面两种则是使用的用户自定义的线程池中的线程,可以更快的释放Dubbo线程
3.3 运行结果
三、泛化调用
泛化接口调用主要在服务消费端没有API 接口类以及对应的模型类元(比如入参出参都是Pojo类)的情况下,其参数和返回值中没有对应的POJO 类,所以所有POJO参数均转化为Map表示,使用泛化调用时,消费端不需要再引入二方包。
在Dubbo 中,根据序列化方式的不同,分为三种返回调用,分别为 true、bean 和 nativejava。
public class ApiGenericConsumer {
public static void main(String[] args) throws IOException, ClassNotFoundException {
// 1. 设置泛型参数为 GenericService
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("generic-consumer"));
referenceConfig.setRegistry(new RegistryConfig("zookeeper://localhost:2181"));
referenceConfig.setTimeout(5000);
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
// genericTrue(referenceConfig);
// genericBean(referenceConfig);
genericNativejava(referenceConfig);
}
/**
* generic = true 的方式
* 输出:
* sayHello = GenericServiceImpl :attribute = null name = generic
* genericResult = {data={"id":"no.1","name":"张三"}, class=pojo.Result}
*
* @param referenceConfig
*/
private static void genericTrue(ReferenceConfig<GenericService> referenceConfig) {
// 设置为泛化引用,类型为 true。 返回调用的接口为 api.GenericService
referenceConfig.setInterface("api.GenericService");
referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_DEFAULT);
// 使用 GenericService 代替所有接口引用
GenericService genericService = referenceConfig.get();
// 简单的泛型调用 : 基本属性类型以及Date、List、Map 等不需要转换,直接调用,如果返回值为 POJO,则会自动转化为 Map
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{"generic"});
System.out.println("sayHello = " + sayHello);
// 入参返回值都为 POJO 的调用
Map<String, Object> map = Maps.newHashMap();
map.put("class", "pojo.Pojo");
map.put("id", "no.1");
map.put("name", "张三");
Object genericResult = genericService.$invoke("seyHelloByGeneric", new String[]{"pojo.Pojo"}, new Object[]{map});
System.out.println("genericResult = " + genericResult);
}
/**
* generic = bean 的方式
* 输出:
* sayHello = GenericServiceImpl :attribute = null name = genericBean
* pojo 类型好像无法调用
*
* @param referenceConfig
*/
private static void genericBean(ReferenceConfig<GenericService> referenceConfig) {
// 设置为泛化引用,类型为 true。 返回调用的接口为 api.GenericService
referenceConfig.setInterface("api.GenericService");
referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_BEAN);
// 使用 GenericService 代替所有接口引用
GenericService genericService = referenceConfig.get();
// 简单的泛型调用 : 基本属性类型以及Date、List、Map 等不需要转换,直接调用,如果返回值为 POJO,则会自动转化为 Map
JavaBeanDescriptor genericBean = JavaBeanSerializeUtil.serialize("genericBean");
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{genericBean});
// 因为服务提供方会对返回结果进行序列化, 所以对结果进行反序列化。
System.out.println("sayHello = " + JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) sayHello));
}
/**
* generic = nativejava 方式
* 输出
* sayHello = GenericServiceImpl :attribute = null name = genericNativejava
*
* @param referenceConfig
*/
private static void genericNativejava(ReferenceConfig<GenericService> referenceConfig) throws IOException, ClassNotFoundException {
// 设置为泛化引用,类型为 true。 返回调用的接口为 api.GenericService
referenceConfig.setInterface("api.GenericService");
referenceConfig.setGeneric(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA);
// 使用 GenericService 代替所有接口引用
GenericService genericService = referenceConfig.get();
UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
// 泛型调用,需要把参数使用Java序列化为二进制数据
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null, out).writeObject("genericNativejava");
// 简单的泛型调用 : 基本属性类型以及Date、List、Map 等不需要转换,直接调用,如果返回值为 POJO,则会自动转化为 Map
JavaBeanDescriptor genericBean = JavaBeanSerializeUtil.serialize("genericBean");
Object sayHello = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{out.toByteArray()});
UnsafeByteArrayInputStream in = new UnsafeByteArrayInputStream((byte[]) sayHello);
Object result = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, in).readObject();
System.out.println("sayHello = " + result);
}
}
四、本地服务Mock
当远程服务不可用时,可以通过模拟服务提供者让消费者测试自己的功能,而不需要发起远程调用。
要实现Mock功能,首先需要消费端先实现服务端的mock 实现类,需要注意的是Mock实现类必须要符合
接口包名.类名Mock格式
。需要注意的是,在执行Mock服务实现类mock() 方法前,会先发起远程调用,当远程服务调用失败时,才会降级执行mock功能。
开启mock 功能需要设置 :
// 设置启动时候不检查服务是否可用
referenceConfig.setCheck(false);
// 设置启用Mock
referenceConfig.setMock(true);
五、 服务降级
Dubbo 提供了两种服务降级策略:
- force:return 策略:该种方式不会再调用服务提供者,而是直接返回客户端mock值。
- fail:return 策略:该种方式会先去尝试调用服务提供者,若调用失败,再返回mock值。
服务消费者需要开启 mock 功能。
/**
* 降级策略
* @param type force 或者 fail
*/
private static void doDemotion(String type) {
// 获取zk 的服务注册中心工厂
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
// 根据zk地址,获取具体的zk注册中心客户端实例
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://localhost:2181"));
// 将降级方案注册到zk
registry.register(URL.valueOf("override://0.0.0.0/api.GreetingService?category=configurators&dynamic=false&" +
"application=Api-consumer&" +
// 指定降级策略,return+null,表示mock返回为null
"mock=" + type + ":return+null" +
"&group=dubbo&version=1.0.0"));
// 取消降级方案
// registry.unregister(URL.valueOf("override://0.0.0.0/api.GreetingService?category=configurators&dynamic=false&" +
// "application=Api-consumer&" +
// "mock=" + type + ":return+null" +
// "&group=dubbo&version=1.0.0"));
}
以上:内容部分参考
《深度剖析Apache Dubbo 核心技术内幕》
https://blog.csdn.net/qq_36882793/article/details/103324692
如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正