天天看點

Github項目NettyRpc 閱讀(Netty+同/異步通訊+多線程+AQS+CAS+volatile)

Github項目:https://github.com/luxiaoxun/NettyRpc

Fork: https://github.com/sw008/NettyRpc

此項目很适合學習多線程和Netty

RPC調用流程

大體思路:整個異步/同步通訊流程,适用于大多數同步/異步socket通訊場景。可以在此基礎上拓展實作例如異步回調等多種使用功能。整個異步請求+響應過程,通過ConcurrentHashMap<id, RPCFuture>關聯。

1 用戶端線程A(out) 建立RPCFuture對象(此對象包含 請求ID,request對象,response對象 等資訊),儲存ConcurrentHashMap<id, RPCFuture>儲存。然後把request對象(含請求ID)通過輸出流發送給服務端。線程A繼續向下執行其他代碼(此時下面2-4異步執行),直到其調用RPCFuture.get()後,線程A挂起。

2 服務端IO線程(in) 輸入流Handler,接收請求資訊和Channel 轉發到工作線程池。

3 服務端工作線程(out) 處理成功後通過Channel輸出流傳回響應對象(包含請求ID)給用戶端。

4 用戶端線程B(in) 通過輸入流Handler,接收響應對象,通過傳回的請求ID在ConcurrentHashMap<id, RPCFuture>中找到發送時建立的RPCFuture更新其相應資訊,并更新其AQS的狀态,release喚醒調用RPCFuture.get()而挂起的線程A。

(Callback:此步還可以拓展實作異步回調,與RPCFuture同理。即第一步發送時建立回調執行對象,儲存到ConcurrentHashMap中,在此步找到此對象并執行,此時回調對象還可以獲得request、response資訊。也可設定一個單例回調對象,在傳回輸入流中執行其回調方法)

5 用戶端線程A 被喚醒取得響應結果response,繼續執行。

代碼實作過程

1 用戶端采用JDK動态代理建立ObjectProxy類代理對象,并與服務接口綁定。

2 用戶端調用服務接口方法,觸發動态代理對象的ObjectProxy.invoke()

3 用戶端發送請求, ObjectProxy.invoke(Object proxy, Method method, Object[] args)  是JDK動态代理InvocationHandler接口的方法

   3.1 通過method、args,生成RpcRequest類對象(其包含成員變量 requestId、className、 methodName、parameterTypes、parameters)

   3.2 ConnectManage.getInstance().chooseHandler() :RpcClientHandler  一個簡單的負載均衡方法,找到應該調用的伺服器。因為Netty客服端主機與服務端主機是通過一條Channel連結,每一條Channel代表一個服務端主機。每個RpcClientHandler中包含一個Channel連結服務端,一個ConcurrentHashMap<String, RPCFuture>記錄請求ID和其對應的請求

   3.3 RpcClientHandler.sendRequest(RpcRequest request) 将請求對象發送給服務端主機,等待對方接收成功後,傳回RPCFuture對象實作異步調用。

RpcClientHandler類

ConcurrentHashMap<String, RPCFuture> pendingRPC;//儲存 請求ID+對應RPCFuture

public RPCFuture sendRequest(RpcRequest request) {
        final CountDownLatch latch = new CountDownLatch(1);

        //建立自定義異步請求類RPCFuture對象 
        RPCFuture rpcFuture = new RPCFuture(request);

        //pendingRPC為ConcurrentHashMap<String, RPCFuture> 記錄請求ID和對應異步請求
        //對方伺服器通過channel傳回Response對象時,本機輸入流方法 通過pendingRPC+請求ID更新對應RPCFuture狀态
        pendingRPC.put(request.getRequestId(), rpcFuture);

        //發送請求RpcRequest,并添加對方接收成功的異步監聽對象,回調對象ChannelFutureListener
        channel.writeAndFlush(request).addListener(
            new ChannelFutureListener() { //執行個體化 一個匿名局部内部類對象
                //一個異步監聽對象 ,監聽線程回調由Netty架構實作
                //服務端接收到後 回調此匿名内部類對象 的方法  (注意不是對方處理完回調)     
                @Override
                public void operationComplete(ChannelFuture future) {
                    //此處使用局部内部類的閉包特性,此局部内部類對象可調用此方法的局部變量latch
                    //對方接受成功,通過CountDownLatch喚醒目前線程
                    latch.countDown();
                }
        });
        try {
            //目前線程挂起 等待接收監聽線程回調喚醒
            latch.await();
        } catch (InterruptedException e) {
            logger.error(e.getMessage());
        }

        //先傳回RPCFuture,此時隻代表請求送達,但是對方伺服器可能還沒有處理完成
        return rpcFuture;
}
           

 4 服務端接收處理資訊

   服務端RpcHandler類繼承Netty的SimpleChannelInboundHandler并實作channelRead0()方法,接收用戶端資訊,并通過反射執行。    

可以看到消費者Netty input Handler收到消息之後直接把 請求資訊+請求Channel 交給工作線程池。由工作線程處理請求,再通過請求Channel傳回資訊。即服務端IO線程隻負責接收資訊轉發給工作線程,所有處理又工作線程池異步完成并傳回結果。

