dubbo
-
- dubbo配置項
- dubbo spi
- 負載均衡政策
- 異步調用
- dubbo線程池
- 路由規則
dubbo 是一款高性能,輕量級rpc架構,可以和spring內建
三大核心能力:
面向接口的遠端方法調用
智能容錯和負載均衡
服務自動注冊和發現
其他特性:
高度可擴充能力 :協定,傳輸,序列化都被設計成擴充點
運作期流浪排程:配置路由規則實作灰階釋出等功能
可視化服務治理和運維
dubbo配置項
<dubbo:application name="service-consumer" >
<dubbo:parameter key="qos.enable" value="true" ></dubbo:parameter>
<dubbo:parameter key="qos.port" value="22222"></dubbo:parameter>
<dubbo:parameter key="qos.accept.foreign.ip" value="true" ></dubbo:parameter>
</dubbo:application>
<!-- -->
<dubbo:consumer timeout="2000" check="false" ></dubbo:consumer>
<!-- 使用zookeeper注冊中心暴露發現服務位址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2182" timeout="10000"/>
//...省略
dubbo:application
對應 org.apache.dubbo.config.ApplicationConfig類,代表目前應用的資訊
name: 目前應用程式的名稱。
owner: 目前應用程式的負責人。
qosEnable : 是否啟動QoS 預設true(qos:線上運維dubbo 通過telnet 線上運維dubbo)
qosPort : 啟動QoS綁定的端口 預設22222
qosAcceptForeignIp: 是否允許遠端通路 預設是false
dubbo:registry
org.apache.dubbo.config.RegistryConfig, 代表該子產品所使用的注冊中心。一個子產品中的服務可以将 其注冊到多個注冊中心上,也可以注冊到一個上。在service和reference也會引入這個注冊中心。
id : 當目前服務中provider或者consumer中存在多個注冊中心時,則使用需要增加該配置。在一 些公司,會通過業務線的不同選擇不同的注冊中心,是以一般都會配置該值。
address : 目前注冊中心的通路位址。
protocol : 目前注冊中心所使用的協定是什麼。也可以直接在 address 中寫入,比如使用 zookeeper,就可以寫成 zookeeper://xx.xx.xx.xx:2181
timeout : 當與注冊中心不再同一個機房時,大多會把該參數延長。
dubbo:protocol
org.apache.dubbo.config.ProtocolConfig, 指定服務在進行資料傳輸所使用的協定。
id : 在大公司,可能因為各個部門技術棧不同,是以可能會選擇使用不同的協定進行互動。這裡
在多個協定使用時,需要指定。
name : 指定協定名稱。預設使用 dubbo 。
dubbo:service
org.apache.dubbo.config.ServiceConfig, 服務提供者,用于指定目前需要對外暴露的服務資訊,和dubbo:reference 大緻相同。
interface : 指定目前需要進行對外暴露的接口是什麼。
ref : 具體實作對象的引用,一般我們在生産級别都是使用Spring去進行Bean托管的,是以這裡面 一般也指的是Spring中的BeanId。
version : 對外暴露的版本号。不同的版本号,消費者在消費的時候隻會根據固定的版本号進行消費。
executes: 最大的并行度。
可能導緻叢集功能無法充分利用或者堵塞
但是也可以啟動部分對應用的保護功能
可以不做配置,結合後面的熔斷限流使用
dubbo:reference
org.apache.dubbo.config.ReferenceConfig, 消費者的配置
id : 指定該Bean在注冊到Spring中的id。
interface: 服務接口名
version : 指定目前服務版本,與服務提供者的版本一緻。
registry : 指定所具體使用的注冊中心位址。這裡面也就是使用上面在 dubbo:registry 中所聲明的id。
mock: 用于在方法調用出現錯誤時,當做服務降級來統一對外傳回結果。(dubbo:service也有)
timeout: 用于指定目前方法或者接口中所有方法的逾時時間。(dubbo:service也有)
retries: 用于指定目前服務在執行時出現錯誤或者逾時時的重試機制。(dubbo:service也有)
注意提供者是否有幂等,否則可能出現資料一緻性問題
注意提供者是否有類似緩存機制,如出現大面積錯誤時,可能因為不停重試導緻雪崩
dubbo:method
org.apache.dubbo.config.MethodConfig, 用于在制定的 dubbo:service 或者 dubbo:reference 中的 更具體一個層級,指定具體方法級别在進行RPC操作時候的配置,針對于具體方法的特殊處理。
name : 指定方法名稱,用于對這個方法名稱的RPC調用進行特殊配置。
async: 是否異步 預設false
dubbo spi
jdk spi
dubbo的spi和jdk的spi差不多
maven引入dubbo的依賴
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
//接口上要加spi注解
@SPI("human")
public interface HelloService {
String sayHello();
}
resources檔案下的META-INF/services/+接口路徑 要寫成META-INF/dubbo/+接口路徑
内容建議寫成key=value的格式,和jdkspi一樣隻寫com.service.impl.HumanHelloService也可以隻是不利于擴充
human=com.service.impl.HumanHelloService
測試一下
public class DubboSpiMain {
public static void main(String[] args) {
// 擷取擴充加載器
ExtensionLoader<HelloService> extensionLoader = ExtensionLoader.getExtensionLoader(HelloService.class);
// 周遊所有的支援的擴充點 META-INF.dubbo
Set<String> extensions = extensionLoader.getSupportedExtensions();
for (String extension : extensions){
String result = extensionLoader.getExtension(extension).sayHello();
System.out.println(result);
}
}
}
dubbo自己做SPI的目的
- JDK 标準的 SPI 會一次性執行個體化擴充點所有實作,如果有擴充實作初始化很耗時,但如果沒用上也加載,會很浪費資源
- 如果有擴充點加載失敗,則所有擴充點無法使用
- 提供了對擴充點包裝的功能(Adaptive),并且還支援通過set的方式對其他的擴充點進行注入
Adaptive功能
Dubbo中的Adaptive功能,主要解決的問題是如何動态的選擇具體的擴充點。通過 getAdaptiveExtension 統一對指定接口對應的所有擴充點進行封裝,通過URL的方式對擴充點來進行動态選擇。 (dubbo中所有的注冊資訊都是通過URL的形式進行處理的。)這裡同樣采用相同的方式進行 實作。
建立接口
//預設的實作類為key是human的類
@SPI("human")
public interface HelloService {
String sayHello();
@Adaptive
String sayHello(URL url);
}
建立兩個實作類
public class DogHelloService implements HelloService{
@Override
public String sayHello() {
return "wang wang";
}
@Override
public String sayHello(URL url) {
return "wang url";
}
}
public class HumanHelloService implements HelloService{
@Override
public String sayHello() {
return "hello 你好";
}
@Override
public String sayHello(URL url) {
return "hello url";
}
}
src/main/resources/META-INF/dubbo/com.test.service.HelloService中内容為
human=com.test.service.impl.HumanHelloService
dog=com.test.service.impl.DogHelloService
實際使用時根據url動态使用實作類
public class DubboAdaptiveMain {
public static void main(String[] args) {
//?之前的不重要,主要是?後面的hello.service=dog
//hello.service--》HelloService
URL url = URL.valueOf("test://localhost/hello?hello.service=dog");
HelloService adaptiveExtension = ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension();
String msg = adaptiveExtension.sayHello(url);
System.out.println(msg);
}
}
利用dubbo的spi機制來自定義攔截器
實作 org.apache.dubbo.rpc.Filter 接口
@Activate(group = {CommonConstants.CONSUMER,CommonConstants.PROVIDER})
public class DubboInvokeFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
long startTime = System.currentTimeMillis();
try {
// 執行方法
return invoker.invoke(invocation);
} finally {
System.out.println("invoke time:"+(System.currentTimeMillis()-startTime) + "毫秒");
}
}
}
src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter内容
timeFilter=com.test.filter.DubboInvokeFilter
負載均衡政策
參考官網
Random LoadBalance
随機,按權重設定随機機率。
在一個截面上碰撞的機率高,但調用量越大分布越均勻,而且按機率使用權重後也比較均勻,有利于動态調整提供者權重。
RoundRobin LoadBalance
輪詢,按公約後的權重設定輪詢比率。
存在慢的提供者累積請求的問題,比如:第二台機器很慢,但沒挂,當請求調到第二台時就卡在那,久而久之,所有請求都卡在調到第二台上。
LeastActive LoadBalance
最少活躍調用數,相同活躍數的随機,活躍數指調用前後計數差。
使慢的提供者收到更少請求,因為越慢的提供者的調用前後計數差會越大。
ConsistentHash LoadBalance
一緻性 Hash,相同參數的請求總是發到同一提供者。
當某一台提供者挂時,原本發往該提供者的請求,基于虛拟節點,平攤到其它提供者,不會引起劇烈變動。
算法參見:http://en.wikipedia.org/wiki/Consistent_hashing
預設隻對第一個參數 Hash,如果要修改,請配置 <dubbo:parameter key=“hash.arguments” value=“0,1” />
預設用 160 份虛拟節點,如果要修改,請配置 <dubbo:parameter key=“hash.nodes” value=“320” />
配置負載均衡
//服務端服務級别
<dubbo:service interface="..." loadbalance="roundrobin" />
//用戶端服務級别
<dubbo:reference interface="..." loadbalance="roundrobin" />
//服務端方法級别
<dubbo:service interface="...">
<dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:service>
//用戶端方法級别
<dubbo:reference interface="...">
<dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:reference>
//在服務提供者一方配置負載均衡
@Service(loadbalance = "random")
public class HelloServiceImpl implements HelloService {
public String sayHello(String name) {
return "hello " + name;
}
}
//在服務消費者一方配置負載均衡政策
@Reference(check = false,loadbalance = "random")
自定義負載均衡
實作org.apache.dubbo.rpc.cluster.LoadBalance接口
public class OnlyFirstLoadbalancer implements LoadBalance {
@Override
public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
// 所有的服務提供者 按照IP + 端口排序 選擇第一個
return list.stream().sorted((i1,i2)->{
final int ipCompare = i1.getUrl().getIp().compareTo(i2.getUrl().getIp());
if(ipCompare == 0){
return Integer.compare(i1.getUrl().getPort(),i2.getUrl().getPort());
}
return ipCompare;
}).findFirst().get();
}
}
在dubbo-spi-loadbalance工程的 META-INF/dubbo 目錄下建立
org.apache.dubbo.rpc.cluster.LoadBalance 檔案,并将目前類的全名寫入
onlyFirst=com.laogu.loadbalance.OnlyFirstLoadbalancer
使用配置
@Reference(loadbalance = "onlyFirst")
private HelloService helloService;
異步調用
通過xml方式引入
<dubbo:reference id="helloService" interface="com.test.service.HelloService">
<dubbo:method name="sayHello" async="true" />
</dubbo:reference>
獲得異步方法的傳回結果
異步調用特殊說明
需要特别說明的是,該方式的使用,請確定dubbo的版本在2.5.4及以後的版本使用。 原因在于在2.5.3 及之前的版本使用的時候,會出現異步狀态傳遞問題。
比如我們的服務調用關系是 A -> B -> C , 這時候如果A向B發起了異步請求,在錯誤的版本時,B向C發 起的請求也會連帶的産生異步請求。這是因為在底層實作層面,他是通過 RPCContext 中的
attachment 實作的。在A向B發起異步請求時,會在 attachment 中增加一個異步标示字段來表明異步 等待結果。B在接受到A中的請求時,會通過該字段來判斷是否是異步處理。但是由于值傳遞問題,B向 C發起時同樣會将該值進行傳遞,導緻C誤以為需要異步結果,導緻傳回空。這個問題在2.5.4及以後的 版本進行了修正。
dubbo線程池
dubbo在使用時,都是通過建立真實的業務線程池進行操作的。目前已知的線程池模型有兩個和java中 的互相對應:
fix: 表示建立固定大小的線程池。也是Dubbo預設的使用方式,預設建立的執行線程數為200,并且是沒有任何等待隊列的。是以再極端的情況下可能會存在問題,比如某個操作大量執行時,可能存在堵塞的情況。
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
cache: 建立非固定大小的線程池,當線程不足時,會自動建立新的線程。但是使用這種的時候需 要注意,如果突然有高TPS的請求過來,方法沒有及時完成,則會造成大量的線程建立,對系統的 CPU和負載都是壓力,執行越多反而會拖慢整個系統。
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
自定義線程池
自定義一個監控線程數量報警的線程池
public class WachingThreadPool extends FixedThreadPool implements Runnable{
private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);
// 定義線程池使用的閥值
private static final double ALARM_PERCENT = 0.90;
private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();
public WachingThreadPool(){
// 每隔3秒列印線程使用情況
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);
}
// 通過父類建立線程池
@Override
public Executor getExecutor(URL url) {
final Executor executor = super.getExecutor(url);
if(executor instanceof ThreadPoolExecutor){
THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);
}
return executor;
}
@Override
public void run() {
// 周遊線程池
for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){
final URL url = entry.getKey();
final ThreadPoolExecutor executor = entry.getValue();
// 計算相關名額
final int activeCount = executor.getActiveCount();
final int poolSize = executor.getCorePoolSize();
double usedPercent = activeCount / (poolSize*1.0);
LOGGER.info("線程池執行狀态:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);
if (usedPercent > ALARM_PERCENT){
LOGGER.error("超出警戒線! host:{} 目前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);
}
}
}
}
src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool的内容為
watching=com.test.threadpool.WachingThreadPool
在服務提供方項目中設定使用該線程池生成器

