天天看點

Dubbo 源碼分析之服務降級

前言

關于服務降級,相信很多小夥伴都聽說過或者操作過。比如最近我們在 ​

​12306​

​​ 上搶票回家,明明看到剩餘的有票,可就是買不到,這就是很明顯的一個(讀)服務降級例子。再比如雙十一時我們付款時偶爾出現付款失敗,重新支付,也是(限流)服務降級的一種,也有許多其它降級的例子,大家可以自行搜尋。具體來說就是:​

​當伺服器壓力劇增的情況下,根據目前業務情況及流量對一些服務和頁面有政策的降級,以此釋放伺服器資源以保證核心任務的正常運作​

​​ 。服務降級的影響也很明顯,使用者體驗不好。在這裡就講講 ​

​Dubbo​

​ 的服務降級。

Dubbo 服務降級

在 ​

​Dubbo​

​ 中服務降級有兩種選擇

  • 屏蔽(​

    ​force​

    ​​):在可預知的情況下執行,比如上次拼多多​

    ​100​

    ​ 元優惠券事件,如果在發現漏洞後及時限制支付服務,就不會造成所謂被薅200億瀕臨倒閉的傳聞(僅僅舉例子,還是風控各種沒做好)
  • 容錯(​

    ​fail​

    ​):在服務逾時,網絡異常時等非業務異常時使用

服務降級配置

上面我們隻使用了 ​

​return null​

​,其實還有其它自定義配置

1、使用自定義mock類(接口名+Mock)

  • ​mock = default => DemoServiceMock​

  • ​mock = true => DemoServiceMock​

  • ​mock = fail => DemoServiceMock​

  • ​mock = force => DemoServiceMock​

    ​ 2、先普通執行,執行失敗之後再執行相應的mock邏輯
  • ​mock = fail:throw => throw new RpcException(" mocked exception for Service degradation. ");​

  • ​mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);​

  • ​mock = fail:return => return null​

  • ​mock = fail:return xxx => return xxx​

  • ​mock = fail:return empty => return new Car()​

    ​ 3、直接執行相應的mock邏輯
  • ​mock = force:throw => throw new RpcException(" mocked exception for Service degradation. ");​

  • ​mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);​

  • ​mock = force:return => return null​

  • ​mock = force:return xxx => return xxx​

  • ​mock = force:return empty => return new Car()​

屏蔽例子

我簡單寫了一個接口,共有兩個方法,登入和注冊

public interface UserService {

    JsonResponse<String> login(User user);

    JsonResponse<String> register(User user);
}      

首先啟動服務者,消費者我決定使用單元測試的方式。

在示範之前,為了加深大家的了解,我們連接配接上 ​

​ZooKeeper​

​ 去檢視該服務的注冊資訊

Dubbo 源碼分析之服務降級

從圖檔可以看出在我們提供的服務 ​

​/dubbo/com.dfire.service.UserService​

​ 下共有四個目錄

  • consumers: 消費者的​

    ​URL​

    ​ 資訊
  • configurators: 覆寫​

    ​URL​

    ​ 配置資訊
  • routers: 路由配置資訊
  • providers: 提供者的​

    ​URL​

    ​ 資訊

此時由于我們的提供者已經啟動,是以其 ​

​URL​

​​ 資訊已經展示,​

​URL​

​ 解碼後内容為

dubbo://10.1.87.92:20880/com.dfire.service.UserService?anyhost=true&application=xiaosuda_provider&dispatcher=all&dubbo=2.6.5&generic=false&interface=com.dfire.service.UserService&methods=login,register&pid=3316&side=provider&threadpool=cached&threads=50&timestamp=1548320426971&uptime=1548320426991

基本都是我們配置的資訊。

接下來打開配置中心(好像 ​

​2.7.0+​

​​ 要使用 ​

​nacos​

​​,配置方法肯定大同小異) ​

​dubbo-admin​

Dubbo 源碼分析之服務降級

配置我們的注冊方法 ​

​register​

​​ 的服務降級政策為 屏蔽 (​

​force​

​​) 傳回的結果為 ​

​return null​

​,點選儲存

下面在看一下我們的 ​

​ZooKeeper​

​ 資訊

Dubbo 源碼分析之服務降級

發現 ​

​/dubbo/com.dfire.service.UserService/configurators​

​ 節點的資料内容從無變為

override://0.0.0.0/com.dfire.service.UserService?category=configurators&dynamic=false&enabled=true&register.mock=force:return null

是以 ​

​override​

​​ 開頭的協定,我們需要關注的參數:​

​register.mock=force:return nul​

​​,解釋為: ​

​com.dfire.service.UserService.register()​

​​方法的服務降級政策為屏蔽,直接傳回 ​