RpcHandler類

public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
        //接到資訊後,直接送出到RpcServer中的線程池執行
        RpcServer.submit(new Runnable() { 
            //同樣用到了局部内部類的閉包特性,可以調用目前方法局部變量
            @Override
            public void run() {
                RpcResponse response = new RpcResponse();
                //執行個體化RpcResponse 并裝配資訊 
                response.setRequestId(request.getRequestId());
                try {
                    Object result = handle(request);
                    response.setResult(result);
                } catch (Throwable t) {
                    response.setError(t.toString());
                }
                //發送response到用戶端
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    //添加異步監聽對象,發送成功後回調此對象方法
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        logger.debug("Send response for request " + request.getRequestId());
                    }
                });
            }
        });
    }
           

 5 用戶端接收響應資訊

   RpcClientHandler類繼承Netty的SimpleChannelInboundHandler并實作channelRead0方法,接收服務端響應資訊。

   可以發現用戶端發送請求和接收響應的方法都是RpcClientHandler類實作,因為發送和接收需要依靠同一個pendingRPC進行結果比對,發送時将RPCFuture放入其中,接收響應後通過請求ID更新對應RPCFuture。

RpcClientHandler類
//用戶端 收到響應資訊
ConcurrentHashMap<String, RPCFuture> pendingRPC;
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        //用過請求ID 在pendingRPC中找到發送時儲存的RPCFuture 
        String requestId = response.getRequestId();
        //pendingRPC儲存了發送時的RPCFuture
        RPCFuture rpcFuture = pendingRPC.get(requestId);
        if (rpcFuture != null) {
            pendingRPC.remove(requestId);
            //更新對應rpcFuture,并且喚醒已經執行rpcFuture.get()的所有線程
            rpcFuture.done(response);
        }
}
           

6 RPCFuture類實作了Future接口,并通過AQS實作線程的挂起與喚醒。

方法調用線程 持有一個RPCFuture對象,并通過此對象get()挂起等待被其它線程喚醒。

IO輸入流線程 收到響應結果後,Map中找到對應RPCFuture對象喚醒被挂起線程。

某個線程挂起後,隻能等待其它線程通過将他挂起的Object來喚醒他。(此處是通過RPCFuture對象來挂起和喚醒)

sync對象實作了AbstractQueuedSynchronizer的tryRelease,tryAcquire方法。

當執行rpcFuture.done(response)時,将AQS中volatile int state通過CAS設定為1,喚醒已經執行rpcFuture.get()的所有線程。

拓展:可以通過CountDownLatch實作Future接口。

RPCFuture類
//5中,接收到服務端響應後執行的方法rpcFuture.done(response);
public void done(RpcResponse reponse) {
        this.response = reponse;

        //sync為AQS對象,通過CAS更新AQS中的狀态值volatile int state;
        sync.release(1);

        invokeCallbacks();
        // Threshold
        long responseTime = System.currentTimeMillis() - startTime;
        if (responseTime > this.responseTimeThreshold) {
            logger.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms");
        }
    }
           

 目前程執行rpcFuture.get()時,判斷AQS中的volatile int state=1 ?,若還沒有響應資訊則目前線程進入挂起狀态。

RPCFuture類

@Override
    public Object get() throws InterruptedException, ExecutionException {

        //AQS中的狀态值volatile int state,判斷對方伺服器時候已經響應;
        sync.acquire(-1);

        if (this.response != null) {
            return this.response.getResult();
        } else {
            return null;
        }
    }
           

6 Sync類,是RPCFuture的靜态内部類。通過CAS控制volatile int state=1,決定調用線程是否需要挂起。volatile保證了可見性, CAS保證了原子性,整個過程是線程安全。使比較+指派成為一個原子性操作,不會被其他線程打擾。可以把CAS了解成多線程的串行執行,再加上volatile的可見性有序性保障,是以是線程安全的。

AQS對象.acquire:請求資源,tryAcquire==false時挂起線程

AQS對象.release:釋放資源,tryRelease==true時喚醒一個挂起線程

http://www.cnblogs.com/waterystone/p/4920797.html

static class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 1L;

        //future 狀态常量
        private final int done = 1; //已完成
        private final int pending = 0; //未完成

        @Override
        //擷取資源
        protected boolean tryAcquire(int arg) {
            //判斷目前 volatile int state=1
            //傳回false時,目前線程挂起
            return getState() == done;
        }

        @Override
        //釋放資源
        protected boolean tryRelease(int arg) {
            if (getState() == pending) {
                //CAS設定 volatile int state=1
                //CAS保證操作原子性,線程安全
                if (compareAndSetState(pending, done)) {                    
                    //因為隻有發送線程會執行其請求對應的RPCFuture的get方法,是以隻會有一個線程挂起等待
                    //傳回true時,AQS架構會喚醒第一個等待線程
                    return true;
                } else {
                    return false;
                }
            } else {
                return true;
            }
        }

        public boolean isDone() {
            getState();
            return getState() == done;
        }
    }
           

轉載于:https://www.cnblogs.com/sw008/p/11054297.html