路由規則
路由是決定一次請求中需要發往目标機器的重要判斷,通過對其控制可以決定請求的目标機器。我們可 以通過建立這樣的規則來決定一個請求會交給哪些伺服器去處理。
快速入門
//執行了這個之後消費端正常調用提供端,就會按照路由規則執行
public class DubboRouterMain {
public static void main(String[] args) {
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension() ;
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://127.0.0.1:2181"));
//condition表示這是一個條件路由
registry.register(URL.valueOf("condition://0.0.0.0/com.test.service.HelloService?category=routers&force=true&dynamic=true&rule=" + URL.encode("=> host != 192.168.XX.XX")));
}
}
參考官網
路由規則
向注冊中心寫入路由規則的操作通常由監控中心或治理中心的頁面完成
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("route://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("host = 10.20.153.10 => host = 10.20.153.11")));
route:// 表示路由規則的類型,支援條件路由規則和腳本路由規則,可擴充,必填。
0.0.0.0 表示對所有 IP 位址生效,如果隻想對某個 IP 的生效,請填入具體 IP,必填。
com.foo.BarService 表示隻對指定服務生效,必填。
group=foo 對指定服務的指定group生效,不填表示對未配置group的指定服務生效
version=1.0對指定服務的指定version生效,不填表示對未配置version的指定服務生效
category=routers 表示該資料為動态配置類型,必填。
dynamic=false 表示該資料為持久資料,當注冊方退出時,資料依然儲存在注冊中心,必填。
enabled=true 覆寫規則是否生效,可不填,預設生效。
force=false 當路由結果為空時,是否強制執行,如果不強制執行,路由結果為空的路由規則将自動失效,可不填,預設為 false。
runtime=false 是否在每次調用時執行路由規則,否則隻在提供者位址清單變更時預先執行并緩存結果,調用時直接從緩存中擷取路由結果。如果用了參數路由,必須設為 true,需要注意設定會影響調用的性能,可不填,預設為 false。
priority=1 路由規則的優先級,用于排序,優先級越大越靠前執行,可不填,預設為 0。
rule=URL.encode(“host = 10.20.153.10 => host = 10.20.153.11”) 表示路由規則的内容,必填。
條件路由規則
基于條件表達式的路由規則,如:host = 10.20.153.10 => host = 10.20.153.11
規則:
=> 之前的為消費者比對條件,所有參數和消費者的 URL 進行對比,當消費者滿足比對條件時,對該消費者執行後面的過濾規則。
=> 之後為提供者位址清單的過濾條件,所有參數和提供者的 URL 進行對比,消費者最終隻拿到過濾後的位址清單。
如果比對條件為空,表示對所有消費方應用,如:=> host != 10.20.153.11
如果過濾條件為空,表示禁止通路,如:host = 10.20.153.10 =>