Github項目:https://github.com/luxiaoxun/NettyRpc
Fork: https://github.com/sw008/NettyRpc
此項目很适合學習多線程和Netty
RPC調用流程
大體思路:整個異步/同步通訊流程,适用于大多數同步/異步socket通訊場景。可以在此基礎上拓展實作例如異步回調等多種使用功能。整個異步請求+響應過程,通過ConcurrentHashMap<id, RPCFuture>關聯。
1 用戶端線程A(out) 建立RPCFuture對象(此對象包含 請求ID,request對象,response對象 等資訊),儲存ConcurrentHashMap<id, RPCFuture>儲存。然後把request對象(含請求ID)通過輸出流發送給服務端。線程A繼續向下執行其他代碼(此時下面2-4異步執行),直到其調用RPCFuture.get()後,線程A挂起。
2 服務端IO線程(in) 輸入流Handler,接收請求資訊和Channel 轉發到工作線程池。
3 服務端工作線程(out) 處理成功後通過Channel輸出流傳回響應對象(包含請求ID)給用戶端。
4 用戶端線程B(in) 通過輸入流Handler,接收響應對象,通過傳回的請求ID在ConcurrentHashMap<id, RPCFuture>中找到發送時建立的RPCFuture更新其相應資訊,并更新其AQS的狀态,release喚醒調用RPCFuture.get()而挂起的線程A。
(Callback:此步還可以拓展實作異步回調,與RPCFuture同理。即第一步發送時建立回調執行對象,儲存到ConcurrentHashMap中,在此步找到此對象并執行,此時回調對象還可以獲得request、response資訊。也可設定一個單例回調對象,在傳回輸入流中執行其回調方法)
5 用戶端線程A 被喚醒取得響應結果response,繼續執行。
代碼實作過程
1 用戶端采用JDK動态代理建立ObjectProxy類代理對象,并與服務接口綁定。
2 用戶端調用服務接口方法,觸發動态代理對象的ObjectProxy.invoke()
3 用戶端發送請求, ObjectProxy.invoke(Object proxy, Method method, Object[] args) 是JDK動态代理InvocationHandler接口的方法
3.1 通過method、args,生成RpcRequest類對象(其包含成員變量 requestId、className、 methodName、parameterTypes、parameters)
3.2 ConnectManage.getInstance().chooseHandler() :RpcClientHandler 一個簡單的負載均衡方法,找到應該調用的伺服器。因為Netty客服端主機與服務端主機是通過一條Channel連結,每一條Channel代表一個服務端主機。每個RpcClientHandler中包含一個Channel連結服務端,一個ConcurrentHashMap<String, RPCFuture>記錄請求ID和其對應的請求
3.3 RpcClientHandler.sendRequest(RpcRequest request) 将請求對象發送給服務端主機,等待對方接收成功後,傳回RPCFuture對象實作異步調用。
RpcClientHandler類
ConcurrentHashMap<String, RPCFuture> pendingRPC;//儲存 請求ID+對應RPCFuture
public RPCFuture sendRequest(RpcRequest request) {
final CountDownLatch latch = new CountDownLatch(1);
//建立自定義異步請求類RPCFuture對象
RPCFuture rpcFuture = new RPCFuture(request);
//pendingRPC為ConcurrentHashMap<String, RPCFuture> 記錄請求ID和對應異步請求
//對方伺服器通過channel傳回Response對象時,本機輸入流方法 通過pendingRPC+請求ID更新對應RPCFuture狀态
pendingRPC.put(request.getRequestId(), rpcFuture);
//發送請求RpcRequest,并添加對方接收成功的異步監聽對象,回調對象ChannelFutureListener
channel.writeAndFlush(request).addListener(
new ChannelFutureListener() { //執行個體化 一個匿名局部内部類對象
//一個異步監聽對象 ,監聽線程回調由Netty架構實作
//服務端接收到後 回調此匿名内部類對象 的方法 (注意不是對方處理完回調)
@Override
public void operationComplete(ChannelFuture future) {
//此處使用局部内部類的閉包特性,此局部内部類對象可調用此方法的局部變量latch
//對方接受成功,通過CountDownLatch喚醒目前線程
latch.countDown();
}
});
try {
//目前線程挂起 等待接收監聽線程回調喚醒
latch.await();
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
//先傳回RPCFuture,此時隻代表請求送達,但是對方伺服器可能還沒有處理完成
return rpcFuture;
}
4 服務端接收處理資訊
服務端RpcHandler類繼承Netty的SimpleChannelInboundHandler并實作channelRead0()方法,接收用戶端資訊,并通過反射執行。
可以看到消費者Netty input Handler收到消息之後直接把 請求資訊+請求Channel 交給工作線程池。由工作線程處理請求,再通過請求Channel傳回資訊。即服務端IO線程隻負責接收資訊轉發給工作線程,所有處理又工作線程池異步完成并傳回結果。
RpcHandler類
public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
//接到資訊後,直接送出到RpcServer中的線程池執行
RpcServer.submit(new Runnable() {
//同樣用到了局部内部類的閉包特性,可以調用目前方法局部變量
@Override
public void run() {
RpcResponse response = new RpcResponse();
//執行個體化RpcResponse 并裝配資訊
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t.toString());
}
//發送response到用戶端
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
//添加異步監聽對象,發送成功後回調此對象方法
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
logger.debug("Send response for request " + request.getRequestId());
}
});
}
});
}
5 用戶端接收響應資訊
RpcClientHandler類繼承Netty的SimpleChannelInboundHandler并實作channelRead0方法,接收服務端響應資訊。
可以發現用戶端發送請求和接收響應的方法都是RpcClientHandler類實作,因為發送和接收需要依靠同一個pendingRPC進行結果比對,發送時将RPCFuture放入其中,接收響應後通過請求ID更新對應RPCFuture。
RpcClientHandler類
//用戶端 收到響應資訊
ConcurrentHashMap<String, RPCFuture> pendingRPC;
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
//用過請求ID 在pendingRPC中找到發送時儲存的RPCFuture
String requestId = response.getRequestId();
//pendingRPC儲存了發送時的RPCFuture
RPCFuture rpcFuture = pendingRPC.get(requestId);
if (rpcFuture != null) {
pendingRPC.remove(requestId);
//更新對應rpcFuture,并且喚醒已經執行rpcFuture.get()的所有線程
rpcFuture.done(response);
}
}
6 RPCFuture類實作了Future接口,并通過AQS實作線程的挂起與喚醒。
方法調用線程 持有一個RPCFuture對象,并通過此對象get()挂起等待被其它線程喚醒。
IO輸入流線程 收到響應結果後,Map中找到對應RPCFuture對象喚醒被挂起線程。
某個線程挂起後,隻能等待其它線程通過将他挂起的Object來喚醒他。(此處是通過RPCFuture對象來挂起和喚醒)
sync對象實作了AbstractQueuedSynchronizer的tryRelease,tryAcquire方法。
當執行rpcFuture.done(response)時,将AQS中volatile int state通過CAS設定為1,喚醒已經執行rpcFuture.get()的所有線程。
拓展:可以通過CountDownLatch實作Future接口。
RPCFuture類
//5中,接收到服務端響應後執行的方法rpcFuture.done(response);
public void done(RpcResponse reponse) {
this.response = reponse;
//sync為AQS對象,通過CAS更新AQS中的狀态值volatile int state;
sync.release(1);
invokeCallbacks();
// Threshold
long responseTime = System.currentTimeMillis() - startTime;
if (responseTime > this.responseTimeThreshold) {
logger.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms");
}
}
目前程執行rpcFuture.get()時,判斷AQS中的volatile int state=1 ?,若還沒有響應資訊則目前線程進入挂起狀态。
RPCFuture類
@Override
public Object get() throws InterruptedException, ExecutionException {
//AQS中的狀态值volatile int state,判斷對方伺服器時候已經響應;
sync.acquire(-1);
if (this.response != null) {
return this.response.getResult();
} else {
return null;
}
}
6 Sync類,是RPCFuture的靜态内部類。通過CAS控制volatile int state=1,決定調用線程是否需要挂起。volatile保證了可見性, CAS保證了原子性,整個過程是線程安全。使比較+指派成為一個原子性操作,不會被其他線程打擾。可以把CAS了解成多線程的串行執行,再加上volatile的可見性有序性保障,是以是線程安全的。
AQS對象.acquire:請求資源,tryAcquire==false時挂起線程
AQS對象.release:釋放資源,tryRelease==true時喚醒一個挂起線程
http://www.cnblogs.com/waterystone/p/4920797.html
static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
//future 狀态常量
private final int done = 1; //已完成
private final int pending = 0; //未完成
@Override
//擷取資源
protected boolean tryAcquire(int arg) {
//判斷目前 volatile int state=1
//傳回false時,目前線程挂起
return getState() == done;
}
@Override
//釋放資源
protected boolean tryRelease(int arg) {
if (getState() == pending) {
//CAS設定 volatile int state=1
//CAS保證操作原子性,線程安全
if (compareAndSetState(pending, done)) {
//因為隻有發送線程會執行其請求對應的RPCFuture的get方法,是以隻會有一個線程挂起等待
//傳回true時,AQS架構會喚醒第一個等待線程
return true;
} else {
return false;
}
} else {
return true;
}
}
public boolean isDone() {
getState();
return getState() == done;
}
}
轉載于:https://www.cnblogs.com/sw008/p/11054297.html