最近花了我幾個月的業餘時間,對equeue做了一個重大的改造,消息持久化采用本地寫檔案的方式。到現在為止,總算完成了,是以第一時間寫文章分享給大家這段時間我所積累的一些成果。
昨天,我寫過一篇關于equeue 2.0性能測試結果的文章,有興趣的可以看看。
之前equeue的消息持久化是采用sql server的。一開始我覺得沒什麼問題,采用的是異步定時批量持久化,使用sqlbulkcopy的方法,這個方法測試下來,批量插入消息的性能還不錯,就決定使用了。一開始我并沒有在使用到equeue後做內建的性能測試。在功能上确實沒什麼問題了。而且使用db持久化也有很多好處,比如消息查詢很簡單,db天生支援各種方式的查詢。删除消息也非常簡單,一條delete語句即可。是以功能實作比較順利。但後來當我對equeue做性能測試時,發現一些問題。當資料庫伺服器和broker本身部署在不同的伺服器上時,持久化消息也會走網卡,消耗帶寬,影響消息的發送和消費的tps。而如果資料庫伺服器部署在broker同一台伺服器上,則因為sqlserver本身也會消耗cpu以及記憶體,也會影響broker的消息發送和消費的tps。另外sqlbulkcopy的速度,再本身機器正在接收大量的發送消息和拉取消息的請求時,會不太穩定。經過一些測試,發現整個equeue broker的性能不太理想。然後又想想,broker伺服器有有一個硬體一直沒有好好利用起來,那就是硬碟。假設我們的消息是持久化到本地硬碟的,順序寫檔案,就應該能解決sql server的問題了。是以,開始調研如何實作檔案持久化消息的方案了。
之前消息存儲在sql server,如果消費者每次讀取消息時,總是從資料庫去讀取,那對資料庫就是不斷的寫入和讀取,性能不太理想。是以當初的思路是,盡量把最近可能要被消費的消息緩存在本地記憶體中。當初的做法是設計了一個很大的concurrentdictionary<long, message>,這個字典就是存放了所有可能會被消費的消息。如果要消費的消息目前不在這個字典裡,就批量從db拉取一批出來消費。這個設計可以盡可能的避免讀取db的情況。但是帶來了另一個問題。就是我們對這個字典在高并發不斷的寫入和讀取。且這個字典裡緩存的消息又很多,到到達幾百上千萬時,gc的壓力過大,導緻很多線程都會被阻塞。嚴重影響broker的tps。
是以,基于上面的兩個主要原因,我想到了兩個思路來解決:1)采用寫檔案的方式來持久化消息;2)使用非托管記憶體來緩存将要被消費的消息;下面我們來看看這兩個設計的一些關鍵問題的設計思路。
之前一直無法駕馭寫檔案的設計。因為精細化的将資料寫入檔案,并能要精确的讀取想要的資料,實在沒什麼經驗。之前雖然也知道阿裡的rocketmq的消息持久化也是采用順序寫檔案的方式的,但是看了代碼,發現設計很複雜,一下子也比較難懂。嘗試看了多次也無法完全了解。是以一直無法掌握這種方式。有一天不經意間想到之前看過的eventstore這個開源項目中,也有寫檔案的設計。這個項目是cqrs架構之父greg young所主導的開源項目,是一個專門為es(event sourcing)設計模式中提供儲存事件流支援的事件流存儲系統。于是下定決心專研其源碼,看c#代碼肯定還是比java容易,呵呵。經過一段時間的摸索之後,基本學到了它是如何寫檔案以及如何讀檔案的。了解了很多設計思路。然後,在看懂了eventstore的檔案存儲設計之後,再去看rocketmq的檔案持久化的設計,發現驚人的相似。原來看不懂的代碼現在也能看懂了,因為思路差不多的。是以,這給我開始動手提供了很大的信心。經過自己的一些準備(檔案讀寫的性能驗證)和設計思路整理後,終于開始動手了。
其實說出來也很簡單。之前一直以為寫檔案就是一個消息一行呗。這樣當我們要找哪個消息時,隻需要知道行号即可。确實,理論上這樣也挺好。但上面這兩個開源項目都不是這樣做的,而是都是采用更精細化的直接寫二進制的方式。搞清楚寫入的格式之後,還要考慮一個檔案寫不下的時候怎麼辦?因為一個檔案總是有大小的,比如1g,那超過1g後,必然要建立新的檔案,然後把消息寫入新的檔案。是以,我們就又有了chunk的概念。一個chunk就是一個檔案,假設我們現在實作了一個filemessagestore,表示對檔案持久化的封裝,那這個filemessagestore肯定維護了一堆的chunk。然後我們也很容易想到一點,就是chunk有3種狀态:1)new,表示剛建立的chunk,這種chunk我們可以寫入新消息進去;2)completed,已寫入完成的chunk,這種chunk是隻讀的;3)ongoing的chunk,就是當filemessagestore初始化時,要從磁盤的某個chunk的目錄下加載所有的chunk檔案,那不難了解,最後一個檔案之前的chunk檔案應該都是completed的;最後一個chunk檔案可能寫入了一半,就是之前沒完全用完的。是以,本質上new和ongoing的chunk其實是一樣的,隻是初始化的方式不同。
至此,我們知道了寫檔案的兩個關鍵思路:1)按二進制寫;2)拆分為chunk檔案,且每個chunk檔案有狀态;按二進制寫主要的思路是,假如我們目前要寫入的消息的二進制數組大小為100個位元組,也就是說消息的長度為100,那我們可以先把消息的長度寫入檔案,再接着寫入消息本身。這樣我們讀取消息時,隻要知道了寫入消息長度時的那個position,就能先讀取到消息的長度,然後就能知道接下來要讀取多少位元組為消息内容。進而能正确讀取消息出來。
另外再分享一點,eventstore中,寫入一個事件到檔案中時,還會在寫入消息内容後再寫入這個消息的長度到檔案裡。也就是說,寫入一個資料到檔案時,會在頭尾都寫入該資料的長度。這樣做的好處是什麼呢?就是當我們想從後往前讀資料時,也能友善的做到,因為每個資料的前後都記錄了該資料的長度。這點應該不難了解吧?而eventstore是一個面向流的存儲系統,我們對事件流确實可能從前往後讀,也可能是從後往前讀。另外這個設計還有一個好處,就是起到了校驗資料合法性的目的。當我們根據長度讀取資料後,再資料之後再讀取一個長度,如果這兩個長度一緻,那資料應該就沒問題的。在rocketmq中,是通過crc校驗的方式來保證讀取的資料沒有問題。我個人還是比較喜歡eventstore的做法。是以equeue裡現在寫入資料就是這樣做的。
上面我介紹了一種寫入不定長資料到檔案的設計思路,這種設計是為了解決寫入消息到檔案的情況,因為消息的長度是不定的。在equeue中,我們還有一另一種寫檔案的場景。就是隊列資訊的持久化。equeue的架構是一個topic下有多個queue,每個queue裡有很多消息,消費者負載均衡是通過給消費者配置設定均勻數量的queue的方式來達到的。這樣我們隻要確定寫入queue的消息是均勻的,那每個consumer消費到的消息數就是均勻的。那一個queue裡記錄的是什麼呢?就是一個消息和其在隊列的位置的對應關系。假設消息寫入在檔案的實體位置為10000,然後這個消息在queue裡的索引是100,那這個隊列就會把這兩個位置對應起來。這樣當我們要消費這個queue中索引為100的消息時,就能找到這個消息在檔案中的實體位置為10000,就能根據這個位置找到消息的内容了。如果是托管記憶體,我們隻需要弄一個dictionary,key是消息在隊列中的offset,value是消息在檔案中的實體offset即可。這樣我們有了這個dict,就能輕松建立起對應關系了。但上面我說過,這種巨大的dict是要占用記憶體的,會有gc的問題。是以更好的辦法是,把這個對應關系也寫入檔案,那怎麼做呢?這時就又需要更精細化的設計了。想到了其實也很簡單,這個設計我是從rocketmq中學到的。就是我們設計一種固定長度的結構體,這個結構體裡就存放一個資料,就是消息在檔案的實體位置(為了後面好表達,我命名為messageposition),一個long值,一個long的長度是8個位元組。也就是說,這個檔案中,每個寫入的資料的長度都是8個位元組。假設我們一個檔案要儲存100w個messageposition。那這個檔案的長度就是100w * 8這麼多位元組,大概為7.8mb。那麼這樣做有什麼好處呢?好處就是,假如我們現在要消費這個queue裡的第一個消息,那這個消息的messageposition在這個檔案中的位置0,第二個消息在這個檔案中的位置是8,第三個就是16,以此類推,第n 個消息就是(n-1) * 8。也就是說,我們無須顯式的把消息在隊列中的位置資訊也寫入到檔案,而是通過這樣的固定算法,就能精确的算出queue中某個消息的messageposition是寫入在檔案的哪個位置。然後拿到了messageposition之後,就能從message的chunk檔案中讀取到這個消息了。
通過上面的分析,我們知道了,producer發送一個消息到broker時,broker會寫兩次磁盤。一次是現将消息本身寫入磁盤(message chunk裡),另一次是将消息的寫入位置寫入到磁盤(queue chunk裡)。細心的朋友可能會問,假如我第一次寫入成功,但第二次寫入時失敗,比如正好機器斷電或者目前broker伺服器正好出啥問題 了,沒有寫入成功。那怎麼辦呢?這個沒有什麼大的影響。因為首先這種情況會被認為是消息發送失敗。是以producer還會重新發送該消息,然後broker收到消息後還會再做一次這兩個寫入操作。也就是說,第一次寫入的消息内容永遠也不會用到了,因為那個寫入位置永遠也不會在queue chunk裡有記錄。
下面的代碼展示了寫消息到檔案的核心代碼:


storemessage方法内部實作:


queue.addmessage方法的内部實作:
chunkwriter的内部實作:


當然,我上面為了簡化問題的複雜度。是以沒有引入關于如何根據某個全局的messageposition找到其在哪個message chunk的問題。這個其實也很好做,我們首先固定好每個message chunk檔案的大小。比如大小為256mb,然後我們為每個chunk檔案設計一個chunkheader,每個chunk檔案總是先把這個chunkheader寫入檔案,這個header裡記錄了這個檔案的起始位置和結束位置,以及檔案的大小。這樣我們根據某個messageposition計算其在哪個chunk檔案時,隻需要把這個messagepositon對chunk的大小做取摸操作即可。根據資料的位置找其在哪個chunk的代碼看起來如下面這樣這樣:


代碼很簡單,就不多講了。拿到了chunk對象後,我們就可以把dataposition傳給chunk,然後chunk内部把這個全局的dataposition轉換為本地的一個位置,就能準确的定位到這個資料在目前chunk檔案的實際位置了。将全局位置轉換為本地的位置的算法也很簡單直接:


隻需要把這個全局的位置減去目前chunk的資料開始位置,就能知道這個全局位置相對于目前chunk的本地位置了。
好了,上面介紹了消息如何寫入的主要思路以及如何讀取資料的思路。
另外一點還想提一下,就是關于刷盤的政策。一般我們寫資料到檔案後,是需要調用檔案流的flush方法的,確定資料最終刷入到了磁盤上。否則資料就還是在緩沖區裡。當然,我們需要注意到,即便調用了flush方法,資料可能也還沒真正邏輯到磁盤,而隻是在作業系統内部的緩沖區裡。這個我們就無法控制了,我們能做到的是調用了flush方法即可。那當我們每次寫入一個資料到檔案都要調用flush方法的話,無疑性能是低下的,是以就有了所謂的異步刷盤的設計。就是我們寫入消息後不立即調用flush方法,而是采用一個獨立的線程,定時調用flush方法來實作刷盤。目前equeue支援同步刷盤和異步刷盤,開發者可以自己配置決定采用哪一種。異步刷盤的間隔預設是100ms。當我們在追求高吞吐量時,應該考慮異步刷盤,但要求資料可靠性更高但對吞吐量可以低一點時,則可以使用同步刷盤。如果又要高吞吐又要資料高可靠,那就隻有一個辦法了,呵呵。就是多增加一些broker機器,通過叢集來彌補單台broker寫入資料的瓶頸。
假設我們現在要從一個檔案讀取資料,且是多線程并發的讀取,要怎麼設計?一個辦法是,每次讀取時,建立檔案流,然後建立streamreader,然後讀取檔案,讀取完成後釋放streamreader并關閉檔案流。但每次要讀取檔案的一個資料都要這樣做的話性能不是太好,因為我們反複的建立這樣的對象。是以,這裡我們可以使用對象池的概念。就是chunk内部,預先建立好一些reader,當需要讀檔案時,擷取一個可用的reader,讀取完成後,再把reader歸還到對象池裡。基于這個思路,我設計了一個簡單的對象池:


當一個chunk初始化時,我們預先初始化好固定數量(可配置)的reader對象,并把這些對象放入一個concurrentqueue裡(對象池的作用),然後要讀取資料時,從從concurrentqueue裡拿一個可用的reader即可,如果目前并發太高拿不到怎麼辦,就等待直到拿到為止,目前我是等待1ms後繼續嘗試拿,直到最後拿到為止。然後returnreaderworkitem就是資料讀取完之後歸還reader到對象池。就是不是很簡單哦。這樣的設計,可以避免不斷的建立檔案流和reader對象,可以避免gc的副作用。
大家知道,當broker重新開機時,我們是需要掃描磁盤上chunk目錄下的所有chunk檔案的。那怎麼掃描呢?上面其實我也簡單提到過。首先,我們可以對每個chunk檔案的檔案名的命名定義一個規則,第一個chunk檔案的檔案名比如為:message-chunk-000000000,第二個為:message-chunk-000000001,以此類推。那我們掃描時,隻要先把所有的檔案名擷取到,然後對檔案名升序排序。那最後一個檔案之前的檔案肯定都是寫入完全了的,即上面我說的completed狀态的,而最後一個檔案是還沒有寫入完的,還可以接着寫。是以我們初始化時,隻需要先初始化最後一個之前的所有chunk檔案,最後再初始化最後一個檔案即可。這裡我所說的初始化不是要把整個chunk檔案的内容都加載到記憶體,而是隻是讀取這個檔案的chunkheader的資訊維護在記憶體即可。有了header資訊,我們就可以為後續的資料讀取提供位置計算了。是以,整個加載過程是很快的,讀取100個chunk檔案的chunkheader也不過一兩秒的時間,完全不影響broker的啟動時間。對于初始化completed的chunk比較簡單,隻需要讀取chunkheader資訊即可。但是初始化最後一個檔案比較麻煩,因為我們還要知道這個檔案目前寫入到哪裡了?進而我們可以從這個位置的下一個位置接着往下寫。那怎麼知道這個檔案目前寫入到哪裡了呢?其實比較複雜。有很多技術,我看到rocketmq和eventstore這兩個開源項目中都采用了checkpoint的技術。就是當我們每次寫入一個資料到檔案後,都會更新一下checkpoint,即表示目前寫入到這個檔案的哪裡了。然後這個checkpoint值我們也是定時異步儲存到某個獨立的小檔案裡,這個檔案裡隻儲存了這個checkpoint。這樣的設計有一個問題,就是假如資料寫入了,但由于checkpoint的儲存不是實時的,是以理論上會出現checkpint值會小于實際檔案寫入的位置的情況。一般我們忽略這種情況即可,即可能會存在初始化時,下次寫入可能會覆寫一定的之前已經寫入的資料,因為checkpoint可能是稍微老一點的。
而我在設計時,希望能再嚴謹一點,取消checkpoint的設計,而是采用在初始化ongoing狀态的chunk檔案時,從檔案的頭開始不斷往下讀,當最後無法往下讀時,我們就知道這個檔案我們目前寫入到哪裡了。那怎麼知道無法往下讀了呢?也就是說怎麼确定後續的檔案内容不是我們寫入的?也很簡單。對于不固定資料長度的chunk來說,由于我們每次寫入一個資料時都是同時在前後寫入這個資料的長度;是以我們再初始化讀取這個檔案時,可以借助這一點來校驗,但出現不符合這個規則的資料時,就認為後續不是正常的資料了。對于固定長度的chunk來說,我們隻要保證每次寫入的資料的資料是非0了。而對于equeue的場景,固定資料的chunk裡存儲的都是消息在message chunk中的全局位置,一個long值;但這個long值我們正常是從0開始的,怎麼辦呢?很簡單,我們寫入messageposition時,總是加1即可。即假如目前的messageposition為0,那我們實際寫入1,如果為100,則實際寫入的值是101。這樣我們就能確定這個固定長度的chunk檔案裡每個資料都是非0的。然後我們在初始化這樣的chunk檔案時,隻要不斷讀取固定長度(8個位元組)的資料,當出現讀取到的資料為0時,就認為已經到頭了,即後續的不是我們寫入的資料了。然後我們就能知道接下來要從哪裡開始讀取了哦。
上面我介紹了如何讀檔案的思路。我們也知道了,我們是在消費者要消費消息時,從檔案讀取消息的。但對從檔案讀取消息總是沒有比從記憶體讀取消息來的快。我們前面的設計都沒有把記憶體好好利用起來。是以我們能否考慮把未來可能要消費的chunk檔案的内容直接緩存在記憶體呢?這樣我們就可以避免對檔案的讀取了。肯定可以的。那怎麼做呢?前面我提高多,曾經我們用托管記憶體中的concurrentdictionary<long, message>這樣的字典來緩存消息。我也提到這會帶來垃圾回收而影響性能的問題。是以我們不能直接這樣簡單的設計。經過我的一些嘗試,以及從eventstore中的源碼中學到的,我們可以使用非托管記憶體來緩存chunk檔案。我們可以使用marshal.allochglobal來申請一塊完整的非托管記憶體,然後再需要釋放時,通過marshal.freehglobal來釋放。然後,我們可以通過unmanagedmemorystream來通路這個非托管記憶體。這個是核心思路。那麼怎樣把一個chunk檔案緩存到非托管記憶體呢?很簡單了,就是掃描這個檔案的所有内容,把内容都寫入記憶體即可。代碼如下:


代碼很簡單,不用多解釋了。需要注意的是,上面這個方法針對的是completed狀态的chunk,即已經寫入完成的chunk的。已經寫入完全的chunk是隻讀的,不會再發生更改,是以我們可以随便緩存在記憶體中。
那對于新建立出來的chunk檔案呢?正常情況下,消費者來得及消費時,我們總是在不斷的寫入最新的chunk檔案,也在不斷的從這個最新的chunk檔案讀取消息。那我們怎麼確定消費最新的消息時,也不需要從檔案讀取呢?也很簡單,就是在建立一個chunk檔案時,如果記憶體足夠,也同時建立一個一樣大小的基于非托管記憶體的chunk。然後我們再寫入消息到檔案chunk成功後,再同時寫入這個消息到非托管記憶體的chunk。這樣,我們在消費消息,讀取消息時總是首先判斷目前chunk是否關聯了一個非托管記憶體的chunk,如果有,就優先從記憶體讀取即可。如果沒有才從檔案chunk讀取。
但是從檔案讀取時,可能會遇到一個問題。就是我們剛寫入到檔案的資料可能無法立即讀取到。因為寫入的資料沒有立即刷盤,是以無法通過reader讀取到。是以,我們不能僅通過判斷目前寫入的位置來判斷目前是否還有資料可以被讀取,而是考慮目前的最後一次刷盤的位置。理論上隻能讀取刷盤之前的資料。但即便這樣設計了,在如果目前硬碟不是ssd的情況下,好像也會出現讀不到資料的問題。偶爾會報錯,有朋友在測試時已經遇到了這樣的問題。那怎麼辦呢?我想了一個辦法。因為這種情況歸根接地還是因為我們邏輯上認為已經寫入到檔案的資料由于未及時刷盤或者作業系統本身的内部緩存的問題,導緻資料未能及時寫入磁盤。出現這種情況一定是最近的一些資料。是以我們如果能夠把比如最近寫入的10000(可配置)個資料都緩存在本地托管記憶體中,然後讀取時先看本地緩存的托管記憶體中有沒有這個位置的資料,如果有,就不需要讀檔案了。這樣就能很好的解決這個問題了。那怎麼確定我們隻緩存了最新的10000個資料且不會超出10000個呢?答案是環形隊列。這個名字聽起來很高大上,其實就是一個數組,數組的長度為10000,然後我們在寫入資料時,我們肯定知道這個資料在檔案中的位置的,我們可以把這個位置(一個long值)對10000取摸,就能知道該把這個資料緩存在這個數組的哪個位置了。通過這個設計確定緩存的資料不會超過10000個,且確定一定隻緩存最新的資料,如果新的資料儲存到數組的某個下标時,該下标已經存在以前已經儲存過的資料了,就自動覆寫掉即可。由于這個數組的長度不是很長,是以每什麼gc的問題。
但是光這樣還不夠,我們這個數組中的每個元素至少要記錄這個元素對應的資料在檔案中的位置。這個是為了我們在從數組中擷取到資料後,進一步校驗這個資料是否是我想要的那個位置的資料。這點大家應該可以了解的吧。下面這段代碼展示了如何從環形數組中讀取想要的資料:


_cacheitems是目前chunk内的一個環形數組,然後假如目前我們要讀取的資料的位置是dataposition,那我們隻需要先對環形資料的長度取摸,得到一個下标,即上面代碼中的index。然後就能從數組中拿到對應的資料了,然後如果這個資料存在,就進一步判斷這個資料dataposition是否和要求的dataposition,如果一緻,我們就能确定這個資料确實是我們想要的資料了。就可以傳回了。
是以,通過上面的兩種緩存(非托管記憶體+托管記憶體環形數組)的設計,我們可以確定幾乎不用再從檔案讀取消息了。那什麼時候還是會從檔案讀取呢?就是在1)記憶體不夠用了;2)目前要讀取的資料不是最近的10000個;這兩個前提下,才會從檔案讀取。一般我們線上伺服器,肯定會保證記憶體是可用的。equeue現在有兩個記憶體使用的水位。一個是當實體記憶體使用到多少百分比(預設值為40%)時,開始清理已經不再活躍的chunk檔案的非托管記憶體chunk;那什麼是不活躍呢?就是在最近5s内沒有發生過讀寫的chunk。這個設計我覺得是非常有效的,因為假如一個chunk有5s沒有發生過讀寫,那一般肯定是沒有消費者在消費它了。另一個水位是指,最多equeue broker最多使用實體記憶體的多少百分比(預設值為75%),這個應該好了解。這個水位是為了保證equeue不會把所有實體記憶體都吃光,是為了確定伺服器不會因為記憶體耗盡而當機或導緻服務不可用。
那什麼時候會出現大量使用伺服器記憶體的情況呢?我們可以推導出來的。正常情況下,消息寫入第一個chunk,我們就在讀取第一個chunk;寫入第二個chunk我們也會跟着讀取第二個chunk;假設目前寫入到了第10個chunk,那理論上前面的9個chunk之前緩存的非托管記憶體都可以釋放了。因為肯定超過5s沒有發生讀寫了。但是假如現在消費者有很多,且每個消費者的消費進度都不同,有些很快,有些很慢,當所有的消費者的消費進度正好覆寫到所有的chunk檔案時,就意味着每個chunk檔案都在發生讀取。也就是說,每個chunk都是活躍的。那此時就無法釋放任何一個chunk的非托管記憶體了。這樣就會導緻占用大量非托管記憶體了。但由于75%的水位的設計,broker記憶體的使用是不會超過實體記憶體75%的。在建立新的chunk或者嘗試緩存一個completed的chunk時,總是會判斷目前使用的實體記憶體是否已經超過75%,如果已經超過,就不會配置設定對應的非托管記憶體了。
删除消息的設計比較簡單。主要的思路是,當我們的消息已經被所有的消費者都消費過了,且滿足我們的删除政策了,就可以删除了。rocketmq删除消息的政策比較粗暴,沒有考慮消息是否經被消費,而是直接到了一定的時間就删除了,比如最多隻保留2天。這個是rocketmq的設計。equeue中,會確定消息一定是被所有的消費者都消費了才會考慮删除。然後目前我設計的删除政策有兩種:
按chunk檔案數;即設計一個閥值,表示磁盤上最多儲存多少個chunk檔案。目前預設值為100,每個chunk檔案的大小為256mb。也就是大概總磁盤占用25g。一般我們的硬碟肯定有25g的。當我們不關心消息儲存多久而隻從檔案數的角度來決定消息是否要删除時,可以使用這個政策;
按時間來删除,預設是7天,即當某個chunk是7天前建立的,那我們就可以建立了。這種政策是不關心chunk總共有多少,完全根據時間的次元來判斷。
實際上,應該可能還有一些需求希望能把兩個政策合起來考慮的。這個目前我沒有做,我覺得這兩種應該夠了。如果大家想做,可以自己擴充的。
另外,上面我說過equeue中目前有兩種chunk檔案,一種是存儲消息本身的,我叫做message chunk;一種是存儲隊列資訊的,我叫做queue chunk;而queue chunk的資料是依賴于message chunk的。上面我說的兩種删除政策是針對message chunk而言的。而queue chunk,由于這個依賴性,我覺得比較合理的方式是,隻需要判斷目前queue chunk中的所有的消息對應的message chunk是否已經都删除了,如果是,難說明這個queue chunk也已經沒意義了,就可以删除了。但隻要這個queue chunk中至少還有一個消息的chunk檔案沒删除,那這個queue chunk就不會删除。
上面這個隻是思路哦,真實的代碼肯定比這個複雜,呵呵。有興趣的朋友還是要看代碼的。
之前用sql server的方式,由于db很容易查消息,是以查詢消息不是大問題。但是現在我們的消息是放在檔案裡的,那要怎麼查詢呢?目前支援根據消息id來查詢。當producer發送一個消息到broker,broker傳回結果裡會包含消息的id。producer的正确做法應該是要用日志或其他方式記錄這個id,并最好和自己的目前業務消息的某個業務id一起記錄,比如commandid或者eventid,這樣我們就能根據我們的業務id找到消息id,然後根據消息id找到消息内容了。那消息id現在是怎麼設計的呢?也是受到rocketmq的啟發,消息id由兩部分組成:1)broker的ip;2)消息在broker的檔案中的全局位置;這樣,當我們要根據某個消息id查詢時,就可以先定位到這個消息在哪個broker上,也同時知道了消息在檔案的哪個位置了,這樣就能最終讀取這個消息的内容了。
為什麼要這樣設計呢?如果我們的broker沒有叢集,那其實不需要包含broker的ip;這個設計是為了未來equeue broker會支援叢集的,那個時候,我們就必須要知道某個消息id對應的broker是哪個了。
equeue中,每個queue,都會有一個對應的consumer。消費進度就是這個queue目前被消費到哪裡了,一個offset值。比如offset為100,就表示目前這個queue已經消費到第99(因為是從0開始的)個位置的消息了。因為一個broker上有很多的queue,比如有100個。而我們現在是使用檔案的方式來存儲資訊了。是以自然消費進度也是用檔案了。但由于消費進度的資訊很少,也不是遞增的形式。是以我們可以簡單設計,目前equeue采用一個檔案的方式來儲存所有queue的消費進度,檔案内容為json,json裡記錄了每個queue的消費進度。檔案内容看起來像下面這樣:
上面的json辨別一個名為samplegroup的consumergroup,他消費了一個名為topic1的topic,然後這個topic下的每個queue的消費進度記錄了下來。如果有另一個consumergroup,也消費了這個topic,那消費進度是隔離的。如果還不清楚consumergroup的同學,要去看一下我之前寫的equeue的文章了。
到目前為止,還有沒有其他可優化的大的地方呢?有。之前我做equeue時,總是把消息從資料庫讀取出來,然後構造出消息對象,再把消息對象序列化為二進制,再傳回給consumer。這裡涉及到從db拿出來,再序列化為二進制。學習了rocketmq的代碼後,我們可以做的更聰明一點。因為其實基于檔案存儲時,我們從檔案裡拿出來的已經是二進制了。是以可以直接把二進制傳回給消費者即可。不需要先轉換為對象再做序列化了。通過這個設計的改進,我們現在的消費者消費消息,可以說無任何瓶頸了,非常快。
在測試寫檔案的這個版本時,我們很希望知道每個chunk的讀寫情況的統計,進而确定設計是正确的。是以,我給equeue的chunk增加了實時統計chunk讀寫的統計服務。目前我們在運作equeue自帶的例子時,broker會每個一秒列印出所有chunk的讀寫情況,這個特性極大的友善我們判斷消息的發送和消費是否正常,消費是否有延遲等。
這次我給equeue的web背景管理控制台也完善了一下隊列的增加和減少的設計。增加隊列(即隊列的擴容)比較簡單,直接新增即可。但是當我們要删除一個隊列時,怎樣安全的删除呢?主要是要確定删除這個隊列時,已經沒有producer或consumer在使用這個隊列了。要怎麼做到呢?我的思路是,為每個queue對象設計兩個屬性,表示對producer是否可見,對consumer是否可見。當我們要删除某個queue時,可以:1)先讓其對producer不可見,這樣producer後續就不會再發送新的消息到這個隊列了;然後等待,直到這個隊列裡的消息都被所有的消費者消費掉了;然後再設定為對consumer不可見。然後再過幾秒,確定每個消費者都不會再向這個隊列發出拉取消息的請求了。這樣我們就能安全的删除這個隊列了。删除隊列的邏輯大概如如下:


代碼應該很簡單直接,不多解釋了。隊列的動态新增和删除,可以友善我們線上應付線上活動時,随時為消費者提供更高的并行消費能力,以及活動結束後去掉多餘的隊列。是非常實用的功能。
這個功能,也是非常實用的。這個版本我加了上去。以前equeue隻有topic的概念,沒有tag的概念。tag是對topic的二級過濾。比如當某個producer發送了3個消息,topic都是topic,然後tag分别是01,02,03。然後consumer訂閱了這個topic,但是訂閱這個topic時同時制定了tag,比如指定為02,那這個consumer就隻會收到一個消息。tag為01,03的消息是不會收到的。這個就是tag的功能。我覺得tag對我們是非常有用的,它可以極大的減少我們定義topic。本來我們必須要定義一個新的topic時,現在可能隻需要定義一個tag即可。關于tag的實作,我就不展開了。
終于到最後一點了,終于堅持快寫完了,呵呵。equeue web背景管理控制台現在支援消息堆積的報警了。當equeue broker上目前所有未消費的消息數達到一定的閥值時,就會發送郵件進行報警。我們可以把我們的郵件和我們的手機短信進行綁定,比如移動的139郵箱我記得就有這個功能。這樣我們就能第一時間知道broker上是否有大量消息堆積了,可以讓我們第一時間處理問題。
這篇文章感覺是我有史以來寫過的最有幹貨的一篇了,呵呵。一氣呵成,也是對我前面幾個月的所有積累知識經驗的一次性釋放吧。希望能給大家一些幫助。我寫文章比較喜歡寫思路,不太喜歡介紹如何用。我覺得一個程式員,最重要的是要學會如何思考去解決自己想解決的問題。而不是别人直接告訴你如何去解決。通過做equeue這個分布式消息隊列,也算是我自己的一個實踐過程。我非常鼓勵大家寫開源項目哦,當你專注于實作某個你感興趣的開源項目時,你就會有目标性的去學習相關的知識,你的學習就不會迷茫,不會為了學技術而學技術了。我在做equque時,要考慮各種東西,比如通信層的設計、消息持久化、整個架構設計,等等。我覺得是非常鍛煉人的。
一個人時間短暫,如果能用有限的時間做出好的東西可以造福後人,那我們來到這個世上也算沒白來了,你說對嗎?是以,我們千萬不要放棄我們的理想,雖然堅持理想很難,但也要堅持。