天天看點

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

服務路由源碼解析

1. 什麼是服務路由

服務路由包含一條(或若幹條)路由規則,路由規則決定了服務消費者的調用目标,即規定了服務消費者可調用哪些服務提供者。Dubbo 目前提供了三種服務路由實作,分别為條件路由ConditionRouter、腳本路由 ScriptRouter 和标簽路由 TagRouter。其中條件路由是我們最常使用的。

下面将以條件路由為例來講解服務路由的用法。

2. 路由規則的設定

路由規則是在 Dubbo 管控平台 Dubbo-Admin 中的。

(1) 啟動管控平台

A、啟動管控台

将 dubbo-admin-0.1.jar 檔案存放到任意目錄下,例如 D 盤根目錄下,直接運作。

注意這裡的 dubbo-admin 是已經配置好的。具體配置詳見《Dubbo(二)入門 Dubbo 管理控制台》。

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

B、 通路

在浏覽器位址欄中輸入 http://localhost:8080 ,即可看到 Dubbo 管理控制台界面。

(2) 設定路由規則

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

點選建立:

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

就在這裡設定路由規則。

3. 路由規則詳解

(1) 初識路由規則

應用粒度路由規則:

scope: application
force: true
runtime: true
enabled: true
key: governance-conditionrouter-consumer
conditions:
# app1 的消費者隻能消費所有端口為 20880 的服務提供執行個體
 - application=app1 => address=*:20880
# app2 的消費者隻能消費所有端口為 20881 的服務提供執行個體
 - application=app2 => address=*:20881
           

服務粒度路由規則:

scope: service
force: true
runtime: true
enabled: true
key: org.apache.dubbo.samples.governance.api.DemoService
conditions:
# DemoService 的 sayHello 方法隻能消費所有端口為 20880 的服務提供者執行個體
 - method=sayHello => address=*:20880
# DemoService 的 sayHi 方法隻能消費所有端口為 20881 的服務提供者執行個體
 - method=sayHi => address=*:20881
           

(2) 屬性詳解

A、 scope

必填項。表示路由規則的作用粒度,其取值将會決定 key 的取值。其取值範圍如下:

  • service:表示服務粒度
  • application:表示應用粒度

B、 Key

必填項。指定規則體将作用于哪個服務或應用。其取值取決于 scope 的取值。

  • scope 取值為 application 時,key 取值為 application 名稱,即<dubbo:application name=””/>的值。
  • scope 取值為 service 時,key 取值為[group:]service[:version]的組合,即組、接口名稱與版本号。

C、 enabled

可選項。指定目前路由規則是否立即生效。預設值為 true,表示立即生效。

D、force

可選項。指定當路由結果為空時,是否強制執行。如果不強制執行,路由結果為空則路由規則自動失效,不使用路由。預設為 false,不強制執行。

E、 runtime

可選項。指定是否在每次調用時執行路由規則。

  • 若為 false 則表示隻在提供者位址清單變更時會預先執行路由規則,并将路由結果進行緩存,消費者調用時直接從緩存中擷取路由結果。
  • 若為 true 則表示每次調用都要重新計算路由規則,其将會直接影響調用的性能。預設為 false。

F、 priority

可選項。用于設定路由規則的優先級,數字越大,優先級越高,越靠前執行。預設為 0。

(3) 規則體 conditions

必填項。定義具體的路由規則内容,由 1 到任意多條規則組成。

A、格式

路由規則由兩個條件組成,分别用于對服務消費者和提供者進行比對。

[服務消費者比對條件] => [服務提供者比對條件]

  • 當消費者的 URL 滿足比對條件時,對該消費者執行後面的過濾規則。
  • => 之後為提供者位址清單的過濾條件,所有參數和提供者的 URL 進行對比,消費者最終隻拿到過濾後的位址清單。
例如路由規則為:host = 10.20.153.10 => host = 10.20.153.11。其表示 IP 為 10.20.153.10的服務消費者隻可調用 IP 為 10.20.153.11 機器上的服務,不可調用其他機器上的服務。

不過,這兩個條件存在預設的情況:

  • 服務消費者比對條件為空,表示不對服務消費者進行限制,所有消費者均将被路由到行後面的提供者。
  • 服務提供者比對條件為空,表示對符合消費者條件的消費者将禁止調用任何提供者。

例如路由規則為:=> host != 10.20.153.11,則表示所有消費者均可調用IP為10.20.153.11之外的所有提供者。

再如路由規則為:host = 10.20.153.10 => ,則表示 IP 為 10.20.153.10 的提供者不能調用任何提供者。

B、 符号支援

參數符号:

  • method:将調用方法作為路由規則比較的對象
  • argument:将調用方法參數作為路由規則比較的對象
  • protocol:将調用協定作為路由規則比較的對象
  • host:将 IP 作為路由規則比較的對象
  • port:将端口号作為路由規則比較的對象
  • address:将 IP:端口号作為路由規則比較的對象
  • application:将應用名稱作為路由規則比較的對象

