一、前言
前文我們分析了NameServer的核心元件、初始化和啟動流程、消息的序列化和反序列化,這一篇我們就來分析一下NameServer是如何發送請求的;
NettyRemotingServer代表了netty網絡通信伺服器,由他進行請求的發送,我們再重新回顧一下NettyRemotingServer的核心元件;
二、源碼導讀
1、我們發現NettyRemotingServer提供了3種發送請求的方式:同步請求、異步請求、Oneway請求(隻發送請求,不等待響應);而且都是直接調用父類NettyRemotingAbstract的方法,後續我們一個個分析;
2、如果發送的請求響應逾時了怎麼處理?
三、同步請求
- 封裝一個ResponseFuture對象;
- 把請求id->空的responseFuture放入到map裡去;
- 添加一個請求發送監聽器,監聽請求是否發送成功;
- 通過countDownLatch來等待請求的響應回來并設定逾時時間,如果說響應回來了或者逾時則繼續執行;
- 如果響應為空抛出異常,響應不為空正常傳回;
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl
// 同步的模式發送請求出去給指定的機器
// 我得先把請求發送出去,然後才可以去分析我收到了相應以後怎麼來進行處理
// channel是網絡連接配接,request是我要發送的請求,timeoutMillis是同步發送請求逾時時間
public RemotingCommand invokeSyncImpl(
final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// opaque就是一個請求id
final int opaque = request.getOpaque();
try {
// 封裝一個ResponseFuture,這個就代表了後續我要擷取到的一個響應future對象,此時響應command是空的
// 因為他是同步調用的,拿到了響應之後,就直接傳回響應對象就可以了
// 異步發送請求,他會傳遞進來一個invokeCallback,通過異步回調機制來接收我的響應
final ResponseFuture responseFuture = new ResponseFuture(
channel, opaque, timeoutMillis, null, null);
// 把請求id->空的responseFuture放入到map裡去
this.responseTable.put(opaque, responseFuture);
// 你要發送請求的網絡連接配接的遠端機器的位址
final SocketAddress addr = channel.remoteAddress();
// 通過netty的channel發送請求資料出去,這裡是用的是netty異步發送的機制
// 如果說發送這個請求成功/失敗了以後,回調我的一個channel future listener,如果說發送成功了
// responseFuture标記為true
// 如果說發送失敗了以後,他會根據請求id把響應future對象從map裡移除掉,設定一下異常的cause
// 發送請求command的時候,他這個時候其實這個remoting command對象,會經過netty編碼器,進行序列化和編碼
// 從對象轉換成一個位元組數組,通過網絡連接配接發送出去
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 發送完了請求之後會在這裡通過responseFuture等待指定逾時時間,響應對象回來
// 同步發送請求就會在這裡等待指定的timeout時間
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
// 如果說響應回來了以後,結果呢響應是空的,逾時了,有逾時的問題,響應其實沒有拿到
// 另外一種是請求根本沒有發送成功
if (null == responseCommand) {
// 如果說發送成功了,但是逾時了沒收到響應,此時是走timeout異常
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
}
// 另外一種是請求壓根兒就沒發送成功,send request異常
else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
// 如果說是響應成功了,此時就會把響應command傳回
return responseCommand;
} finally {
// 最終一定會把請求id從responseTable裡移除掉
this.responseTable.remove(opaque);
}
}
1、封裝一個ResponseFuture對象;
// 封裝一個ResponseFuture,這個就代表了後續我要擷取到的一個響應future對象,此時響應command是空的
// 因為他是同步調用的,拿到了響應之後,就直接傳回響應對象就可以了
// 異步發送請求,他會傳遞進來一個invokeCallback,通過異步回調機制來接收我的響應
final ResponseFuture responseFuture = new ResponseFuture(
channel, opaque, timeoutMillis, null, null);
2、把請求id->空的responseFuture放入到map裡去;
// 把請求id->空的responseFuture放入到map裡去
this.responseTable.put(opaque, responseFuture);
3、添加一個請求發送監聽器,監聽請求是否發送成功;
// 通過netty的channel發送請求資料出去,這裡是用的是netty異步發送的機制
// 如果說發送這個請求成功/失敗了以後,回調我的一個channel future listener,如果說發送成功了
// responseFuture标記為true
// 如果說發送失敗了以後,他會根據請求id把響應future對象從map裡移除掉,設定一下異常的cause
// 發送請求command的時候,他這個時候其實這個remoting command對象,會經過netty編碼器,進行序列化和編碼
// 從對象轉換成一個位元組數組,通過網絡連接配接發送出去
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
4、通過countDownLatch來等待請求的響應回來并設定逾時時間,如果說響應回來了或者逾時則繼續執行;
// 發送完了請求之後會在這裡通過responseFuture等待指定逾時時間,響應對象回來
// 同步發送請求就會在這裡等待指定的timeout時間
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
// 等待請求的響應回來,等待逾時時間,countDownLatch來等待,如果說響應回來了
// 要對countDownLatch進行countDown()操作
// await等待操作就會傳回
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
// 我在這裡通過countDownLatch來等待,誰去對那1個數字去進行一次countDown,隻有有人countDown了以後
// 從1數字變為0數字,等待指定逾時時間
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
5、如果響應為空抛出異常,響應不為空正常傳回;
// 如果說響應回來了以後,結果呢響應是空的,逾時了,有逾時的問題,響應其實沒有拿到
// 另外一種是請求根本沒有發送成功
if (null == responseCommand) {
// 如果說發送成功了,但是逾時了沒收到響應,此時是走timeout異常
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
}
// 另外一種是請求壓根兒就沒發送成功,send request異常
else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
// 如果說是響應成功了,此時就會把響應command傳回
return responseCommand;
四、異步請求
- 通過semaphoreAsync擷取一個信号量,超過逾時時間直接抛出逾時異常;
- 封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;
- 判斷是否逾時,可能擷取信号量時間過長導緻已經逾時了;
- 封裝一個ResponseFuture放入responseTable等待響應傳回;
- 添加一個監聽器監聽請求發送結果,請求發送結果會進行回調;
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl
// 異步發送請求
// 其實就多了一個東西而已,invoke callback
// 請求發送完了以後我就直接傳回了,不等待了,響應回來了以後回調我的invoke callback
public void invokeAsyncImpl(
final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 設定一個開始時間
long beginStartTime = System.currentTimeMillis();
// 擷取到我們的一個請求id
final int opaque = request.getOpaque();
// semaphoreAsync,異步調用請求的信号量
// async semaphore信号量預設是64個,意思就是說并發控制,也就是說你最多同時隻能發起64個async request異步請求
// semaphore來說的話,他會預設的話會設定一些信号量,如果說信号量那完了以後,此時線程就會阻塞在這裡了
// countDownLatch,要很多線程都去進行countDown,把數字都countDown完了,此時才可以不用阻塞
// 如果說64個信号量都擷取完了之後,一個異步請求發送完了收到了一個響應了之後,此時才能還回去一個信号量
// 此時才可以有下一個異步請求拿到一個信号量再發送出去
// 阻塞等待信号量的時候是有一個timeout,逾時時間的限制的,如果超過逾時時間此時還沒等到信号量,就直接傳回了就可以了
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
// 如果說在timeout逾時時間範圍之内擷取到了信号量了以後
if (acquired) {
// 我把信号量對象封裝了一下,僅僅支援釋放一次的信号量對象
// 後續如果說我要進行信号量釋放,此時可以基于這個僅僅支援釋放一次的對象來進行釋放
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
// 從開始時間到現在為止,耗費了多少時間去擷取一個信号量就是這裡的內插補點
// 如果說你等待擷取一個信号量耗費的時間,超過了逾時時間之後,此時就說明有問題,我們就通過僅僅支援釋放一次的
// 對象,把擷取到的信号量去做一次釋放,然後就抛異常,就說我逾時了
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
// 如果說我擷取信号量等待時間沒有超過逾時時間
// 就可以正常去封裝一個response future,裡面會設定一些東西,網絡連接配接是channel,請求id是opaque
// 真正你在發送請求的時候,等待響應的時候,逾時時間是要變化的,我為了等待擷取信号量
// 已經耗費了一段時間了,這個時候把逾時時間減去我擷取信号量耗費的時間
// 響應對象回來了以後需要回調,invoke callback回調對象
// once,釋放掉信号量,得基于僅僅支援釋放一次的信号量對象來進行釋放
final ResponseFuture responseFuture = new ResponseFuture(
channel,
opaque,
timeoutMillis - costTime,
invokeCallback,
once
);
this.responseTable.put(opaque, responseFuture);
try {
// 通過netty網絡連接配接發送請求,請求發送結果會回調
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
}
// 如果要是在逾時時間範圍以内沒拿到信号量,此時抛異常,too much request
else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
// 逾時異常
throw new RemotingTimeoutException(info);
}
}
}
1、通過semaphoreAsync擷取一個信号量,超過逾時時間直接抛出逾時異常;
// semaphoreAsync,異步調用請求的信号量
// juc裡面的countDownLatch和semaphore在實戰開發裡是如何來運用的
// async semaphore信号量預設是64個,意思就是說并發控制,也就是說你最多同時隻能發起64個async request異步請求
// semaphore來說的話,他會預設的話會設定一些信号量,如果說信号量那完了以後,此時線程就會阻塞在這裡了
// countDownLatch,要很多線程都去進行countDown,把數字都countDown完了,此時才可以不用阻塞
// 如果說64個信号量都擷取完了之後,一個異步請求發送完了收到了一個響應了之後,此時才能還回去一個信号量
// 此時才可以有下一個異步請求拿到一個信号量再發送出去
// 阻塞等待信号量的時候是有一個timeout,逾時時間的限制的,如果超過逾時時間此時還沒等到信号量,就直接傳回了就可以了
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {}
// 如果要是在逾時時間範圍以内沒拿到信号量,此時抛異常,too much request
else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
// 逾時異常
throw new RemotingTimeoutException(info);
}
}
2、封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;
// 如果說在timeout逾時時間範圍之内擷取到了信号量了以後
if (acquired) {
// 我把信号量對象封裝了一下,僅僅支援釋放一次的信号量對象
// 後續如果說我要進行信号量釋放,此時可以基于這個僅僅支援釋放一次的對象來進行釋放
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
3、判斷是否逾時,可能擷取信号量時間過長導緻已經逾時了;
// 從開始時間到現在為止,耗費了多少時間去擷取一個信号量就是這裡的內插補點
// 如果說你等待擷取一個信号量耗費的時間,超過了逾時時間之後,此時就說明有問題,我們就通過僅僅支援釋放一次的
// 對象,把擷取到的信号量去做一次釋放,然後就抛異常,就說我逾時了
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
4、封裝一個ResponseFuture放入responseTable等待響應傳回;
// 如果說我擷取信号量等待時間沒有超過逾時時間
// 就可以正常去封裝一個response future,裡面會設定一些東西,網絡連接配接是channel,請求id是opaque
// 真正你在發送請求的時候,等待響應的時候,逾時時間是要變化的,我為了等待擷取信号量
// 已經耗費了一段時間了,這個時候把逾時時間減去我擷取信号量耗費的時間
// 響應對象回來了以後需要回調,invoke callback回調對象
// once,釋放掉信号量,得基于僅僅支援釋放一次的信号量對象來進行釋放
final ResponseFuture responseFuture = new ResponseFuture(
channel,
opaque,
timeoutMillis - costTime,
invokeCallback,
once
);
this.responseTable.put(opaque, responseFuture);
5、添加一個監聽器監聽請求發送結果,請求發送結果會進行回調;
try {
// 通過netty網絡連接配接發送請求,請求發送結果會回調
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
// 發送請求失敗了,移除掉這個請求對應的responseFuture
private void requestFail(final int opaque) {
ResponseFuture responseFuture = responseTable.remove(opaque);
if (responseFuture != null) {
responseFuture.setSendRequestOK(false);
responseFuture.putResponse(null);
try {
// 調用我們設定的異步回調接口
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
log.warn("execute callback in requestFail, and callback throw", e);
} finally {
// 一定會對我們的信号量僅僅釋放一次
responseFuture.release();
}
}
}
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
// 拿到一個異步回調的線程池
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
// 開啟一個線程異步的去回調我們的invoke callback
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
// 如果是異步調用的話,在這裡完成異步回調了以後,就會去進行一個信号量釋放
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
// 如果說響應回來了以後,執行invoke回調
public void executeInvokeCallback() {
if (invokeCallback != null) {
// 通過cas確定invoke callback僅僅隻能回調一次
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
// 執行一次異步的回調
invokeCallback.operationComplete(this);
}
}
}
五、Oneway請求
- 标記本次請求為oneway請求;
- 通過semaphoreOneway擷取信号量,如果逾時則抛出逾時異常;
- 封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;
- 添加一個監聽器監聽請求發送結果,發送成功,失敗都可以,失敗隻列印告警日志;
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl
// 發送一個oneway請求,隻要把請求發送出去就可以了,不要去等待響應
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 标記本次請求為oneway請求
request.markOnewayRPC();
// 同時最多隻可以有指定數量的oneway請求發送出去
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
// 隻要我的一個oneway請求發送出去了,發送成功了,失敗了,都可以,此時就把信号量做一個釋放
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
1、标記本次請求為oneway請求;
// 标記本次請求為oneway請求
request.markOnewayRPC();
public void markOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
this.flag |= bits;
}
2、通過semaphoreOneway擷取信号量,如果逾時則抛出逾時異常;
// 同時最多隻可以有指定數量的oneway請求發送出去
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
3、封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void release() {
if (this.semaphore != null) {
// 基于atomic實作了基于CAS的僅僅支援一次的釋放
// cas操作,多線程并發的時候,僅僅會跑成功一次,僅僅可以讓我們的這次信号量擷取,就釋放一次
if (this.released.compareAndSet(false, true)) {
this.semaphore.release();
}
}
}
4、添加一個監聽器監聽請求發送結果,回調時釋放信号量,發送成功,失敗都可以,失敗隻列印告警日志;
// 隻要我的一個oneway請求發送出去了,發送成功了,失敗了,都可以,此時就把信号量做一個釋放
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
public void release() {
if (this.semaphore != null) {
// 基于atomic實作了基于CAS的僅僅支援一次的釋放
// cas操作,多線程并發的時候,僅僅會跑成功一次,僅僅可以讓我們的這次信号量擷取,就釋放一次
if (this.released.compareAndSet(false, true)) {
this.semaphore.release();
}
}
}
六、判斷請求是否逾時
/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
* 這個方法周期性的運作,去掃描發送出去的請求是否逾時了,是誰周期性的來調用他呢,肯定是有一個定時排程的線程
* 應該是在他的子類裡
* </p>
*/
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
// 請求發送時間 + 逾時時間 <= 目前時間,目前時間已經超過了一個請求逾時時間了
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
// 把信号量做一個釋放
rep.release();
it.remove(); // 把這個請求的responseFuture做一個移除
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
// 對于異步調用的話得進行回調這樣子
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
七、總結
我們看完NameServer三種發送請求流程,我們可以學到裡面的countDownLatch和semaphore在實戰開發裡是如何來運用的;
下文我們還會分析,如果發送的請求一直拿不的響應NameServer是怎麼處理的,我們是不是也可以學習一下?