天天看點

RockeMQ-存儲機制-可用性政策

broker的可用性政策-快速失敗機制

具體實作在BrokerFastFailure,會執行一個定時任務掃描寫消息任務的隊列,當發現ospagecache繁忙的時候,就取出一個請求任務快速傳回,直到OSPageCache不在繁忙。

然後會周遊任務隊列,如果發現某一個請求任務已經逾時,那麼也會立即傳回避免producer堵塞。

具體實作邏輯:

private void cleanExpiredRequest() {
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

        while (true) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    // peek不删除元素
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }

                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
                        if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }      

在remote的NettyRemotingAbstract類中的processRequestCommand,會把遠端請求處理包裝成一個RequestTask 然後放到sendMessageExecutor中

final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);      

實際上會把任務放到隊列中 this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

取出來的時候,進行強轉。final RequestTask rt = castRunnable(runnable);

繼續閱讀