條件符号:

  • 等于号(=):将參數類型

    比對上

    參數值作為路由條件
  • 不等号(!=):将參數類型

    不比對

    參數值作為路由條件

取值符号:

  • 逗号(,):多個取值的分隔符,如:host != 10.20.153.10,10.20.153.11
  • 星号(*):通配符,如:host != 10.20.*
  • 美元符号($):表示引用消費者的參數值,如:host = $host

C、 舉例

  • 白名單

    host != 10.20.153.10,10.20.153.11 =>

    禁用 IP 為 10.20.153.10 與 10.20.153.11 之外的所有主機。

  • 黑名單

    host = 10.20.153.10,10.20.153.11 =>

    IP 為 10.20.153.10 與 10.20.153.11 的主機将被禁用。

  • 隻暴露一部分的提供者

    => host = 172.22.3.1*,172.22.3.2*

    消費者隻可通路 IP 為 172.22.3.1與 172.22.3.2的提供者主機。

  • 為重要應用提供額外的機器

    application != kylin => host != 172.22.3.95,172.22.3.96

    應用名稱不為 kylin 的應用不能通路 172.22.3.95 與 172.22.3.96 兩台提供者主機。即隻有名稱為 kylin 的消費者可以通路 172.22.3.95 與 172.22.3.96 兩台提供者主機。當然,kylin 還可以通路其它提供者主機,而其它消費者也可以通路 172.22.3.95 與 172.22.3.96 之外的所有提供者主機。

  • 讀寫分離

    method = find*,list*,get*,is* => host = 172.22.3.94,172.22.3.95,172.22.3.96

    method != find*,list*,get*,is* => host = 172.22.3.97,172.22.3.98

    find、list、get、is 開頭的消費者方法會被路由到 172.22.3.94、172.22.3.95 與 172.22.3.96

    三台提供者主機,而其它方法則會被路由到 172.22.3.97 與 172.22.3.98 兩台提供者主機。

  • 前背景分離

    application = bops => host = 172.22.3.91,172.22.3.92,172.22.3.93

    application != bops => host = 172.22.3.94,172.22.3.95,172.22.3.96

    應用名稱的 bops 的消費者會被路由到 172.22.3.91、172.22.3.92 與 172.22.3.93 三台提供者主機,而其它消費者則會被路由到 172.22.3.94、172.22.3.95 與 172.22.3.96 三台提供者。

  • 隔離不同機房網段

    host != 172.22.3.* => host != 172.22.3.*

    不是 172.22.3 網段的消費者是不能通路 172.22.3 網段的提供者的。即隻有 172.22.3 網段的消費者才可通路 172.22.3 網段的提供者。當然,172.22.3 網段的消費者也可通路其它網段的提供者。

  • 隻通路本機的服務

    => host = $host

    $host 表示擷取消費者請求中的消費者主機 IP。故該規則就表示消費者隻能通路本機的服務。

示範:
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

有個問題:

将上面通過黑白名單配置的路由規則從ZK中直接拷貝出來,在條件路由中配置:

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
添加成功後看ZK:
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
啟動代碼:
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

删掉路由規則,提供者端不動,重新啟動消費者(因為我們消費者是一個簡單Demo不是容器、Web應用,隻能重新啟動):

發現又成功執行

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
可以看到這個路由規則是動态配置的。

4. 源碼解析

三個關鍵問題:

  • 在哪添加 RouterChain 到 Directory的
  • 在哪讀取 zk 中的路由規則并添加到 RouterChain 的
  • 在哪進行服務路由過濾的

(1) 添加 RouterChain 到 Directory

先看下都有哪些激活的RouterFactory :

找到META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory檔案:

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
隻有最後四個類被@Activate标記

之前我們分析過服務訂閱流程,現在隻列路徑:

org.apache.dubbo.config.ReferenceConfig#init

org.apache.dubbo.config.ReferenceConfig#createProxy

org.apache.dubbo.rpc.Protocol#refer(為每個注冊中心虛拟一個invoker)

org.apache.dubbo.registry.integration.RegistryProtocol#refer

org.apache.dubbo.registry.integration.RegistryProtocol#doRefer

//org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 生成一個動态Directory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 将consumer注冊到zk
        registry.register(directory.getRegisteredConsumerUrl());
    }
    // 将所有router添加到directory
    directory.buildRouterChain(subscribeUrl);
    // 訂閱服務
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // 将invoker清單僞裝為一個invoker
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
           

關鍵代碼就在這,生成動态清單RegistryDirectory後,調用了directory.buildRouterChain(subscribeUrl)方法:

//org.apache.dubbo.registry.integration.RegistryDirectory#buildRouterChain
public void buildRouterChain(URL url) {
    this.setRouterChain(RouterChain.buildChain(url));
}
           
DEBUG
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

主要跟RouterChain.buildChain(url):

//org.apache.dubbo.rpc.cluster.RouterChain#buildChain
public static <T> RouterChain<T> buildChain(URL url) {
    return new RouterChain<>(url);
}
           

看RouterChain的構造:

private RouterChain(URL url) {
    List<RouterFactory> extensionFactories = ExtensionLoader
            // 加載并緩存RouterFactory的所有擴充類(四類)
            .getExtensionLoader(RouterFactory.class)
            //擷取激活的擴充類(之前看了有四個)
            .getActivateExtension(url, (String[]) null);

    // 周遊RouterFactory清單,每個Factory都會根據url建立一個路由器Router
    // 将所有生成的Router形成List集合
    List<Router> routers = extensionFactories.stream()
            .map(factory -> factory.getRouter(url))
            .collect(Collectors.toList());
    // 使用Router清單初始化RouterChain
    initWithRouters(routers);
}

//org.apache.dubbo.rpc.cluster.RouterChain#initWithRouters
public void initWithRouters(List<Router> builtinRouters) {
	//目前是RouterChain,維護了Router清單
    this.builtinRouters = builtinRouters;
    this.routers = new ArrayList<>(builtinRouters);
    //排序(自然排序,Router實作了Comparable接口,底層根據
    //org.apache.dubbo.rpc.cluster.Router#getPriority的值進行排序)
    this.sort();
}
           

DEBUG

看下RouterChain

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

包含的就是之前我們看到的四個激活的 RouterFactory 建立的對應的Router

這四個是屬于一開始預設激活的

(2) 讀取 zk 中的路由規則并添加到 RouterChain

A、擷取 router 子節點 url

先看下zk中關于路由的中繼資料:
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

路由規則從ZK中讀取的時機,就是我們之前跟服務訂閱時,更新providers清單資訊一樣,還記得有三種分類節點嗎:

//org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 生成一個動态Directory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 将consumer注冊到zk
        registry.register(directory.getRegisteredConsumerUrl());
    }
    // 将所有router添加到directory
    directory.buildRouterChain(subscribeUrl);
    // 訂閱服務
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // 将invoker清單僞裝為一個invoker
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
           

即directory.subscribe方法:

org.apache.dubbo.registry.integration.RegistryDirectory#subscribe

org.apache.dubbo.registry.RegistryService#subscribe

org.apache.dubbo.registry.support.FailbackRegistry#subscribe

org.apache.dubbo.registry.support.FailbackRegistry#doSubscribe

org.apache.dubbo.registry.dubbo.DubboRegistry#doSubscribe

public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {  // 處理interface屬性為*的情況
			...
        } else {  // 處理interface屬性為真正接口的情況
            // 存放所有分類節點下的所有子節點url
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
	            //将url分成三類:providers、routers、configurators
	            //此時routers是我們需要關注的
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkListener = listeners.get(listener);
                }
                // 在zk中建立分類節點
                zkClient.create(path, false);
                // 為分類節點添加子節點清單變更的watcher監聽
                // 并傳回其所有子節點清單
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    // toUrlsWithEmpty()擷取目前分類節點下的所有子節點url
                    // 如果children為空,會生成一個empty的URL
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 主動調用notify()方法,更新分類節點子節點清單
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           
DEBUG
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
看下執行notify時,urls内容:
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
看下路由相關參數
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析

B、 将路由添加到 directory

繼續走notify方法:

org.apache.dubbo.registry.support.FailbackRegistry#notify

org.apache.dubbo.registry.support.FailbackRegistry#doNotify

org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
	//url是消費者url,urls是提供者url清單
	...
    // keep every provider's category.
    // key:category
    // value:為該category的所有子節點的url
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            // 為每一個category建立一個List,然後将category作為key,
            // 将這個建立的list作用value
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            // 将目前周遊的url放入到相應的category的list中
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    // 為目前消費者url建立一個map,而這個map存的就是上面result中的内容
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    // 周遊所有的category
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // 主動調用目前周遊category的notify(),更新其子節點清單
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}
           

