Dubbo (4): Implementing Service Routing

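  In the previous post we covered Dubbo's load-balancing implementation and walked through several common load-balancing algorithms. Taken one feature at a time, Dubbo does not seem to do anything especially remarkable. In fact, a successful product does not need to break convention everywhere. What matters more is a well-designed overall architecture and making good use of proven existing solutions.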

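  This post covers another piece of Dubbo's high-availability support in clustered environments: service routing. It fills in Dubbo's cluster feature set from another angle.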

1. When does routing come into play?

  What is service routing? A service router holds a routing rule, and that rule determines which providers a consumer is allowed to call.

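  When is service routing used? It is the first step when a consumer invokes a provider. The cluster strategies are applied in this order: service routing -> load balancing -> cluster fault tolerance (retry).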

  The calling skeleton lives in org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker:

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                // Service routing entry point: re-list the invokers through the parent class before a retry
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            // Load balancing entry point
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                // Cluster fault tolerance: record the error and retry
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
    // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#list
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        // Delegate to the directory's list() method, which performs the routing.
        return directory.list(invocation);
    }
    // org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }

        return doList(invocation);
    }
    // org.apache.dubbo.registry.integration.RegistryDirectory#doList
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                    getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                    NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                    ", please check status of providers(disabled, not registered or in blacklist).");
        }

        if (multiGroup) {
            return this.invokers == null ? Collections.emptyList() : this.invokers;
        }

        List<Invoker<T>> invokers = null;
        try {
            // Get invokers from cache, only runtime routers will be executed.
            invokers = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }

        return invokers == null ? Collections.emptyList() : invokers;
    }
    
    // org.apache.dubbo.rpc.cluster.RouterChain#route
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = invokers;
        // Call the registered routers in order, each one filtering finalInvokers, then return the result
        for (Router router : routers) {
            finalInvokers = router.route(finalInvokers, url, invocation);
        }
        return finalInvokers;
    }      
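
  The chaining idea in RouterChain#route can be illustrated in isolation. The following is a minimal sketch, not Dubbo's code: SimpleRouter, RouterChainSketch and keepIf are hypothetical names, and providers are modelled as plain address strings rather than Invoker objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for Dubbo's Router: it narrows a list of provider addresses.
interface SimpleRouter {
    List<String> route(List<String> providers);
}

class RouterChainSketch {
    private final List<SimpleRouter> routers = new ArrayList<>();

    void addRouter(SimpleRouter router) {
        routers.add(router);
    }

    // Same shape as RouterChain#route: each router filters the previous router's output.
    List<String> route(List<String> providers) {
        List<String> result = providers;
        for (SimpleRouter router : routers) {
            result = router.route(result);
        }
        return result;
    }

    // Helper (an assumption of this sketch) that turns a predicate into a filtering router.
    static SimpleRouter keepIf(Predicate<String> keep) {
        return providers -> {
            List<String> kept = new ArrayList<>();
            for (String p : providers) {
                if (keep.test(p)) {
                    kept.add(p);
                }
            }
            return kept;
        };
    }

    public static void main(String[] args) {
        RouterChainSketch chain = new RouterChainSketch();
        chain.addRouter(keepIf(p -> p.startsWith("10.20.")));
        chain.addRouter(keepIf(p -> p.endsWith(":20880")));
        List<String> providers = Arrays.asList(
                "10.20.1.1:20880", "10.20.1.2:20881", "10.30.1.1:20880");
        System.out.println(chain.route(providers));
    }
}
```

  Because every router only sees what the previous one let through, the order of routers in the chain matters; load balancing then picks one invoker from whatever the chain returns.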

2. Which routing strategies does Dubbo provide?

  Dubbo currently provides three service routing implementations: the condition router ConditionRouter, the script router ScriptRouter, and the tag router TagRouter.

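  When routers are created: every time the URLs change (for example, after a change made in the admin console), the routing information is rebuilt.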

// org.apache.dubbo.registry.integration.RegistryDirectory#notify
    @Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        // Build the routers described by the URLs and add them to routerChain for later use
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        /**
         * 3.x added for extend URL address
         */
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
            }
        }
        refreshOverrideAndInvoker(providerURLs);
    }
    // org.apache.dubbo.registry.integration.RegistryDirectory#toRouters
    /**
     * @param urls
     * @return null : no routers ,do nothing
     * else :routers list
     */
    private Optional<List<Router>> toRouters(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return Optional.empty();
        }

        List<Router> routers = new ArrayList<>();
        for (URL url : urls) {
            if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
                continue;
            }
            String routerType = url.getParameter(ROUTER_KEY);
            if (routerType != null && routerType.length() > 0) {
                url = url.setProtocol(routerType);
            }
            try {
                // Create the router through a router factory; the factory is an SPI extension that implements RouterFactory
                // file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
                // script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
                // condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
                // service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactory
                // app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory
                // tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
                // mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
                Router router = ROUTER_FACTORY.getRouter(url);
                if (!routers.contains(router)) {
                    routers.add(router);
                }
            } catch (Throwable t) {
                logger.error("convert router url to router error, url: " + url, t);
            }
        }

        return Optional.of(routers);
    }      

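  So router creation as a whole depends on the router parameter in the URL: that parameter selects the matching router factory, whose getRouter() method then produces the concrete router. Let's take a quick look at what these router factories typically look like.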

2.1. Building the router factory classes

  

/**
 * Application level router factory
 */
@Activate(order = 200)
public class AppRouterFactory implements RouterFactory {
    public static final String NAME = "app";

    private volatile Router router;

    @Override
    public Router getRouter(URL url) {
        // Each factory holds a single router instance
        if (router != null) {
            return router;
        }
        // Double-checked locking, lazy initialization
        synchronized (this) {
            if (router == null) {
                router = createRouter(url);
            }
        }
        return router;
    }

    private Router createRouter(URL url) {
        
        return new AppRouter(url);
    }
}

// Cacheable router factory
/**
 * Service level router factory
 */
@Activate(order = 300)
public class ServiceRouterFactory extends CacheableRouterFactory {

    public static final String NAME = "service";

    @Override
    protected Router createRouter(URL url) {
        return new ServiceRouter(url);
    }

}

// Condition router factory
public class ConditionRouterFactory implements RouterFactory {

    public static final String NAME = "condition";

    @Override
    public Router getRouter(URL url) {
        // Simply create a new object and return it
        return new ConditionRouter(url);
    }

}

// File router factory. It is not a standalone router factory: it depends on another router factory
public class FileRouterFactory implements RouterFactory {

    public static final String NAME = "file";

    private RouterFactory routerFactory;
    // The other router factory is injected here
    public void setRouterFactory(RouterFactory routerFactory) {
        this.routerFactory = routerFactory;
    }

    @Override
    public Router getRouter(URL url) {
        try {
            // Transform File URL into Script Route URL, and Load
            // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=<file-content>
            String protocol = url.getParameter(ROUTER_KEY, ScriptRouterFactory.NAME); // Replace original protocol (maybe 'file') with 'script'
            String type = null; // Use file suffix to config script type, e.g., js, groovy ...
            String path = url.getPath();
            if (path != null) {
                int i = path.lastIndexOf('.');
                if (i > 0) {
                    type = path.substring(i + 1);
                }
            }
            String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath())));

            // FIXME: this code looks useless
            boolean runtime = url.getParameter(RUNTIME_KEY, false);
            URL script = URLBuilder.from(url)
                    .setProtocol(protocol)
                    .addParameter(TYPE_KEY, type)
                    .addParameter(RUNTIME_KEY, runtime)
                    .addParameterAndEncoded(RULE_KEY, rule)
                    .build();
            // Delegate the reassembled URL to the injected router factory
            return routerFactory.getRouter(script);
        } catch (IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

}
    
@Activate
public class MockRouterFactory implements RouterFactory {
    public static final String NAME = "mock";

    @Override
    public Router getRouter(URL url) {
        return new MockInvokersSelector();
    }

}
// Script router factory
/**
 * ScriptRouterFactory
 * <p>
 * Example URLS used by Script Router Factory:
 * <ol>
 * <li> script://registryAddress?type=js&rule=xxxx
 * <li> script:///path/to/routerfile.js?type=js&rule=xxxx
 * <li> script://D:\path\to\routerfile.js?type=js&rule=xxxx
 * <li> script://C:/path/to/routerfile.js?type=js&rule=xxxx
 * </ol>
 * The host value in URL points out the address of the source content of the Script Router,Registry、File etc
 *
 */
public class ScriptRouterFactory implements RouterFactory {

    public static final String NAME = "script";

    @Override
    public Router getRouter(URL url) {
        // Simply create a new object and return it
        return new ScriptRouter(url);
    }

}

// Tag router factory, a cacheable factory (created routers are kept in a ConcurrentHashMap)
@Activate(order = 100)
public class TagRouterFactory extends CacheableRouterFactory {

    public static final String NAME = "tag";
    // getRouter() is implemented once in the parent class; subclasses only need to implement createRouter()
    @Override
    protected Router createRouter(URL url) {
        return new TagRouter(url);
    }
}

/**
 * If you want to provide a router implementation based on design of v2.7.0, please extend from this abstract class.
 * For 2.6.x style router, please implement and use RouterFactory directly.
 */
public abstract class CacheableRouterFactory implements RouterFactory {
    private ConcurrentMap<String, Router> routerMap = new ConcurrentHashMap<>();

    @Override
    public Router getRouter(URL url) {
        return routerMap.computeIfAbsent(url.getServiceKey(), k -> createRouter(url));
    }

    protected abstract Router createRouter(URL url);
}      

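  As we can see, most of these factories simply return a new instance of the corresponding router. Is it really necessary to wrap each of them in a factory class? Wouldn't creating the routers directly be fine? This is essentially a standard use of the factory pattern: it hides the creation logic, and since the factories are loaded through SPI, it also gives Dubbo one extension point through which a router type can be selected by name.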
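
  The caching approach taken by CacheableRouterFactory can be tried out on its own. This is a minimal sketch under simplifying assumptions: DemoRouter and CachingRouterFactorySketch are hypothetical names, the cache key is a plain string instead of url.getServiceKey(), and the created counter exists only to make the behaviour observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal router type; it only remembers the key it was created for.
class DemoRouter {
    final String serviceKey;

    DemoRouter(String serviceKey) {
        this.serviceKey = serviceKey;
    }
}

// Mirrors the idea behind CacheableRouterFactory: one router per service key.
class CachingRouterFactorySketch {
    private final ConcurrentMap<String, DemoRouter> routerMap = new ConcurrentHashMap<>();
    // Counts how many routers were actually constructed (for illustration only).
    final AtomicInteger created = new AtomicInteger();

    DemoRouter getRouter(String serviceKey) {
        // computeIfAbsent runs the creation function at most once per key,
        // so no explicit lock or volatile field is needed here.
        return routerMap.computeIfAbsent(serviceKey, k -> {
            created.incrementAndGet();
            return new DemoRouter(k);
        });
    }

    public static void main(String[] args) {
        CachingRouterFactorySketch factory = new CachingRouterFactorySketch();
        DemoRouter first = factory.getRouter("com.demo.HelloService");
        DemoRouter second = factory.getRouter("com.demo.HelloService");
        DemoRouter other = factory.getRouter("com.demo.OrderService");
        System.out.println(first == second);        // same key, same instance
        System.out.println(first == other);         // different key, different instance
        System.out.println(factory.created.get());  // number of routers constructed
    }
}
```

  Compared with the double-checked locking in AppRouterFactory, which keeps exactly one router per factory, the map-based variant keeps one router per key and leaves the synchronization to ConcurrentHashMap.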

2.2. The condition router ConditionRouter in detail

  The routing implementation splits into two parts: parsing the rule and applying the rule.

// Constructor: mainly parses a few parameters
    public ConditionRouter(URL url) {
        this.url = url;
        // priority=1
        this.priority = url.getParameter(PRIORITY_KEY, 0);
        // force=false
        this.force = url.getParameter(FORCE_KEY, false);
        // enabled=true
        this.enabled = url.getParameter(ENABLED_KEY, true);
        // rule=xxx
        // init() parses the routing rule in detail
        init(url.getParameterAndDecoded(RULE_KEY));
    }
    // Parse a condition rule such as: host = 10.20.153.10 => host = 10.20.153.11
    public void init(String rule) {
        try {
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            // Example rule: host = 10.20.153.10 => host = 10.20.153.11
            rule = rule.replace("consumer.", "").replace("provider.", "");
            int i = rule.indexOf("=>");
            // Without "=>", the whole rule is treated as the then-condition, so every consumer is routed to the hosts it specifies
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
            Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
            // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
    // Parse the key-value pairs of a condition rule
    private static Map<String, MatchPair> parseRule(String rule)
            throws ParseException {
        Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
        if (StringUtils.isBlank(rule)) {
            return condition;
        }
        // Key-Value pair, stores both match and mismatch conditions
        MatchPair pair = null;
        // Multiple values
        Set<String> values = null;
        // ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        while (matcher.find()) { // Try to match one by one
            String separator = matcher.group(1);
            String content = matcher.group(2);
            // Start part of the condition expression.
            if (StringUtils.isEmpty(separator)) {
                pair = new MatchPair();
                condition.put(content, pair);
            }
            // The KV part of the condition expression
            // &host=xxx
            else if ("&".equals(separator)) {
                if (condition.get(content) == null) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                } else {
                    pair = condition.get(content);
                }
            }
            // The Value in the KV part.
            else if ("=".equals(separator)) {
                if (pair == null) {
                    throw new ParseException("Illegal route rule \""
                            + rule + "\", The error char '" + separator
                            + "' at index " + matcher.start() + " before \""
                            + content + "\".", matcher.start());
                }

                values = pair.matches;
                values.add(content);
            }
            // The Value in the KV part.
            else if ("!=".equals(separator)) {
                if (pair == null) {
                    throw new ParseException("Illegal route rule \""
                            + rule + "\", The error char '" + separator
                            + "' at index " + matcher.start() + " before \""
                            + content + "\".", matcher.start());
                }

                values = pair.mismatches;
                values.add(content);
            }
            // The Value in the KV part, if Value have more than one items.
            else if (",".equals(separator)) { // Should be separated by ','
                if (values == null || values.isEmpty()) {
                    throw new ParseException("Illegal route rule \""
                            + rule + "\", The error char '" + separator
                            + "' at index " + matcher.start() + " before \""
                            + content + "\".", matcher.start());
                }
                values.add(content);
            } else {
                throw new ParseException("Illegal route rule \"" + rule
                        + "\", The error char '" + separator + "' at index "
                        + matcher.start() + " before \"" + content + "\".", matcher.start());
            }
        }
        return condition;
    }      
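
  To see what the regular expression actually produces, the parsing loop can be run on a sample rule. The sketch below reuses the ROUTE_PATTERN expression quoted above but is otherwise a simplification: ConditionRuleSketch and Pair are hypothetical names, error reporting is reduced to a single exception, and a key with an empty separator reuses an existing pair instead of replacing it.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ConditionRuleSketch {
    // Same regular expression as the ROUTE_PATTERN quoted in the listing above.
    static final Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");

    // Simplified stand-in for MatchPair: allowed and disallowed patterns for one key.
    static class Pair {
        final Set<String> matches = new LinkedHashSet<>();
        final Set<String> mismatches = new LinkedHashSet<>();
    }

    static Map<String, Pair> parse(String rule) {
        Map<String, Pair> condition = new LinkedHashMap<>();
        Pair pair = null;          // pair for the key currently being filled
        Set<String> values = null; // set that a following ',' keeps appending to
        Matcher m = ROUTE_PATTERN.matcher(rule);
        while (m.find()) {
            String sep = m.group(1);
            String content = m.group(2);
            if (sep.isEmpty() || "&".equals(sep)) {
                // Start of a new key (first key, or a key after '&').
                pair = condition.computeIfAbsent(content, k -> new Pair());
            } else if (pair != null && "=".equals(sep)) {
                values = pair.matches;
                values.add(content);
            } else if (pair != null && "!=".equals(sep)) {
                values = pair.mismatches;
                values.add(content);
            } else if (",".equals(sep) && values != null && !values.isEmpty()) {
                values.add(content);
            } else {
                throw new IllegalArgumentException("Illegal route rule: " + rule);
            }
        }
        return condition;
    }

    public static void main(String[] args) {
        String rule = "host = 10.20.153.10 & method != find*,list* => host = 10.20.153.11";
        int i = rule.indexOf("=>");
        Map<String, Pair> when = parse(rule.substring(0, i).trim());
        Map<String, Pair> then = parse(rule.substring(i + 2).trim());
        for (Map.Entry<String, Pair> e : when.entrySet()) {
            System.out.println("when " + e.getKey()
                    + " matches=" + e.getValue().matches
                    + " mismatches=" + e.getValue().mismatches);
        }
        System.out.println("then host matches=" + then.get("host").matches);
    }
}
```

  Each regex match yields a (separator, content) pair, and the separator alone decides whether the content is a new key, a value to match, a value to exclude, or one more item in the current value list.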

  Next, let's see how the parsed rules are applied.

  Routers are invoked uniformly through routerChain:

// org.apache.dubbo.rpc.cluster.RouterChain#route
    /**
     *
     * @param url
     * @param invocation
     * @return
     */
    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = invokers;
        for (Router router : routers) {
            finalInvokers = router.route(finalInvokers, url, invocation);
        }
        return finalInvokers;
    }
    // Below is the condition router's route() implementation:
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
            throws RpcException {
        if (!enabled) {
            return invokers;
        }

        if (CollectionUtils.isEmpty(invokers)) {
            return invokers;
        }
        try {
            // If the when-condition does not match, return all invokers unchanged
            if (!matchWhen(url, invocation)) {
                return invokers;
            }
            List<Invoker<T>> result = new ArrayList<Invoker<T>>();
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                return result;
            }
            for (Invoker<T> invoker : invokers) {
                // Otherwise check each candidate invoker in turn and keep only those that match
                // The matching logic is shown further down:
                if (matchThen(invoker.getUrl(), url)) {
                    result.add(invoker);
                }
            }
            if (!result.isEmpty()) {
                return result;
            } else if (force) {
                logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
                return result;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
        }
        return invokers;
    }
    // Source-side check: does the consumer URL (and the invocation) satisfy the when-condition of the rule?
    boolean matchWhen(URL url, Invocation invocation) {
        // An empty whenCondition means the rule applies to every consumer
        return CollectionUtils.isEmptyMap(whenCondition) || matchCondition(whenCondition, url, null, invocation);
    }
    // Target-side check: same matching logic as the source side, with thenCondition in place of whenCondition
    private boolean matchThen(URL url, URL param) {
        return CollectionUtils.isNotEmptyMap(thenCondition) && matchCondition(thenCondition, url, param, null);
    }
    
    private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
        Map<String, String> sample = url.toMap();
        boolean result = false;
        for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
            String key = matchPair.getKey();
            String sampleValue;
            //get real invoked method name from invocation
            if (invocation != null && (METHOD_KEY.equals(key) || METHODS_KEY.equals(key))) {
                sampleValue = invocation.getMethodName();
            } else if (ADDRESS_KEY.equals(key)) {
                sampleValue = url.getAddress();
            } else if (HOST_KEY.equals(key)) {
                sampleValue = url.getHost();
            } else {
                sampleValue = sample.get(key);
                // Why is sample.get(key) called twice?
                if (sampleValue == null) {
                    sampleValue = sample.get(key);
                }
            }
            if (sampleValue != null) {
                // Call MatchPair.isMatch() for each condition in turn
                // As soon as one check fails, the current invocation no longer satisfies the routing condition
                if (!matchPair.getValue().isMatch(sampleValue, param)) {
                    return false;
                } else {
                    result = true;
                }
            } else {
                //not pass the condition
                if (!matchPair.getValue().matches.isEmpty()) {
                    return false;
                } else {
                    result = true;
                }
            }
        }
        return result;
    }
        // MatchPair decides whether a given value matches the routing rule
        private boolean isMatch(String value, URL param) {
            // Only positive (=) patterns exist: check against matches
            if (!matches.isEmpty() && mismatches.isEmpty()) {
                for (String match : matches) {
                    // Simple glob matching, mainly to handle the * wildcard
                    if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                        return true;
                    }
                }
                return false;
            }
            // Only negative (!=) patterns exist: check against mismatches and invert the result
            if (!mismatches.isEmpty() && matches.isEmpty()) {
                for (String mismatch : mismatches) {
                    if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                        return false;
                    }
                }
                return true;
            }
            // Both kinds exist: check mismatches first, then matches, i.e. mismatches take precedence over matches
            if (!matches.isEmpty() && !mismatches.isEmpty()) {
                //when both mismatches and matches contain the same value, then using mismatches first
                for (String mismatch : mismatches) {
                    if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                        return false;
                    }
                }
                for (String match : matches) {
                    if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                        return true;
                    }
                }
                return false;
            }
            return false;
        }
    }      
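
  The three branches of isMatch() collapse into one rule: a hit in mismatches always rejects, and otherwise a hit in matches accepts. The sketch below demonstrates that precedence; MatchPairSketch and glob are hypothetical names, and the glob helper is a deliberately simplified assumption (at most one * per pattern, no $-parameter substitution) rather than a copy of UrlUtils.isMatchGlobPattern.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class MatchPairSketch {
    final Set<String> matches = new LinkedHashSet<>();
    final Set<String> mismatches = new LinkedHashSet<>();

    // Simplified glob for this sketch: supports at most one '*' in the pattern.
    static boolean glob(String pattern, String value) {
        if ("*".equals(pattern)) {
            return true;
        }
        int i = pattern.indexOf('*');
        if (i < 0) {
            return pattern.equals(value);
        }
        String prefix = pattern.substring(0, i);
        String suffix = pattern.substring(i + 1);
        return value.length() >= prefix.length() + suffix.length()
                && value.startsWith(prefix)
                && value.endsWith(suffix);
    }

    boolean isMatch(String value) {
        // A hit in mismatches rejects the value no matter what matches contains.
        for (String mismatch : mismatches) {
            if (glob(mismatch, value)) {
                return false;
            }
        }
        // Only negative patterns: surviving them is enough (false if both sets are empty).
        if (matches.isEmpty()) {
            return !mismatches.isEmpty();
        }
        // Positive patterns exist: at least one of them must accept the value.
        for (String match : matches) {
            if (glob(match, value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        MatchPairSketch host = new MatchPairSketch();
        host.matches.add("10.20.*");
        host.mismatches.add("10.20.1.1");
        System.out.println(host.isMatch("10.20.1.1")); // excluded by mismatches
        System.out.println(host.isMatch("10.20.2.2")); // accepted by matches
        System.out.println(host.isMatch("10.30.0.1")); // accepted by neither
    }
}
```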

2.3. The script router ScriptRouter

  The constructor parses the required parameters and, importantly, obtains a script engine for the configured script type.

public ScriptRouter(URL url) {
        this.url = url;
        this.priority = url.getParameter(PRIORITY_KEY, SCRIPT_ROUTER_DEFAULT_PRIORITY);
        // Obtain the script engine according to the type parameter, e.g. type=javascript
        engine = getEngine(url);
        // Read the rule from rule=xxxx
        rule = getRule(url);
        try {
            // Possible engines include GroovyScriptEngineImpl and NashornScriptEngine
            Compilable compilable = (Compilable) engine;
            function = compilable.compile(rule);
        } catch (ScriptException e) {
            logger.error("route error, rule has been ignored. rule: " + rule +
                    ", url: " + RpcContext.getContext().getUrl(), e);
        }
    }      

  The actual routing method simply evaluates the compiled script with the script engine:

// org.apache.dubbo.rpc.cluster.router.script.ScriptRouter#route
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        try {
            // Wrap the arguments in Bindings and pass them to the script engine
            Bindings bindings = createBindings(invokers, invocation);
            if (function == null) {
                return invokers;
            }
            // Call function.eval() on the compiled script, i.e. run the rule script with these arguments to obtain the invokers
            // getRoutedInvokers then converts the result into a List<Invoker<T>>
            return getRoutedInvokers(function.eval(bindings));
        } catch (ScriptException e) {
            logger.error("route error, rule has been ignored. rule: " + rule + ", method:" +
                    invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
            return invokers;
        }
    }
    /**
     * create bindings for script engine
     */
    private <T> Bindings createBindings(List<Invoker<T>> invokers, Invocation invocation) {
        Bindings bindings = engine.createBindings();
        // create a new List of invokers
        bindings.put("invokers", new ArrayList<>(invokers));
        bindings.put("invocation", invocation);
        bindings.put("context", RpcContext.getContext());
        return bindings;
    }      

  The implementation above is still a bit abstract. Let's take one of Dubbo's unit tests to see how the script router is used:

@Test
    public void testRoutePickInvokers() {
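        // The rule is written in JavaScript, but it calls Java methods so that it can read the arguments passed in from Java and hand back a Java result
        // Outside a Java-hosted script engine this JavaScript would not run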
        String rule = "var result = new java.util.ArrayList(invokers.size());" +
                "for (i=0;i<invokers.size(); i++){ " +
                // Use isAvailable() to decide whether the invoker goes into the candidate list
                "if (invokers.get(i).isAvailable()) {" +
                "result.add(invokers.get(i)) ;" +
                "}" +
                "} ; " +
                "return result;";
        // Define a route function and call it immediately, so that the script evaluates to the function's result
        String script = "function route(invokers,invocation,context){" + rule + "} route(invokers,invocation,context)";
        Router router = new ScriptRouterFactory().getRouter(getRouteUrl(script));

        List<Invoker<String>> invokers = new ArrayList<Invoker<String>>();
        // Mock invoker that is unavailable
        Invoker<String> invoker1 = new MockInvoker<String>(false);
        Invoker<String> invoker2 = new MockInvoker<String>(true);
        Invoker<String> invoker3 = new MockInvoker<String>(true);
        invokers.add(invoker1);
        invokers.add(invoker2);
        invokers.add(invoker3);
        List<Invoker<String>> filteredInvokers = router.route(invokers, invokers.get(0).getUrl(), new RpcInvocation());
        Assertions.assertEquals(2, filteredInvokers.size());
        Assertions.assertEquals(invoker2, filteredInvokers.get(0));
        Assertions.assertEquals(invoker3, filteredInvokers.get(1));
    }      

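  So script routing can be very flexible, but it is more costly to maintain and not as concise as condition routing. A script rule should be tested thoroughly before it is deployed to production.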

2.4. The tag router TagRouter

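  Roughly speaking, it selects provider addresses according to tag=xxx. At the time of writing this router had not been officially released, but we can still look at its rough implementation: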

@Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return invokers;
        }

        // since the rule can be changed by config center, we should copy one to use.
        final TagRouterRule tagRouterRuleCopy = tagRouterRule;
        if (tagRouterRuleCopy == null || !tagRouterRuleCopy.isValid() || !tagRouterRuleCopy.isEnabled()) {
            return filterUsingStaticTag(invokers, url, invocation);
        }

        List<Invoker<T>> result = invokers;
        // Read the dubbo.tag value: prefer the invocation attachment, fall back to the URL parameter
        String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
                invocation.getAttachment(TAG_KEY);

        // if we are requesting for a Provider with a specific tag
        if (StringUtils.isNotEmpty(tag)) {
            List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag);
            // filter by dynamic tag group first
            if (CollectionUtils.isNotEmpty(addresses)) {
                result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
                // if result is not null OR it's null but force=true, return result directly
                if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) {
                    return result;
                }
            } else {
                // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
                // dynamic tag group but force=false. check static tag
                result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
            }
            // If there's no tagged providers that can match the current tagged request. force.tag is set by default
            // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
            if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
                return result;
            }
            // FAILOVER: return all Providers without any tags.
            else {
                List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
                        tagRouterRuleCopy.getAddresses()));
                return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
            }
        } else {
            // List<String> addresses = tagRouterRule.filter(providerApp);
            // return all addresses in dynamic tag group.
            List<String> addresses = tagRouterRuleCopy.getAddresses();
            if (CollectionUtils.isNotEmpty(addresses)) {
                result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
                // 1. all addresses are in dynamic tag group, return empty list.
                if (CollectionUtils.isEmpty(result)) {
                    return result;
                }
                // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
                // static tag group.
            }
            return filterInvoker(result, invoker -> {
                String localTag = invoker.getUrl().getParameter(TAG_KEY);
                return StringUtils.isEmpty(localTag) || !tagRouterRuleCopy.getTagNames().contains(localTag);
            });
        }
    }      
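
  The tagged-request branch above can be hard to follow at a glance, so here is a minimal, self-contained sketch of the same decision order: dynamic tag group first, then static tag, then failover to untagged providers. `Provider`, `routeTagged`, `ruleForce` and `requestForce` are hypothetical names invented for this illustration, and address matching is reduced to exact string comparison, which is an assumption and not how Dubbo matches addresses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the tagged-request branch; not Dubbo's own classes.
class TagRoutingSketch {

    // Stand-in for an Invoker: an address plus an optional static tag (null = untagged).
    static final class Provider {
        final String address;
        final String staticTag;

        Provider(String address, String staticTag) {
            this.address = address;
            this.staticTag = staticTag;
        }
    }

    static List<Provider> filter(List<Provider> providers, Predicate<Provider> keep) {
        return providers.stream().filter(keep).collect(Collectors.toList());
    }

    // ruleForce models the rule's "force" flag; requestForce models the per-request force flag.
    static List<Provider> routeTagged(List<Provider> providers, String tag,
                                      Map<String, List<String>> tagToAddresses,
                                      boolean ruleForce, boolean requestForce) {
        List<String> addresses = tagToAddresses.get(tag);
        List<Provider> result;
        if (addresses != null && !addresses.isEmpty()) {
            // 1. Dynamic tag group: match by address (exact string match in this sketch).
            result = filter(providers, p -> addresses.contains(p.address));
            if (!result.isEmpty() || ruleForce) {
                return result;
            }
        } else {
            // 2. No dynamic group for this tag: fall back to the provider's static tag.
            result = filter(providers, p -> tag.equals(p.staticTag));
        }
        if (!result.isEmpty() || requestForce) {
            return result;
        }
        // 3. Failover: untagged providers that belong to no dynamic tag group.
        List<String> grouped = new ArrayList<>();
        tagToAddresses.values().forEach(grouped::addAll);
        return filter(providers, p -> !grouped.contains(p.address)
                && (p.staticTag == null || p.staticTag.isEmpty()));
    }

    static List<String> addressesOf(List<Provider> providers) {
        return providers.stream().map(p -> p.address).collect(Collectors.toList());
    }

    // Illustrative data only: three untagged providers and one with a static tag.
    static List<Provider> sampleProviders() {
        return Arrays.asList(
                new Provider("30.5.120.37:20881", null),
                new Provider("30.5.120.37:20880", null),
                new Provider("30.5.120.37:20882", null),
                new Provider("30.5.120.37:20883", "tag3"));
    }

    // Illustrative dynamic tag groups: tag1 and tag2 each own one address.
    static Map<String, List<String>> sampleRule() {
        Map<String, List<String>> rule = new HashMap<>();
        rule.put("tag1", Arrays.asList("30.5.120.37:20881"));
        rule.put("tag2", Arrays.asList("30.5.120.37:20880"));
        return rule;
    }

    public static void main(String[] args) {
        List<Provider> providers = sampleProviders();
        Map<String, List<String>> rule = sampleRule();
        System.out.println(addressesOf(routeTagged(providers, "tag1", rule, false, false)));
        System.out.println(addressesOf(routeTagged(providers, "tag3", rule, false, false)));
        System.out.println(addressesOf(routeTagged(providers, "tag9", rule, false, false)));
        System.out.println(addressesOf(routeTagged(providers, "tag9", rule, false, true)));
    }
}
```

  In this sketch a request for an unknown tag falls through to the untagged provider that sits outside every dynamic group, unless the request-level force flag is set, in which case the result is empty.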

  The configuration format looks roughly like this:

String serviceStr = "---\n" +
                "force: false\n" +
                "runtime: true\n" +
                "enabled: false\n" +
                "priority: 1\n" +
                "key: demo-provider\n" +
                "tags:\n" +
                "  - name: tag1\n" +
                "    addresses: [\"30.5.120.37:20881\"]\n" +
                "  - name: tag2\n" +
                "    addresses: [\"30.5.120.37:20880\"]\n" +
                "...";      

2.5. AppRouter + ServiceRouter

  Neither of these two routers is a standalone routing implementation. Each one wraps ConditionRouter and uses it to carry out its own specific logic.

// org.apache.dubbo.rpc.cluster.router.condition.config.AppRouter#AppRouter
    public AppRouter(URL url) {
        // use the value of the application=xxx parameter as the rule key
        super(url, url.getParameter(CommonConstants.APPLICATION_KEY));
        this.priority = APP_ROUTER_DEFAULT_PRIORITY;
    }
    // org.apache.dubbo.rpc.cluster.router.condition.config.ListenableRouter#ListenableRouter
    public ListenableRouter(URL url, String ruleKey) {
        super(url);
        this.force = false;
        // initialize the router: subscribe to the rule and load its current value
        this.init(ruleKey);
    }
    private synchronized void init(String ruleKey) {
        if (StringUtils.isEmpty(ruleKey)) {
            return;
        }
        // append the ".condition-router" suffix to the rule key
        String routerKey = ruleKey + RULE_SUFFIX;
        ruleRepository.addListener(routerKey, this);
        String rule = ruleRepository.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
        if (StringUtils.isNotEmpty(rule)) {
            this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule));
        }
    }
    
    @Override
    public synchronized void process(ConfigChangedEvent event) {
        if (logger.isInfoEnabled()) {
            logger.info("Notification of condition rule, change type is: " + event.getChangeType() +
                    ", raw rule is:\n " + event.getContent());
        }
        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            routerRule = null;
            conditionRouters = Collections.emptyList();
        } else {
            try {
                routerRule = ConditionRuleParser.parse(event.getContent());
                generateConditions(routerRule);
            } catch (Exception e) {
                logger.error("Failed to parse the raw condition rule and it will not take effect, please check " +
                        "if the condition rule matches with the template, the raw rule is:\n " + event.getContent(), e);
            }
        }
    }
    // When routing, simply delegate the work to each entry in conditionRouters
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) {
            return invokers;
        }

        // We will check enabled status inside each router.
        for (Router router : conditionRouters) {
            invokers = router.route(invokers, url, invocation);
        }

        return invokers;
    }      
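
  The pattern in the listing above, a router that rebuilds its delegate list whenever the rule changes and then pipes the invoker list through each delegate, can be sketched without any Dubbo dependency. Everything here is hypothetical: `SimpleRouter` stands in for ConditionRouter, invokers are plain address strings, and the toy rule format (one address suffix to exclude per line) is an assumption made only so the example runs; it is not Dubbo's condition rule syntax.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of a rule-driven router that delegates to a chain of sub-routers.
class ListenableRouterSketch {

    // Stand-in for ConditionRouter: takes candidate addresses, returns the ones it keeps.
    interface SimpleRouter {
        List<String> route(List<String> invokers);
    }

    private volatile List<SimpleRouter> conditionRouters = Collections.emptyList();

    // Called on every rule change; a null rule models a DELETED event.
    synchronized void process(String ruleContent) {
        if (ruleContent == null) {
            conditionRouters = Collections.emptyList();
            return;
        }
        List<SimpleRouter> parsed = new ArrayList<>();
        for (String line : ruleContent.split("\n")) {
            String excludedSuffix = line.trim();
            if (excludedSuffix.isEmpty()) {
                continue;
            }
            // Toy rule: drop every address that ends with the given suffix.
            parsed.add(invokers -> invokers.stream()
                    .filter(address -> !address.endsWith(excludedSuffix))
                    .collect(Collectors.toList()));
        }
        conditionRouters = parsed;
    }

    // Routing itself is pure delegation: each sub-router narrows the previous result.
    List<String> route(List<String> invokers) {
        if (invokers.isEmpty() || conditionRouters.isEmpty()) {
            return invokers;
        }
        for (SimpleRouter router : conditionRouters) {
            invokers = router.route(invokers);
        }
        return invokers;
    }

    public static void main(String[] args) {
        ListenableRouterSketch router = new ListenableRouterSketch();
        List<String> all = Arrays.asList("10.0.0.1:20880", "10.0.0.1:20881", "10.0.0.1:20882");
        System.out.println(router.route(all)); // no rule yet: list passes through unchanged
        router.process(":20880\n:20881");
        System.out.println(router.route(all)); // two sub-routers applied in sequence
        router.process(null);
        System.out.println(router.route(all)); // rule deleted: pass-through again
    }
}
```

  Swapping the whole list in `process` instead of mutating it in place keeps `route` free of locking, which is one plausible reason for the same shape in the original class.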

  ServiceRouter is implemented in much the same way; the only difference is the routerKey it uses.

// org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouter#ServiceRouter
    public ServiceRouter(URL url) {
        // Differs from AppRouter only in where the routerKey value comes from
        super(url, DynamicConfiguration.getRuleKey(url));
        this.priority = SERVICE_ROUTER_DEFAULT_PRIORITY;
    }
    // org.apache.dubbo.common.config.configcenter.DynamicConfiguration#getRuleKey
    /**
     * The format is '{interfaceName}:[version]:[group]'
     *
     * @return
     */
    static String getRuleKey(URL url) {
        return url.getColonSeparatedKey();
    }
    // org.apache.dubbo.common.URL#getColonSeparatedKey
    /**
     * The format is "{interface}:[version]:[group]"
     *
     * @return
     */
    public String getColonSeparatedKey() {
        StringBuilder serviceNameBuilder = new StringBuilder();
        serviceNameBuilder.append(this.getServiceInterface());
        append(serviceNameBuilder, VERSION_KEY, false);
        append(serviceNameBuilder, GROUP_KEY, false);
        return serviceNameBuilder.toString();
    }
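
  To make the two key formats concrete, the sketch below builds the rule keys that AppRouter and ServiceRouter would subscribe to. It assumes that the private `append` helper always emits a colon and then the value if one is present, so a missing version or group leaves an empty segment; that behavior is my reading of the helper and is not shown in the listing. The class and method names are hypothetical, and the `.condition-router` suffix is taken from the comment in `init`.

```java
// Hypothetical sketch of how the two rule keys are assembled; not Dubbo's own code.
class RuleKeySketch {

    static final String RULE_SUFFIX = ".condition-router";

    // Builds "{interface}:[version]:[group]"; assumes blank segments are kept as empty.
    static String colonSeparatedKey(String serviceInterface, String version, String group) {
        StringBuilder key = new StringBuilder(serviceInterface);
        key.append(':');
        if (version != null && !version.trim().isEmpty()) {
            key.append(version);
        }
        key.append(':');
        if (group != null && !group.trim().isEmpty()) {
            key.append(group);
        }
        return key.toString();
    }

    // ServiceRouter: key derived from interface, version and group.
    static String serviceRouterKey(String serviceInterface, String version, String group) {
        return colonSeparatedKey(serviceInterface, version, group) + RULE_SUFFIX;
    }

    // AppRouter: key derived from the application name.
    static String appRouterKey(String application) {
        return application + RULE_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(appRouterKey("demo-provider"));
        System.out.println(serviceRouterKey("org.apache.dubbo.demo.DemoService", "1.0.0", "g1"));
        System.out.println(serviceRouterKey("org.apache.dubbo.demo.DemoService", null, null));
    }
}
```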
          

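  The point of service routing is to let users configure special invocation scenarios more flexibly, such as cross-data-center calls, or to handle exceptional cases, for example keeping a particular instance from being invoked any longer. In short, there are always real use cases for it; otherwise the feature would exist only for its own amusement.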

  Understanding how it works makes it much clearer what exactly we are routing!

Don't be afraid of today's hardship. Trust that tomorrow will be even harder!
