天天看點

Dubbo源代碼實作五:RPC中的服務消費方實作

        剛開始使用Dubbo的人,可能對Dubbo的第一印象就是它是一個RPC架構,當然,所有的分布式架構都少不了互相通信的過程,何況Dubbo的任務就是幫助分布式業務系統完成服務的通訊、負載、注冊、發現和監控等功能。不得不承認,RPC是Dubbo提供服務的核心流程,為了相容多種使用場景,Dubbo顯然需要提供多種RPC方式(協定).

        開發一個簡單的RPC架構,重點需要考慮的是兩點,即編解碼方式和底層通訊協定的選型,編解碼方式指的是需要傳輸的資料在調用方将以什麼組織形式拆解成位元組流并在服務提供方以什麼形式解析出來。編解碼方式的設計需要考慮到後期的版本更新,是以很多RPC協定在設計時都會帶上目前協定的版本資訊。而底層通訊協定的選型都大同小異,一般都是TCP(當然也可以選擇建立于TCP之上更進階的協定,比如Avro、Thrift和HTTP等),在Java語言中就是指套接字Socket,當然,在Netty出現後,很少RPC架構會直接以自己寫Socket作為預設實作的通訊方式,但通常也會自己實作一個aio、nio或bio版本給那些“不友善”依賴Netty庫的應用系統來使用。

        在Dubbo的源碼中,有一個單獨子產品dubbo-rpc,其中,最重要的應該是Protocol和Invoker兩個接口,代表着協定(編解碼方式)和調用過程(通訊方式)。Invoker接口繼承于Node接口,Node接口規範了Dubbo體系中各元件之間通訊的基本要素: 

public interface Node {
    // 協定資料載體
    URL getUrl();
    // 狀态監測,目前是否可用
    boolean isAvailable();
    // 銷毀方法
    void destroy();
}
           

而Invoker接口則更簡單:

public interface Invoker<T> extends Node {
    // 擷取調用的接口
    Class<T> getInterface();
    // 調用過程
    Result invoke(Invocation invocation) throws RpcException;
}
           

從源代碼dubbo-rpc下的子子產品來看,我們能知道目前Dubbo支援dubbo(預設)、hessian、http、injvm(本地調用)、memcached、redis、rmi、thrift和webservice等9中RPC方式。根據Dubbo的官方手冊,injvm是一個僞協定,它不開啟端口,不發起遠端調用,隻在JVM内直接關聯,但執行Dubbo的Filter鍊,是以這一般用于線下測試。可是為啥Memcached和Redis也能用作RPC?這裡是指Dubbo端作為服務消費方,而Memcached或Redis作為服務提供方。

       我們這裡重點看調用方(服務消費方)部分的代碼。

      雖然Invoker接口中定義的是invoke方法,invoker方法的實作理應RPC的整個操作,但為了狀态檢查、上下文切換和準備、異常捕獲等,抽象類AbstractInvoker中定義了一個doInvoker抽象方法來支援不同的RPC方式所應做的純粹而具體的RPC過程,我們直接看AbstractInvoker中的invoker實作:

public Result invoke(Invocation inv) throws RpcException {
    if(destroyed) {
        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 
                                        + " use dubbo version " + Version.getVersion()
                                        + " is DESTROYED, can not be invoked any more!");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    // 填充接口參數
    if (attachment != null && attachment.size() > 0) {
       invocation.addAttachmentsIfAbsent(attachment);
    }
    // 填充業務系統需要透傳的參數
    Map<String, String> context = RpcContext.getContext().getAttachments();
    if (context != null) {
       invocation.addAttachmentsIfAbsent(context);
    }
    // 預設是同步調用,但也支援異步
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
       invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }

    /**
     * 幂等操作:異步操作預設添加invocation id,它是一個自增的AtomicLong
     * 可以在RpcContext中設定attachments的{@link Constants.ASYNC_KEY}值來設定是同步還是異步
     */
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    try {
        
        // 執行具體的RPC操作
        return doInvoke(invocation);

    // 異常處理的代碼略去
    } catch (InvocationTargetException e) {
    } catch (RpcException e) {
    } catch (Throwable e) {
    }
}
           

      可以看出主要是用來做參數填充(包括方法參數、業務參數和Dubbo内定的參數),然後就直接調用具體的doInvoker方法了。Dubbo所支援的RPC協定都需繼承AbstractInvoker類。

         我們先來看看Dubbo中預設的dubbo協定的實作,即DubboInvoker,直接看其doInvoker的實作: 

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    // 确定此次調用該使用哪個client(一個client代表一個connection)
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // 如果是多個client,則使用簡單的輪詢方式來決定
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否異步調用
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 是否單向調用,注意,單向調用和異步調用相比不同,單向調用不等待被調用方的應答就直接傳回
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 單向調用隻負責發送消息,不等待服務端應答,是以沒有傳回值
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
           ResponseFuture future = currentClient.request(inv, timeout);
            // 異步調用先儲存future,便于後期處理
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // 預設的同步調用
           RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           

從上面的代碼可以看出,在dubbo協定中,分為三種調用方式:同步(預設)、異步和OneWay,同步好了解,就是阻塞等拿到被調用方的結果再傳回,異步也好了解,不等待被調用者的處理結果就直接傳回,但需要等到被調用者接收到異步請求的應答,OneWay(單向調用)在很多MQ和RPC架構中都有出現,即調用方隻負責調用一次,不管被調用方是否接收到該請求,更不會去理會被調用方的任何應答,OneWay一般隻會在無需保證調用結果的時候使用。在《Dubbo源代碼實作二》中我們已經提到過,負載的政策決定此次服務調用是請求哪個服務提供方(也就是哪台伺服器),當确定了調用哪個服務提供房後,其實也就是确定了使用哪個Invoker,這裡指DubboInvoker執行個體。RPC架構為了提高服務的吞吐量,通常服務消費方和服務提供方的伺服器之間會建立多個連接配接,如上面代碼中的clients,是以在确定使用哪個DubboInvoker執行個體後,會從中選擇一個(如上面代碼的取模輪詢)client來進行RPC調用。從上面給出的代碼可以看出,同步和異步的差別隻是同步直接在currentClient.request傳回的Future對象上進行了get操作來直接等待結果的傳回。

       Dubbo中的Client執行個體都是ExchangeClient的實作,而每個Client執行個體都會綁定一個Channel的執行個體,來處理通訊的具體細節,而所有的Channel執行個體都實作了ExchangeChannel接口。這裡我們先來看看HeaderExchangeChannel#request的實作: 

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    // 相比OneWay,同步和異步調用屬于TwoWay
    req.setTwoWay(true);
    req.setData(request);
    // 建立DefaultFuture,用于将請求和應答關聯起來
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        // 直接發送調用請求
        channel.send(req);
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }

    // 将future傳回,用于拿到服務調用的傳回值
    return future;
}
           