​null​

​​。

關于配置覆寫:

  • ​override://​

    ​ 表示資料采用覆寫方式,支援 override 和 absent,可擴充,必填。
  • ​0.0.0.0​

    ​ 表示對所有 IP 位址生效,如果隻想覆寫某個 IP 的資料,請填入具體 IP,必填。
  • ​com.alibaba.dubbo.demo.DemoService​

    ​表示隻對指定服務生效,必填。
  • ​category=configurators​

    ​ 表示該資料為動态配置類型,必填。
  • ​dynamic=false​

    ​ 表示該資料為持久資料,當注冊方退出時,資料依然儲存在注冊中心,必填。
  • ​enabled=true​

    ​ 覆寫規則是否生效,可不填,預設生效。
  • ​application=demo-consumer​

    ​ 表示隻對指定應用生效,可不填,表示對所有應用生效。
  • ​mock=force:return+null​

    ​​表示将滿足以上條件的​

    ​mock​

    ​​ 參數的值覆寫為​

    ​force:return+null​

    ​​。如果想覆寫其它參數,直接加在​

    ​override​

    ​​ 的​

    ​URL​

    ​ 參數上。

接下來執行我們的單元測試

@RunWith(SpringRunner.class)
@SpringBootTest
public class ConsumerApplicationTests {

    @Autowired
    private UserService userService;

    @Test
    public void testForceNullMock() {
        User user = new User();
        user.setUsername("張三");
        user.setPassword("張三");
        JsonResponse<String> response = userService.register(user);
        Assert.assertNull(response);
    }
}      

執行後看到綠色的 ​

​Tests passed​

容錯例子

容錯例子也很簡單,就是在一次正常的服務調用失敗後執行服務降級操作,如果這一次正常的服務執行成功則不會執行服務降級操作。具體配置與屏蔽類似

Dubbo 源碼分析之服務降級

示範就不再示範了。

服務降級源碼分析

服務降級的入口在 ​

​MockClusterInvoker.invoke()​

​方法

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
        // 判斷方法名.mock 的值  即上面例子的:register.mock
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        // 未設定mock 走正常 invoke
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
          //no mock
          result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) { 
          if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
          }
          //屏蔽
          result = doMockInvoke(invocation, null);
        } else {  
          //容錯
          try {
            //首先正常 invoke
            result = this.invoker.invoke(invocation);
          }catch (RpcException e) {
        if (e.isBiz()) {
          throw e;
        } else {
          if (logger.isWarnEnabled()) {
                logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
              }
              //異常後 走mock invoke
          result = doMockInvoke(invocation, e);
        }
      }
        }
        return result;
  }      

需要注意的點

  • ​directory.getUrl()​

    ​​ :​

    ​directory​

    ​​ 對象為​

    ​RegistryDirectory​

    ​​,​

    ​URL​

    ​ 會動态變化

​doMockInvoke​

​ 方法

private Result doMockInvoke(Invocation invocation,RpcException e){
        Result result = null;
        Invoker<T> minvoker ;
        
        List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
        if (mockInvokers == null || mockInvokers.size() == 0){
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
        } else {
            minvoker = mockInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
            }
//          
        } catch (Throwable me) {
            throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
        }
        return result;
    }      

代碼很簡單,首先獲得 ​

​mockInvokers​

​​ 集合,如果該集合為空建立一個對象,否則以集合的第一個為準。

繼續看​​

​selectMockInvoker​

/**
     * 傳回MockInvoker
     * 契約:
     * directory根據invocation中是否有Constants.INVOCATION_NEED_MOCK,來判斷擷取的是一個normal invoker 還是一個 mock invoker
     * 如果directorylist 傳回多個mock invoker,隻使用第一個invoker.
     * @param invocation
     * @return 
     */
    private List<Invoker<T>> selectMockInvoker(Invocation invocation){
        List<Invoker<T>> invokers = null;
        //TODO generic invoker?
        if (invocation instanceof RpcInvocation) {
            //存在隐含契約(雖然在接口聲明中增加描述,但擴充性會存在問題.同時放在attachement中的做法需要改進
            ((RpcInvocation) invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
            //directory根據invocation中attachment是否有Constants.INVOCATION_NEED_MOCK,來判斷擷取的是normal invokers or mock invokers
            try {
                invokers = directory.list(invocation);
            } catch (RpcException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("Exception when try to invoke mock. Get mock invokers error for service:"
                            + directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                            + ", will contruct a new mock with 'new MockInvoker()'.", e);
                }
            }
        }
        return invokers;
    }      

判斷 ​

​invocation​

​​ 是否為 ​

​RpcInvocation​

​​ 的執行個體,如果是為 ​

​invocation​

