天天看點

RocketMQ源碼分析之NameServer發送請求流程源碼分析

作者:程式員阿龍

一、前言

前文我們分析了NameServer的核心元件、初始化和啟動流程、消息的序列化和反序列化,這一篇我們就來分析一下NameServer是如何發送請求的;

NettyRemotingServer代表了netty網絡通信伺服器,由他進行請求的發送,我們再重新回顧一下NettyRemotingServer的核心元件;

RocketMQ源碼分析之NameServer發送請求流程源碼分析

二、源碼導讀

1、我們發現NettyRemotingServer提供了3種發送請求的方式:同步請求、異步請求、Oneway請求(隻發送請求,不等待響應);而且都是直接調用父類NettyRemotingAbstract的方法,後續我們一個個分析;

2、如果發送的請求響應逾時了怎麼處理?

RocketMQ源碼分析之NameServer發送請求流程源碼分析

三、同步請求

  1. 封裝一個ResponseFuture對象;
  2. 把請求id->空的responseFuture放入到map裡去;
  3. 添加一個請求發送監聽器,監聽請求是否發送成功;
  4. 通過countDownLatch來等待請求的響應回來并設定逾時時間,如果說響應回來了或者逾時則繼續執行;
  5. 如果響應為空抛出異常,響應不為空正常傳回;

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl

// 同步的模式發送請求出去給指定的機器
// 我得先把請求發送出去,然後才可以去分析我收到了相應以後怎麼來進行處理
// channel是網絡連接配接,request是我要發送的請求,timeoutMillis是同步發送請求逾時時間
public RemotingCommand invokeSyncImpl(
        final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // opaque就是一個請求id
    final int opaque = request.getOpaque();

    try {
        // 封裝一個ResponseFuture,這個就代表了後續我要擷取到的一個響應future對象,此時響應command是空的
        // 因為他是同步調用的,拿到了響應之後,就直接傳回響應對象就可以了
        // 異步發送請求,他會傳遞進來一個invokeCallback,通過異步回調機制來接收我的響應
        final ResponseFuture responseFuture = new ResponseFuture(
                channel, opaque, timeoutMillis, null, null);
        // 把請求id->空的responseFuture放入到map裡去
        this.responseTable.put(opaque, responseFuture);

        // 你要發送請求的網絡連接配接的遠端機器的位址
        final SocketAddress addr = channel.remoteAddress();

        // 通過netty的channel發送請求資料出去,這裡是用的是netty異步發送的機制
        // 如果說發送這個請求成功/失敗了以後,回調我的一個channel future listener,如果說發送成功了
        // responseFuture标記為true
        // 如果說發送失敗了以後,他會根據請求id把響應future對象從map裡移除掉,設定一下異常的cause
        // 發送請求command的時候,他這個時候其實這個remoting command對象,會經過netty編碼器,進行序列化和編碼
        // 從對象轉換成一個位元組數組,通過網絡連接配接發送出去
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // 發送完了請求之後會在這裡通過responseFuture等待指定逾時時間,響應對象回來
        // 同步發送請求就會在這裡等待指定的timeout時間
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

        // 如果說響應回來了以後,結果呢響應是空的,逾時了,有逾時的問題,響應其實沒有拿到
        // 另外一種是請求根本沒有發送成功
        if (null == responseCommand) {
            // 如果說發送成功了,但是逾時了沒收到響應,此時是走timeout異常
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            }
            // 另外一種是請求壓根兒就沒發送成功,send request異常
            else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        // 如果說是響應成功了,此時就會把響應command傳回
        return responseCommand;
    } finally {
        // 最終一定會把請求id從responseTable裡移除掉
        this.responseTable.remove(opaque);
    }
}           

1、封裝一個ResponseFuture對象;

// 封裝一個ResponseFuture,這個就代表了後續我要擷取到的一個響應future對象,此時響應command是空的
// 因為他是同步調用的,拿到了響應之後,就直接傳回響應對象就可以了
// 異步發送請求,他會傳遞進來一個invokeCallback,通過異步回調機制來接收我的響應
final ResponseFuture responseFuture = new ResponseFuture(
        channel, opaque, timeoutMillis, null, null);           

2、把請求id->空的responseFuture放入到map裡去;

// 把請求id->空的responseFuture放入到map裡去
this.responseTable.put(opaque, responseFuture);           

3、添加一個請求發送監聽器,監聽請求是否發送成功;

