天天看點

nsq topic

與Topic相關的代碼主要位于<code>nsqd/topic.go</code>中。

上一篇文字我們講解了下nsq的啟動流程。對nsq的整體架構有了一個大概的了解。本篇文章就是由大到小。對于topic這一部分進行詳盡的講解。

topic 管理着多個 channel 通過從 client 中擷取消息,然後将消息發送到 channel 中傳遞給用戶端.在 channel 初始化時會加載原有的 topic 并在最後統一執行 topic.Start(),新建立的 topic 會同步給 lookupd 後開始運作. nsqd 中通過建立建立多個 topic 來管理不同類别的頻道.

可以看到。topic 采用了 map + *Channel 來管理所有的channel. 并且也有 memoryMsgChan 和 backend 2個隊列。

下面就是 topic 的建立流程,傳入的參數參數包括,topicName,上下文環境,删除回調函數:

可以看到先執行個體化了一個Topic指針對象。初始化<code>memoryMsgChan</code>隊列, 預設1000個。并且判斷topicName是否是臨時topic,如果是的話,<code>BackendQueue</code>(這是個接口)實作了一個空的記憶體Queue. 否則使用 <code>diskqueue</code>來初始化 backend隊列。

随後,<code>NewTopic</code>函數開啟一個新的goroutine來執行<code>messagePump</code>函數,該函數負責消息循環,将進入topic中的消息投遞到channel中。

最後,<code>NewTopic</code>函數執行<code>t.ctx.nsqd.Notify(t)</code>,該函數在topic和channel建立、停止的時候調用, <code>Notify</code>函數通過執行<code>PersistMetadata</code>函數,将topic和channel的資訊寫到檔案中。

在<code>Notify</code>函數的實作時,首先考慮了資料持久化的時機,如果目前nsqd尚在初始化,則不需要立即持久化資料,因為nsqd在初始化後會進行一次統一的持久化工作,

<code>Notify</code>在進行資料持久化的時候采用了異步的方式。使得topic和channel能以同步的方式來調用Nofity而不阻塞。在異步運作的過程中, 通過<code>waitGroup</code>和監聽<code>exitChan</code>的使用保證了結束程式時goroutine能正常退出。

在執行持久化之前,<code>case n.notifyChan &lt;- v:</code>語句向<code>notifyChan</code>傳遞消息,觸發<code>lookupLoop</code>函數(<code>nsqd/lookup.go</code>中)接收<code>notifyChan</code>消息的部分, 進而實作向<code>loopupd</code>注冊/取消注冊響應的topic或channel。

用戶端通過nsqd的HTTP API或TCP API向特定topic發送消息,nsqd的HTTP或TCP子產品通過調用對應topic的<code>PutMessage</code>或<code>PutMessages</code>函數, 将消息投遞到topic中。<code>PutMessage</code>或<code>PutMessages</code>函數都通過topic的私有函數<code>put</code>進行消息的投遞,兩個函數的差別僅在<code>PutMessage</code>隻調用一次<code>put</code>, <code>PutMessages</code>周遊所有要投遞的消息,對每條消息使用<code>put</code>函數進行投遞。預設topic會優先往<code>memoryMsgChan</code> 隊列内投遞,如果記憶體隊列已滿,才會往磁盤隊列寫入,(臨時的topic磁盤隊列不做任何存儲,資料直接丢棄)

topic的Start方法就是發送了個 startChan ,這裡有個小技巧,nsq使用了select來發送這個消息,這樣做的目的是如果start被并發調用了,第二個start會直接走到default裡,什麼都不做.

那麼這個Start函數都有哪裡調用的呢。

1、 nsqd啟動的時候,觸發<code>LoadMetadata</code> 會把檔案裡的topic加載到記憶體裡,這時候會調用Start方法

2、 使用者通過請求擷取topic的時候會通過 getTopic 來擷取或者建立topic

接下來我們看下 <code>messagePump</code>, 剛才的 startChan 就是發給了這個函數,該函數在建立新的topic時通過<code>waitGroup</code>在新的goroutine中運作。該函數僅在觸發 startChan 開始運作,否則會阻塞住,直到退出。

<code>messagePump</code>函數初始化時先擷取目前存在的channel數組,設定<code>memoryMsgChan</code>和<code>backendChan</code>,随後進入消息循環, 在循環中主要處理四種消息:

接收來自<code>memoryMsgChan</code>和<code>backendChan</code>兩個go channel進入的消息,并向目前的channal數組中的channel進行投遞

處理目前topic下channel的更新

處理目前topic的暫停和恢複

監聽目前topic的删除

這兩個case語句處理進入topic的消息,關于兩個go channel的差別會在後續的部落格中分析。 從<code>memoryMsgChanbackendChan</code>讀取到的消息是<code>*Message</code>類型,而從<code>backendChan</code>讀取到的消息是<code>byte</code>數組的。 是以取出<code>backendChan</code>的消息後海需要調用<code>decodeMessage</code>函數對<code>byte</code>數組進行解碼,傳回<code>*Message</code>類型的消息。 二者都儲存在<code>msg</code>變量中。

随後是将消息投到每個channel中,首先先對消息進行複制操作,這裡有個優化,對于第一次循環, 直接使用原消息進行發送以減少複制對象的開銷,此後的循環将對消息進行複制。對于即時的消息, 直接調用channel的<code>PutMessage</code>函數進行投遞,對于延遲的消息, 調用channel的<code>StartDeferredTimeout</code>函數進行投遞。對于這兩個函數的投遞細節,後續博文中會詳細分析。

Channel的更新比較簡單,從<code>channelMap</code>中取出每個channel,構成channel的數組以便後續進行消息的投遞。 并且根據目前是否有channel以及該topic是否處于暫停狀态來決定<code>memoryMsgChan</code>和<code>backendChan</code>是否為空。

這個case既處理topic的暫停也處理topic的恢複,<code>pause</code>變量決定其究竟是哪一種操作。 Topic的暫停和恢複其實和topic的更新很像,根據是否暫停以及是否有channel來決定是否配置設定<code>memoryMsgChan</code>和<code>backendChan</code>。

<code>messagePump</code>通過監聽<code>exitChan</code>來獲知topic是否被删除,當topic的删除時,跳轉到函數的最後,輸出日志後退出消息循環。

Topic關閉和删除的實作都是調用<code>exit</code>函數,隻是傳遞的參數不同,删除時調用<code>exit(true)</code>,關閉時調用<code>exit(false)</code>。 <code>exit</code>函數進入時通過<code>atomic.CompareAndSwapInt32</code>函數判斷目前是否正在退出,如果不是,則設定退出标記,對于已經在退出的topic,不再重複執行退出函數。 接着對于關閉操作,使用<code>Notify</code>函數通知lookupd以便其他nsqd獲知該消息。

随後,<code>exit</code>函數調用<code>close(t.exitChan)</code>和<code>t.waitGroup.Wait()</code>通知其他正在運作goroutine目前topic已經停止,并等待<code>waitGroup</code>中的goroutine結束運作。

最後,對于删除和關閉兩種操作,執行不同的邏輯來完成最後的清理工作:

對于删除操作,需要清空<code>channelMap</code>并删除所有channel,然後删除記憶體和磁盤中所有未投遞的消息。最後關閉<code>backend</code>管理的的磁盤檔案。

對于關閉操作,不清空<code>channelMap</code>,隻是關閉所有的channel,使用<code>flush</code>函數将所有<code>memoryMsgChan</code>中未投遞的消息用<code>writeMessageToBackend</code>儲存到磁盤中。最後關閉<code>backend</code>管理的的磁盤檔案。

<code>flush</code>函數也使用到了default分支來檢測是否已經處理完全部消息。 由于此時已經沒有生産者向<code>memoryMsgChan</code>提供消息,是以如果出現阻塞就表示消息已經處理完畢。

在删除topic時用到的<code>Empty</code>函數跟<code>flush</code>處理邏輯類似,隻不過<code>Empty</code>隻釋放<code>memoryMsgChan</code>消息,而不儲存它們。

topic 下的源碼基本就看完了,雖然還沒有别的部分完整的完整的串聯起來,但是也可以了解到,多個 topic 在初始化時就開啟了消息循環 goroutine,執行完 Start 後開始消息分發,如果是正常的Topic,除了預設10000的記憶體隊列,還會有個硬碟隊列。topic将收到的消息分發到管理的 channel 中.每個 topic 運作的 goroutine 比較簡單,隻有一個消息分發 goroutine: messagePump.