​​ 添加 ​

​invocation.need.mock=true​

然後在 ​

​RegistryDirectory​

​​ 的抽象類中 ​

​AbstractDirectory​

​​ 進行篩選 ​

​invokers​

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed){
            throw new RpcException("Directory already destroyed .url: "+ getUrl());
        }
        //模版方法  執行個體為RegistryDirectory 獲得所有Invoker
        List<Invoker<T>> invokers = doList(invocation);
        // 開啟時移除斷流位址或者關閉時清空熔斷資訊 addCircuitBreaker
        CircuitBreakerManager circuitBreakerManager = ExtensionLoader.getExtensionLoader(CircuitBreakerManager.class).getAdaptiveExtension();
        invokers = circuitBreakerManager.filterCricuitBreakInvoker(invokers,invocation);
        //路由政策 預設會在routers集合中添加MockInvokersSelector
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && localRouters.size() > 0) {
            for (Router router: localRouters){
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }      

上面代碼作用:從服務字典中獲得所有的服務,然後進行路由過濾把過濾後的服務清單傳回。

主要看​​

​MockInvokersSelector​

​​類的 ​

​route​

​ 方法。

public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
      URL url, final Invocation invocation) throws RpcException {
    if (invocation.getAttachments() == null) {
      return getNormalInvokers(invokers);
    } else {
      // invocation.need.mock  就是在MockClusterInvoker.selectMockInvoker 中添加的
      String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
      if (value == null) 
        return getNormalInvokers(invokers);
      else if (Boolean.TRUE.toString().equalsIgnoreCase(value)){
        return getMockedInvokers(invokers);
      } 
    }
    return invokers;
  }      

主要就是判斷 ​

​invocation.need.mock​

​​ 的值是否為 ​

​true​

​​ 。如果為 ​

​true​

​​ 調用​

​getMockedInvokers​

​​,否則調用​

​getNormalInvokers​

private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers){
    //判斷是否存在mock://協定的提供者  如果不存在 直接傳回原提供者 
    if (! hasMockProviders(invokers)){
      return invokers;
    } else { //存在mock://協定的提供者  去掉mock://的提供者過濾後傳回
      List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
      for (Invoker<T> invoker : invokers){
        if (! invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)){
          sInvokers.add(invoker);
        }
      }
      return sInvokers;
    }
  }      
private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
  //判斷是否存在mock://協定的提供者  如果不存在 直接傳回null
    if (! hasMockProviders(invokers)){
      return null;
    }
    List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
    for (Invoker<T> invoker : invokers){
      // 過濾mock://的提供者 
      if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)){
        sInvokers.add(invoker);
      }
    }
    return sInvokers;
  }      

調用層次比較深,大家最好 ​

​debug​

​​ 檢視。過濾完之後就又回到了 ​

​MockClusterInvoker.doMockInvoke​

​ 方法.

private Result doMockInvoke(Invocation invocation,RpcException e){
        Result result = null;
        Invoker<T> minvoker ;
        
        List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
        //mockInvokers 結果傳回 可能為空
        if (mockInvokers == null || mockInvokers.size() == 0){
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
        } else {
            minvoker = mockInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
            }
//          
        } catch (Throwable me) {
            throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
        }
        return result;
    }      

下面就要進入 ​

​MockInvoker.invoke​

​ 方法了

public Result invoke(Invocation invocation) throws RpcException {
        //該值即我們在dubbo-admin設定的  可能是 force:return null
        String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            //設定invoke 為目前執行個體
            ((RpcInvocation) invocation).setInvoker(this);
        }
        if (StringUtils.isBlank(mock)){
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }
        
        if (StringUtils.isBlank(mock)){
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        //首先進行URL解碼獲得mock的具體要執行的結果 比如force:return null 執行normallizeMock後結果為return null
        mock = normallizeMock(URL.decode(mock));
        //配置 “return” 執行該語句
        if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){
            RpcResult result = new RpcResult();
            result.setValue(null);
            return result;
        } else if (mock.startsWith(Constants.RETURN_PREFIX)) {    //配置 “return *” *表示任務内容執行該語句
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew);
            }
        } else if (mock.startsWith(Constants.THROW_PREFIX)) {    //配置 “throw *” 執行該語句
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            if (StringUtils.isBlank(mock)){
                throw new RpcException(" mocked exception for Service degradation. ");
            } else { //使用者自定義類
                Throwable t = getThrowable(mock);
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }
        } else { //自定以mock實作執行該語句
             try {
                 Invoker<T> invoker = getInvoker(mock);
                 return invoker.invoke(invocation);
             } catch (Throwable t) {
                 throw new RpcException("Failed to create mock implementation class " + mock , t);
             }
        }
    }