從上面代碼可以看出,在直接調用channel.send發送資料時,先建立了一個DefaultFuture,它主要用于關聯請求和應答,DefaultFuture将稍後分析。後面,直接調用了Channel的send方法,dubbo協定底層直接使用了Netty架構,是以這裡指的是NettyChannel,見NettyChannel#send的代碼:

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message);
        /**
         * sent值隻是為了性能調優,預設是false
         */
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    
    // 當sent為true且資料發送時間超過指定的逾時時間時,由Dubbo負責抛出異常
    if(! success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
           

根據Dubbo使用者手冊中所說,sent參數的配置主要用于性能調優,這裡當sent為true時(預設為false),将直接使用Netty的ChannelFuture來實作在給定的逾時時間内等待,如果資料發送時間超過指定的逾時時間,則抛出異常。之是以這樣做,是為了将Netty架構處理時間控制在逾時時間範圍内,否則Dubbo架構在外圍做的逾時機制(DefaultFuture)将徒勞。

      接下來,我們看看Dubbo如何将請求和應答關聯起來的,前面看到的HeaderExchangeChannel#request實作中,建立了一個Request對象,Request中有一個mId,用來唯一表示一個請求對象,而該mId在new的時候就會建立:

public Request() {
    mId = newId();
}
 
private static long newId() {
    // getAndIncrement()增長到MAX_VALUE時,再增長會變為MIN_VALUE,負數也可以做為ID
    return INVOKE_ID.getAndIncrement();
}
           

而DefaultFuture靠的就是這個mId來關聯請求和應答消息,DefaultFuture中有兩個很重要的屬性:FUTURS和CHANNELS,它們類型都是ConcurrentHashMap,key為mId,在建立DefaultFuture對象時會把mId和相關的Future和Channel塞到這兩個Map中,還有一個ReentrantLock類型的lock屬性,用于阻塞來等待應答,我們直接看DefaultFuture中擷取結果和接收到應答後的實作:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        // 預設的逾時時間是1秒
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {
                // 最多等制定的逾時時間
                done.await(timeout, TimeUnit.MILLISECONDS);
                // 如果已經有結果或者已經超過逾時時間,則break
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}
 
public static void received(Channel channel, Response response) {
    try {
        // 擷取并移除該mId的Future
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 擷取并移除該mId的Channel
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            // 釋放信号
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
           

由于received是靜态方法,是以可以直接在Netty中注冊的Handler中使用。

      那服務消費方和服務提供方的連接配接數量是由誰決定的呢?這個我們可以直接看DubboInvoker的建立方DubboProtocol中的代碼:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

private ExchangeClient[] getClients(URL url){
    //是否共享連接配接
    boolean service_share_connect = false;
    /** 如果在dubbo:reference中沒有設定{@link Constants.CONNECTIONS_KEY},則預設是共享連接配接  */
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    //如果connections不配置,則共享連接配接,否則每服務每連接配接
    if (connections == 0){
        service_share_connect = true;
        connections = 1;
    }

    // 一個client維護一個connection
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect){
            // 使用共享的TCP長連接配接
            clients[i] = getSharedClient(url);
        } else {
            // 單獨為該URL建立TCP長連接配接
            clients[i] = initClient(url);
        }
    }
    return clients;
}
           

從getClients的代碼可以看出,服務消費方和服務提供方的伺服器之間的連接配接數量是可以配置的,服務消費方和服務提供方都可以配置,當然服務消費方優先級更高,例如:

服務消費方A:<dubbo:reference   inter  /> 

服務消費方A:<dubbo:reference   inter  connections="5"  /> 

服務提供方B:<dubbo:service  inter  /> 

服務提供方B:<dubbo:service  inter  connections="10"  /> 

對于服務BarServiceA,由于消費方和提供方都沒有配置connections,是以,所有類似于BarServiceA這樣沒有配置connections的服務,消費方伺服器和提供方伺服器将公用一個TCP長連接配接,即上面代碼說提到的共享連接配接。而對于服務BarServiceA,因為配置了connections屬性,消費方A和提供方B之間将單獨建立5個(消費方配置優先級高于服務端配置,是以這裡是5而不是10)TCP長連接配接來專門給服務BarServiceA使用,以提高吞吐量和性能,至于每次調用應該如何從這5個連接配接中選,前面已經提到,這裡不再闡述。是以,為了提高某個服務的吞吐量,可以試着配置connections屬性,當然,前提是服務提供方性能過剩。

         對于異步調用,Dubbo的預設調用過濾鍊中有一個FutureFilter,當我們在dubbo:reference中配置了async="true"後,将會執行FutureFilter中的異步邏輯,這裡不再闡述,感興趣的同學可以去閱讀FutureFilter#asyncCallback部分的代碼。

繼續閱讀