// 通過netty的channel發送請求資料出去,這裡是用的是netty異步發送的機制
// 如果說發送這個請求成功/失敗了以後,回調我的一個channel future listener,如果說發送成功了
// responseFuture标記為true
// 如果說發送失敗了以後,他會根據請求id把響應future對象從map裡移除掉,設定一下異常的cause
// 發送請求command的時候,他這個時候其實這個remoting command對象,會經過netty編碼器,進行序列化和編碼
// 從對象轉換成一個位元組數組,通過網絡連接配接發送出去
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
            responseFuture.setSendRequestOK(true);
            return;
        } else {
            responseFuture.setSendRequestOK(false);
        }

        responseTable.remove(opaque);
        responseFuture.setCause(f.cause());
        responseFuture.putResponse(null);
        log.warn("send a request command to channel <" + addr + "> failed.");
    }
});           

4、通過countDownLatch來等待請求的響應回來并設定逾時時間,如果說響應回來了或者逾時則繼續執行;

// 發送完了請求之後會在這裡通過responseFuture等待指定逾時時間,響應對象回來
// 同步發送請求就會在這裡等待指定的timeout時間
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);           
// 等待請求的響應回來,等待逾時時間,countDownLatch來等待,如果說響應回來了
// 要對countDownLatch進行countDown()操作
// await等待操作就會傳回
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    // 我在這裡通過countDownLatch來等待,誰去對那1個數字去進行一次countDown,隻有有人countDown了以後
    // 從1數字變為0數字,等待指定逾時時間
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}           

5、如果響應為空抛出異常,響應不為空正常傳回;

// 如果說響應回來了以後,結果呢響應是空的,逾時了,有逾時的問題,響應其實沒有拿到
// 另外一種是請求根本沒有發送成功
if (null == responseCommand) {
    // 如果說發送成功了,但是逾時了沒收到響應,此時是走timeout異常
    if (responseFuture.isSendRequestOK()) {
        throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
            responseFuture.getCause());
    }
    // 另外一種是請求壓根兒就沒發送成功,send request異常
    else {
        throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
    }
}

// 如果說是響應成功了,此時就會把響應command傳回
return responseCommand;           

四、異步請求

  1. 通過semaphoreAsync擷取一個信号量,超過逾時時間直接抛出逾時異常;
  2. 封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;
  3. 判斷是否逾時,可能擷取信号量時間過長導緻已經逾時了;
  4. 封裝一個ResponseFuture放入responseTable等待響應傳回;
  5. 添加一個監聽器監聽請求發送結果,請求發送結果會進行回調;

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl

// 異步發送請求
// 其實就多了一個東西而已,invoke callback
// 請求發送完了以後我就直接傳回了,不等待了,響應回來了以後回調我的invoke callback
public void invokeAsyncImpl(
        final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // 設定一個開始時間
    long beginStartTime = System.currentTimeMillis();
    // 擷取到我們的一個請求id
    final int opaque = request.getOpaque();

    // semaphoreAsync,異步調用請求的信号量
    // async semaphore信号量預設是64個,意思就是說并發控制,也就是說你最多同時隻能發起64個async request異步請求
    // semaphore來說的話,他會預設的話會設定一些信号量,如果說信号量那完了以後,此時線程就會阻塞在這裡了
    // countDownLatch,要很多線程都去進行countDown,把數字都countDown完了,此時才可以不用阻塞
    // 如果說64個信号量都擷取完了之後,一個異步請求發送完了收到了一個響應了之後,此時才能還回去一個信号量
    // 此時才可以有下一個異步請求拿到一個信号量再發送出去
    // 阻塞等待信号量的時候是有一個timeout,逾時時間的限制的,如果超過逾時時間此時還沒等到信号量,就直接傳回了就可以了
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

    // 如果說在timeout逾時時間範圍之内擷取到了信号量了以後
    if (acquired) {
        // 我把信号量對象封裝了一下,僅僅支援釋放一次的信号量對象
        // 後續如果說我要進行信号量釋放,此時可以基于這個僅僅支援釋放一次的對象來進行釋放
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        // 從開始時間到現在為止,耗費了多少時間去擷取一個信号量就是這裡的內插補點
        // 如果說你等待擷取一個信号量耗費的時間,超過了逾時時間之後,此時就說明有問題,我們就通過僅僅支援釋放一次的
        // 對象,把擷取到的信号量去做一次釋放,然後就抛異常,就說我逾時了
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        // 如果說我擷取信号量等待時間沒有超過逾時時間
        // 就可以正常去封裝一個response future,裡面會設定一些東西,網絡連接配接是channel,請求id是opaque
        // 真正你在發送請求的時候,等待響應的時候,逾時時間是要變化的,我為了等待擷取信号量
        // 已經耗費了一段時間了,這個時候把逾時時間減去我擷取信号量耗費的時間
        // 響應對象回來了以後需要回調,invoke callback回調對象
        // once,釋放掉信号量,得基于僅僅支援釋放一次的信号量對象來進行釋放
        final ResponseFuture responseFuture = new ResponseFuture(
                channel,
                opaque,
                timeoutMillis - costTime,
                invokeCallback,
                once
        );
        this.responseTable.put(opaque, responseFuture);

        try {
            // 通過netty網絡連接配接發送請求,請求發送結果會回調
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    }
    // 如果要是在逾時時間範圍以内沒拿到信号量,此時抛異常,too much request
    else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            // 逾時異常
            throw new RemotingTimeoutException(info);
        }
    }
}           

