天天看點

dubbo源碼分析-用戶端DubboInvoker調用服務端體會Netty的非阻塞IO使用

本文會介紹Dubbo用戶端DubboInvoker調用服務端時候異步同步調用,借此了解Netty的阻塞非阻塞用法

先來看官網的描述:

dubbo源碼分析-用戶端DubboInvoker調用服務端體會Netty的非阻塞IO使用
dubbo源碼分析-用戶端DubboInvoker調用服務端體會Netty的非阻塞IO使用

上面的描述對應實作在DubboInvoker類。

DubboInvoker doInvoke(final Invocation invocation)方法:

....
    try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
....
           

關鍵參數說明:

1)isOneway: oneway調用,隻發送請求,不接收傳回結果 //RpcContext中如此描述。

2)isAsync: 異步調用

3)else: 本地調用(同步調用),線程等待傳回結果

isOneway的用法 (隻發送,不傳回結果,發送時候阻塞保證發送成功)

對于1) currentClient.send(inv, isSent) 中的isSent參數作用:-

設定是否等待消息發出:(異步總是不等待傳回結果)

● sent=”true” 等待消息發出,消息發送失敗将抛出異常。

● sent=”false” 不等待消息發出,将消息放入IO隊列,即刻傳回。

注意:

首先,是否等待消息發出與是否接收傳回結果是兩回事。 例如, 該isOneway的做法是不接收傳回結果,但是如果isSent=true則等待消息發出;

其次,對與oneway調用,NettyChannel 中send方法裡的 sent = true

NettyChannel send(Object message, boolean sent)方法:

....
  try {
            ChannelFuture future = channel.write(message);//發送時候不阻塞
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout); // 線程運作此會等待消息發出,同步
            }
....
           

(HeaderExchangeChannel send –》到 NettyChannel send)

isAsync的用法 (調用發送不阻塞,發送時候不阻塞,擷取傳回結果時候阻塞, 擷取傳回結果的位置靈活,結果放在RpcContext中)

對于2)傳回結果包裝在Future中,并且Future放入到RpcContext中,RpcContext是線程安全,因為RpcContext裡用ThreadLocal儲存該RpcContext的具體内容

ResponseFuture future = currentClient.request(inv, timeout) ; //調用發送不阻塞
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));  
           

以上兩個方法都是非阻塞的,也就是程式沒有任何等待直接執行。注意,對與Async調用,NettyChannel 中send方法裡的 sent=false

什麼時候取回該結果? 按官網描述可知,用RpcContext.getFuture.get()則可以擷取。但是當使用這個方法調用位置是阻塞的,因為要等待結果。

(HeaderExchangeChannel request–》到 NettyChannel send)

本地調用, (調用發送不阻塞,發送時候不阻塞,擷取傳回結果時候阻塞,擷取傳回結果的位置不靈活)

對于3)本地調用(同步調用),線程等待傳回結果

RpcContext.getContext().setFuture(null);

return (Result) currentClient.request(inv, timeout).get();

注意,get()方法會阻塞,線程運作到這裡等待傳回發送結果。

注意,

首先,對于本地調用(同步調用),NettyChannel 中send方法裡的 sent=false。

其次,重點了解!! currentClient.request(inv, timeout)裡執行的是Netty裡的非阻塞調用ChannelFuture future = channel.write(message); 而程式并沒有在netty的調用層次上阻塞(估計目的是讓netty高并發處理),但是get()會在主程式運作位置阻塞直到傳回結果,該實作是dubbo的DefaultFuture自己實作,與Netty不相關。可以看出,為了等待傳回結果,該阻塞是在業務線程裡控制,并不涉及Netty的阻塞用法。

繼續閱讀