這些代碼之前都跟過,繼續看listener.notify(categoryList):

Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
//org.apache.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) {
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(url -> {
                if (UrlUtils.isConfigurator(url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (UrlUtils.isRoute(url)) {
                    return ROUTERS_CATEGORY;
                } else if (UrlUtils.isProvider(url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));
    // 處理category為configurators的情況
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // 處理category為routers的情況
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    //lambda表達式,toRouters傳回的集合不為空
    // 則會調用addRouters方法,把自己作為參數
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    // 處理category為providers的情況
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    refreshOverrideAndInvoker(providerURLs);
}
           

此時我們關注toRouters方法:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
	...
    private static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
    ...
    private Optional<List<Router>> toRouters(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return Optional.empty();
        }

        List<Router> routers = new ArrayList<>();
        for (URL url : urls) {
            if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
                continue;
            }
            String routerType = url.getParameter(ROUTER_KEY);
            if (routerType != null && routerType.length() > 0) {
                url = url.setProtocol(routerType);
            }
            try {
	            //核心代碼,ROUTER_FACTORY是一個自适應RouterFactory
                Router router = ROUTER_FACTORY.getRouter(url);
                if (!routers.contains(router)) {
                    routers.add(router);
                }
            } catch (Throwable t) {
                logger.error("convert router url to router error, url: " + url, t);
            }
        }
        return Optional.of(routers);
    }
           

如果routers不為空,則會執行addRouters方法,将Router添加到routerChain

//org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#addRouters
protected void addRouters(List<Router> routers) {
    routers = routers == null ? Collections.emptyList() : routers;
    routerChain.addRouters(routers);
}
           

(3) 服務路由過濾

服務路由過濾是發生在消費者遠端調用的時候:

從消費者動态代理類的InvokerInvocationHandler開始追蹤:

//org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // 若目前調用的方法是Object的方法,則進行本地調用
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // 若目前調用的方法是重寫的toString()、hashCode()與equals(),則調用重寫的
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    // 遠端調用
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
           

跟invoke:

org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke

org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke

//org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 服務路由
    List<Invoker<T>> invokers = list(invocation);
    // 擷取負載均衡政策
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
           

核心代碼就在list方法:

//org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#list
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
	//此時directory是動态清單RegistryDirectory
    return directory.list(invocation);
}
           

繼續跟

//org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
	//Directory是可以銷毀的,如果銷毀了,裡面所有invoker就不能用了
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }

    return doList(invocation);
}
           

doList的實作在org.apache.dubbo.registry.integration.RegistryDirectory#doList

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
	    // forbidden為true的兩種情況:
	    // 1.沒有provider  2.所有providers都失效了
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry "+ ...);
    }

    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers;
    }

    List<Invoker<T>> invokers = null;
    try {
	    // 核心代碼在這,用的責任鍊設計模式
        // Get invokers from cache, only runtime routers will be executed.
        invokers = routerChain.route(getConsumerUrl(), invocation);
    } catch (Throwable t) {
        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
    }

    return invokers == null ? Collections.emptyList() : invokers;
}
           
DEBUG,看下此時routerChain中有幾個Router:
Dubbo(十五)源碼解析 之 服務路由服務路由源碼解析1. 什麼是服務路由2. 路由規則的設定3. 路由規則詳解4. 源碼解析
看到除了一開始預設激活的4個,這裡多了一個從ZK中擷取的條件路由器

跟routerChain.route:

//org.apache.dubbo.rpc.cluster.RouterChain#route
public List<Invoker<T>> route(URL url, Invocation invocation) {
    List<Invoker<T>> finalInvokers = invokers;
    for (Router router : routers) {
	    //每次router.route的結果作為下一個router.route的參數
	    //責任鍊設計模式
        finalInvokers = router.route(finalInvokers, url, invocation);
    }
    return finalInvokers;
}
           

此時我們一共有5個router,我們隻關注我們定義的條件路由:

//org.apache.dubbo.rpc.cluster.router.condition.ConditionRouter#route
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
        throws RpcException {
    // 若條件路由規則中的enabled屬性為false,則路由規則不起作用,
    // 意味着,消費者可以調用所有invoker,是以這裡傳回invokers清單
    if (!enabled) {
        return invokers;
    }

    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }
    // matchWhen()是對路由規則中=>左側進行判斷,即對消費者進行判斷
    // 若該方法傳回false,則表示目前規則對該消費者不起作用,則目前
    // 消費者可以調用所有invoker,是以這裡傳回invokers清單
    try {
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        // 用于存放路由結果
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        // thenCondition用于緩存=>的右側内容,即提供者判斷條件
        // 若其為空,則表示目前路由規則為黑名單,即目前的消費者
        // 不能通路任何invoker,是以會傳回一個空的result
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
            return result;
        }

        // matchThen() 用于判斷提供者條件
        // 周遊所有invoker,逐個判斷該提供者是否符合目前路由規則中=>右側的條件,
        // 若符合,則将其寫入到result清單
        for (Invoker<T> invoker : invokers) {
            if (matchThen(invoker.getUrl(), url)) {
                result.add(invoker);
            }
        }
        if (!result.isEmpty()) {
            return result;
        } else if (force) {
            logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
            return result;
        }
    } catch (Throwable t) {
        logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
    }
    return invokers;
}
           

其中matchWhen和matchThen的細節就不跟了,很複雜,感興趣的自己看吧。

繼續閱讀