1、通過semaphoreAsync擷取一個信号量,超過逾時時間直接抛出逾時異常;

// semaphoreAsync,異步調用請求的信号量
// juc裡面的countDownLatch和semaphore在實戰開發裡是如何來運用的
// async semaphore信号量預設是64個,意思就是說并發控制,也就是說你最多同時隻能發起64個async request異步請求
// semaphore來說的話,他會預設的話會設定一些信号量,如果說信号量那完了以後,此時線程就會阻塞在這裡了
// countDownLatch,要很多線程都去進行countDown,把數字都countDown完了,此時才可以不用阻塞
// 如果說64個信号量都擷取完了之後,一個異步請求發送完了收到了一個響應了之後,此時才能還回去一個信号量
// 此時才可以有下一個異步請求拿到一個信号量再發送出去
// 阻塞等待信号量的時候是有一個timeout,逾時時間的限制的,如果超過逾時時間此時還沒等到信号量,就直接傳回了就可以了
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);           
if (acquired) {}
// 如果要是在逾時時間範圍以内沒拿到信号量,此時抛異常,too much request
else {
    if (timeoutMillis <= 0) {
        throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
    } else {
        String info =
            String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreAsync.getQueueLength(),
                this.semaphoreAsync.availablePermits()
            );
        log.warn(info);
        // 逾時異常
        throw new RemotingTimeoutException(info);
    }
}           

2、封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;

// 如果說在timeout逾時時間範圍之内擷取到了信号量了以後
if (acquired) {
    // 我把信号量對象封裝了一下,僅僅支援釋放一次的信号量對象
    // 後續如果說我要進行信号量釋放,此時可以基于這個僅僅支援釋放一次的對象來進行釋放
    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);           

3、判斷是否逾時,可能擷取信号量時間過長導緻已經逾時了;

// 從開始時間到現在為止,耗費了多少時間去擷取一個信号量就是這裡的內插補點
// 如果說你等待擷取一個信号量耗費的時間,超過了逾時時間之後,此時就說明有問題,我們就通過僅僅支援釋放一次的
// 對象,把擷取到的信号量去做一次釋放,然後就抛異常,就說我逾時了
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
    once.release();
    throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}           

4、封裝一個ResponseFuture放入responseTable等待響應傳回;

// 如果說我擷取信号量等待時間沒有超過逾時時間
// 就可以正常去封裝一個response future,裡面會設定一些東西,網絡連接配接是channel,請求id是opaque
// 真正你在發送請求的時候,等待響應的時候,逾時時間是要變化的,我為了等待擷取信号量
// 已經耗費了一段時間了,這個時候把逾時時間減去我擷取信号量耗費的時間
// 響應對象回來了以後需要回調,invoke callback回調對象
// once,釋放掉信号量,得基于僅僅支援釋放一次的信号量對象來進行釋放
final ResponseFuture responseFuture = new ResponseFuture(
        channel,
        opaque,
        timeoutMillis - costTime,
        invokeCallback,
        once
);
this.responseTable.put(opaque, responseFuture);           

5、添加一個監聽器監聽請求發送結果,請求發送結果會進行回調;

try {
    // 通過netty網絡連接配接發送請求,請求發送結果會回調
    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (f.isSuccess()) {
                responseFuture.setSendRequestOK(true);
                return;
            }
            requestFail(opaque);
            log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
        }
    });
} catch (Exception e) {
    responseFuture.release();
    log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
    throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}           
// 發送請求失敗了,移除掉這個請求對應的responseFuture
private void requestFail(final int opaque) {
    ResponseFuture responseFuture = responseTable.remove(opaque);
    if (responseFuture != null) {
        responseFuture.setSendRequestOK(false);
        responseFuture.putResponse(null);
        try {
            // 調用我們設定的異步回調接口
            executeInvokeCallback(responseFuture);
        } catch (Throwable e) {
            log.warn("execute callback in requestFail, and callback throw", e);
        } finally {
            // 一定會對我們的信号量僅僅釋放一次
            responseFuture.release();
        }
    }
}           
/**
 * Execute callback in callback executor. If callback executor is null, run directly in current thread
 */
private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    // 拿到一個異步回調的線程池
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            // 開啟一個線程異步的去回調我們的invoke callback
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        // 如果是異步調用的話,在這裡完成異步回調了以後,就會去進行一個信号量釋放
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}           
// 如果說響應回來了以後,執行invoke回調
public void executeInvokeCallback() {
    if (invokeCallback != null) {
        // 通過cas確定invoke callback僅僅隻能回調一次
        if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
            // 執行一次異步的回調
            invokeCallback.operationComplete(this);
        }
    }
}           

五、Oneway請求

  1. 标記本次請求為oneway請求;
  2. 通過semaphoreOneway擷取信号量,如果逾時則抛出逾時異常;
  3. 封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;
  4. 添加一個監聽器監聽請求發送結果,發送成功,失敗都可以,失敗隻列印告警日志;

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl

// 發送一個oneway請求,隻要把請求發送出去就可以了,不要去等待響應
    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // 标記本次請求為oneway請求
        request.markOnewayRPC();

        // 同時最多隻可以有指定數量的oneway請求發送出去
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                // 隻要我的一個oneway請求發送出去了,發送成功了,失敗了,都可以,此時就把信号量做一個釋放
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }           

1、标記本次請求為oneway請求;

// 标記本次請求為oneway請求
request.markOnewayRPC();           
public void markOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    this.flag |= bits;
}           

2、通過semaphoreOneway擷取信号量,如果逾時則抛出逾時異常;

// 同時最多隻可以有指定數量的oneway請求發送出去
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);           
} else {
    if (timeoutMillis <= 0) {
        throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
    } else {
        String info = String.format(
            "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
            timeoutMillis,
            this.semaphoreOneway.getQueueLength(),
            this.semaphoreOneway.availablePermits()
        );
        log.warn(info);
        throw new RemotingTimeoutException(info);
    }
}           

3、封裝SemaphoreReleaseOnlyOnce隻支援釋放一次的對象來進行信号量釋放;

final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);           
public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
    this.semaphore = semaphore;
}

public void release() {
    if (this.semaphore != null) {
        // 基于atomic實作了基于CAS的僅僅支援一次的釋放
        // cas操作,多線程并發的時候,僅僅會跑成功一次,僅僅可以讓我們的這次信号量擷取,就釋放一次
        if (this.released.compareAndSet(false, true)) {
            this.semaphore.release();
        }
    }
}           

4、添加一個監聽器監聽請求發送結果,回調時釋放信号量,發送成功,失敗都可以,失敗隻列印告警日志;

// 隻要我的一個oneway請求發送出去了,發送成功了,失敗了,都可以,此時就把信号量做一個釋放
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        once.release();
        if (!f.isSuccess()) {
            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
        }
    }
});           
public void release() {
    if (this.semaphore != null) {
        // 基于atomic實作了基于CAS的僅僅支援一次的釋放
        // cas操作,多線程并發的時候,僅僅會跑成功一次,僅僅可以讓我們的這次信号量擷取,就釋放一次
        if (this.released.compareAndSet(false, true)) {
            this.semaphore.release();
        }
    }
}           

六、判斷請求是否逾時

/**
 * <p>
 * This method is periodically invoked to scan and expire deprecated request.
 * 這個方法周期性的運作,去掃描發送出去的請求是否逾時了,是誰周期性的來調用他呢,肯定是有一個定時排程的線程
 * 應該是在他的子類裡
 * </p>
 */
public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        // 請求發送時間 + 逾時時間 <= 目前時間,目前時間已經超過了一個請求逾時時間了
        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            // 把信号量做一個釋放
            rep.release();
            it.remove(); // 把這個請求的responseFuture做一個移除
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }

    for (ResponseFuture rf : rfList) {
        try {
            // 對于異步調用的話得進行回調這樣子
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}           

七、總結

我們看完NameServer三種發送請求流程,我們可以學到裡面的countDownLatch和semaphore在實戰開發裡是如何來運用的;

下文我們還會分析,如果發送的請求一直拿不的響應NameServer是怎麼處理的,我們是不是也可以學習一下?

繼續閱讀