消息隊列的概念
我們先從消息隊列說起。消息隊裡本質上就是一個pub-sub 釋出訂閱的一個模式。這裡有三個角色
- 生産者:負責生産消息
- 訂閱者:負責消費消息
- 隊列:負責存儲消息
為什麼需要消息隊列
生活中的一些應用場景
技術其實基本都是來源于生活的場景的。我記得大學期間去玩具廠打暑假工的時候。我被配置設定到其中一個組裡面。
一個玩具的制作不是全部由一個人完成的,而是每一個人做一部分,最後組合成為一個完整的玩具:當時是所有人排成一個隊列,然後前面就是一個傳送帶。上遊的人做完之後,就會放到傳送帶裡面傳送給下一個人繼續組裝。
上遊和下遊速度不比對:這裡會有一個問題,由于每一部分的難度和勞工的娴熟度不一樣,往往會造成互相等待的情況。比如我是暑假工,不是很娴熟,幹活的速度自然就會慢點,而我上遊是一個老員工,手腳麻利做得很快。每次我前面都堆了一大堆半成品,不得不催上面的大哥:搞不完了,大哥慢點。這個時候大哥就停下來跟我唠嗑。你看,這樣我的速度就會影響到大哥的速度,繼而影響整個産品的進度。
通過半成品暫存,來減少等待現象:工廠的計算錢的方式是看你完成的數量的。一開始大哥還是可以體諒的,在我幹不完的時候停下來等我,和我唠嗑,偶爾也幫我搞下,但是人家大哥也是來賺錢的,不是來和我唠嗑的。後面大哥就建議我把半成品先搬到地下暫存着,我慢慢搞,他繼續做,這樣大哥就不用停下來等我了,等到後面大哥想休息了過來幫我或者我自己加班把這些慢慢消化掉。
上面這個例子對應的消息隊列的概念:
- 生産者:大哥
- 消費者:我
- 消息隊列:傳送帶+暫存半成品的地面
是以我們可以得出,消息隊列解決幾個問題:
- 解耦上下遊,減少互相等待(大哥不用因為我的速度慢而停下來等我)
- 提供暫存功能(因為速度不比對,需要提供一個地方來暫存半成品)
- 異步處理(我自己後面加班慢慢把半成品處理完)
技術上的一些應用場景
秒殺是一個永遠繞不開的一個技能表演的場景,你技術行不行就看你能不能扛得住一個高并發的秒殺就知道了。就拿秒殺業務來講,tps 峰值在1w/s。一個秒殺業務涉及的庫存扣減,價格計算,訂單落庫。如果請求直接打到後端服務是扛不住的,是以我們需要一個消息隊列,來暫存使用者的請求消息,後端服務根據自己的消費速度從消息隊列裡面擷取消息來進行消費。這種場景,消息隊列就主要起到削峰填谷的作用,同時對消息隊列提出一個新的要求:
- 高性能高吞吐的儲存
- 支援消息堆積能力
- 消息的可靠性
消息如何存儲
從上面的一些使用場景和業務系統對消息隊列能力的要求,我們不得不思考一個問題:我們的消息應該存儲在哪裡?
消息我們可以直接在記憶體中使用數組或者隊列來存儲資料即可。性能非常高。但是有幾方面的缺點
- 資料丢失,比如異常情況伺服器當機重新開機後記憶體的消息會被丢失掉
- 資料量大的時候,記憶體放不下,或者需要高昂的成本 面對一些業務系統是不能容忍消息丢失的情況,比如訂單系統。日均幾百萬訂單來說,單純放記憶體存儲也不太可能,是以需要一款可以提供持久化的消息系統。
既然要存儲資料,就需要解決資料存哪裡?從存儲方式來看,主要有幾個方面:
- 關系型資料庫,比如mysql
- 分布式KV存儲,比如采用rocketdb實作的
- 檔案系統,log 的方式直接追加
解決了存儲之後,還要看需求是否滿足,比如性能,吞吐量,本質上就是資料結構的設計決定的。我們看看上面資料存儲方式對應的資料結構
# B-Tree vs LSM-Tree
B+tree
![image.png](https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/001b14879f144ceeb50368442798a7f9~tplv-k3u1fbpfcp-watermark.image?)
LSM
![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/3125db45fdde420dbeda470a7af74dbf~tplv-k3u1fbpfcp-watermark.image?)
存儲 | 資料結構 | 寫性能 | 讀 |
mysql | B+ tree | 寫一條資料需要兩次寫入 1、資料寫入是按頁為機關進行寫的,假設頁的大小為B 位元組,那麼寫放大為Θ(B)(最壞的結果) 2、為了避免在寫頁的過程中出現故障,需要寫入redo log(WAL) | 既支援随機讀取又支援範圍查找的系統。讀放大為O(logBN/B),資料量大的适合性能會急劇下降,正常是b+ tree 超過4層,大約2000萬記錄是臨界點 |
rocketdb | LSM tree | Memtable/SSTable實作,寫的話也變成順序寫了(這一點是極大的優化點),但是背景會出現多路歸并算法來合并,這個過程占用磁盤IO 會到目前消息的讀寫有擾動,寫放大Θ(klogkN/B) | 讀的順序是MemTable->分層的sst ,性能會比B+ tree 略差,讀放大Θ((log2N/B)/logk) |
檔案系統 | append only log | 直接在檔案末尾追加,所有的的寫都是順序的,是以性能極高 | 不支援根據内容進行檢索,隻能根據檔案偏移量執行查詢 |
mysql 在大資料量的情況,性能會急劇下降,并且擴充性非常不友好,從這一點是直接排除了mysql 了。
分布式KV 存儲 天然的分布式系統,對大資料量和未來的擴充都問題不大,LSM tree 對寫性能和吞吐都比mysql 要好。查的時候比mysql 差一點,查詢其實是可以通過緩存等手段去做優化的,可以說是一個值得考慮選擇。
但是,滿足以上兩點性能和吞吐量最優的毫無疑問是使用檔案系統,因為消息不需要修改,讀和寫都是順序讀寫,性能極高。
但是。。。但是現實中的需求更複雜一點,我們可能需要使用多個隊列來完成不同的業務。比如一個隊列來處理訂單相關的業務,一個隊列來處理商品相關的業務等等。那麼我們該如何調整呢?我們都知道檔案 append only log 的方式是不支援根據消息的内容來搜尋的,是以如果所有的隊列的資料存在一個檔案中,顯然沒辦法滿足需求。換個思路,一個隊列一個檔案我們就可以繞開根據内容檢索的需求。貌似也是沒有問題,的确kafka 就是這麼玩的。
思考一個問題,每個隊列一個檔案,那麼讀寫還是順序的嗎?因為這個直接影響到性能 檔案數量少的情況,大體還是順序的
檔案數量大,大體上就不是那麼有序了
作為一款面向業務的高性能消息中間件,随着業務的複雜度變高,隊列數量是急劇變大的。如果要保證寫入的吞吐量和性能,還需要得所有的隊列都寫在同一個檔案。但是,按照隊列消費的場景就意味着要根據消息内容(隊列名字)來進行消費,append only log 是不支援檢索的,如何解決這個問題。想想我們在寫sql 的時候慢了,我們為了提速就會增加一個索引。萬事萬物都是想通的,這裡我們也可以根據建立一個隊列的索引,每一個隊列就是一個索引檔案。讀取資料的時候,先從索引隊列找到消息在檔案的偏移量後,在到資料檔案去讀取。這裡你可能意識到了,索引檔案的數量變大的之後,那麼對索引檔案的讀寫不就是又變成随機讀寫了嗎?性能又會急劇下降?
um um 好問題。一個一個來解決:
- 寫索引檔案的時候,我們可以改成異步寫,也就是寫完資料檔案,可以直接傳回給用戶端成功了,背景再由一個線程不停的從資料檔案擷取資料來建構索引,這樣就可以解決寫的性能瓶頸了
- 讀的話,我們要盡量想辦法繞開直接從磁盤讀,改成從記憶體讀。放在内容就意味着索引的内容要足夠小,不然根本放不下。是以這個優化的目标就變成盡量控制索引檔案的大小,放在記憶體裡面來避開磁盤讀進而提高性能
方案 | 優點 | 缺點 |
每一個queue 都單獨一個檔案 | 消費的時候不需要獨立建立一個索引,系統複雜度降低,并且性能高 | 當queue 很多的時候,并且每個queue 的資料量都不是很大情況,就會存在很多小檔案,寫和讀都講變成随機讀,性能會受到影響 |
所有queue 共享一個檔案 | 所有的寫都是順序寫的,性能比較高,可以支撐大量queue 性能也不至于下降的厲害 | 1、需要建立獨立的索引檔案,查詢資料的鍊路變長,需要先從索引查到資料再到資料檔案查詢 2、索引隊列本身也是小檔案,好在因為資料量少,基本可以常駐記憶體 3、讀變成随時讀,不過整體還是順序讀 |
rocketmq 中資料檔案稱為:commitlog,topic索引檔案稱為 consumeQueue
結論:選擇檔案系統,append only log.根據消息隊列即時消費和順序讀寫的特點,剛寫入的内容還在page cache ,就被讀走了,甚至都不需要回到磁盤,性能會非常高。看看順序讀寫的性能
資料量大了存儲怎麼辦
本地切割,大檔案變小檔案
如果所有的資料都存在一個commitlog 檔案的話,如果資料量大了,這個檔案必然會非常大,這樣對性能就會有所影響,解決也很簡單,我們大檔案切換成小檔案,每個檔案固定大小1G,寫滿了就切換到一個新的檔案
從上圖可以看到,其實commitlog 是一個目錄,下面挂着一些列大小相同的檔案。每個檔案都有一個名字,這裡的名字就是取消息最小的offset 作為檔案名字,由于每個檔案大小都是一樣的,也就意味着知道檔案名字後 加上檔案大小就可以知道這個檔案存放消息的下标範圍了。注意這裡一個很重要的點消息的offset 是全局遞增的,不再是具體檔案的偏移量。這裡就帶出一個問題,我們從consumeQueue 取出來的是offset ,如何定位到具體的檔案和目前檔案的偏移量。假如上圖的檔案大小是2,那麼我們要取offset =5的資料
- 根據檔案名稱查找offset=5 對應的檔案,可以得出fileName=04。代碼實作上是把所有的檔案按順序存到一個清單,然後轉化為查找清單小标.公式為
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
2.求出檔案的偏移量pos=offset%fileSize=5%2=1
/**
* Finds a mapped file by offset.
*
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
// index 代表第幾個檔案,這裡減去第一個檔案的索引,因為檔案是可能被回收的,是以要減去
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
// mappedFiles 是CopyOnWriteArrayList ,是以有可能存在并發的情況導緻indexOuterOfException
}
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
// mappedFiles 是CopyOnWriteArrayList ,是以有可能存在并發的情況導緻indexOuterOfException
// 在上面異常的情況找不到,就需要周遊去查找了
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
// 這裡依然有可能找不到,比如我們原本是要找第一個檔案的,後面進來後剛好第一個檔案因為過期被删除了,是以這裡有可能被傳回第一個檔案
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
分布式存儲
上文提到,消息隊列的第一個特點就是資料量大,既然資料量大,資料如果隻存在一個機器上的話,必然面臨着瓶頸,是以我們需要把資料均衡的分發到各個機器上。思路其實也很簡單,一段很長的隊列平均切成N份,把這N份分别放到不同的機器上
從上圖我們可以看到,上半部分的隊列半平均切割成為2份,分别存儲在機器A 和機器B上。同時可以看到左下角多了一個topicA 的來把機器A和機器B的兩個隊列關聯起來,從原來的實際隊列變成了一個抽象的邏輯隊列(topic)
消息高可靠
雖然我們的消息已經分成切分成為多份放到不同的機器了,但是每一份都是都隻有一個副本,也就意味着,任何一台機器的硬碟壞掉的話,該機器上的消息就會丢失掉了,這對于錢大媽這種業務的系統是不可接受的。行業通常的做法一份資料存多個副本,并且確定所有的副本不能全都在同一台機器。問題來了,那麼這多份資料是同步雙寫還是異步雙寫呢?
方案 | 優點 | 缺點 |
同步雙寫 | 資料不會丢失 | 性能會降低,單個RT變長 |
異步雙寫 | 單個RT 更加小,性能更高,吞吐量更大 | 資料可能會丢失 |
其實每個業務場景需求是不一樣的,RocketMq 是支援可配置的
消息的内容和序列化
之前我們一直聊的都是宏觀上的設計方面,現在我們把視野聚焦到一些細節實作上.
commitlog 的消息結構
我們先看下commitlog 的消息内容
這裡重點解釋下MAGICCODE 這個字段,還是有點意思的。因為commitlog 下面每個檔案的長度是固定的,有沒有想過這樣一種場景,一條新的消息來了之後,但是剛好這個檔案所剩餘的空間不足以容納下這條消息的時候,你會怎麼做?一部分放在這個檔案,另外一部分放在新的檔案?如果是這樣新的檔案的名字是這條消息的offset 還是下條消息的offset?解析這條消息的時候就需要跨多個檔案去讀取,變得非常複雜,RocketMq 其實用的是另外一種方式,直接在最後面寫入一個結束符号告訴使用者結束了,後面的不用讀取了,這種方式就比較簡單了,雖然浪費一點點空間,但我覺得是值得的,業務消息本來并不大,并且已經限制了每條消息的大小了。這個辨別結束的辨別位就是放在MAGICCODE 來實作的,現在可以取兩個值
普通消息 MESSAGE_MAGIC_CODE = -626843481
檔案結束符号 BLANK_MAGIC_CODE = -875286124
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
// 進來就說明檔案空間不足了,寫上結束語就滾蛋了,傳回去寫下一個檔案
// 這裡的設計不是直接寫下一個檔案,而是傳回去重新進來寫,保持邏輯統一,這個設計也是挺清晰的
this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
// 寫完結束語可能還有剩,那也不管了,這樣可能有點浪費空間,那也沒辦法啦,畢竟都是順序寫的,不可能再下一個小消息來的時候再回頭寫
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
org.apache.rocketmq.store.CommitLog.MessageExtEncoder#encode(org.apache.rocketmq.store.MessageExtBrokerInner)
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (bodyLength > this.maxMessageBodySize) {
CommitLog.log.warn("message body size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageBodySize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 1 TOTALSIZE
this.byteBuf.writeInt(msgLen);
// 2 MAGICCODE
this.byteBuf.writeInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.byteBuf.writeInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.byteBuf.writeInt(msgInner.getQueueId());
// 5 FLAG
this.byteBuf.writeInt(msgInner.getFlag());
// 6 QUEUEOFFSET, need update later
this.byteBuf.writeLong(0);
// 7 PHYSICALOFFSET, need update later
this.byteBuf.writeLong(0);
// 8 SYSFLAG
this.byteBuf.writeInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.byteBuf.writeLong(msgInner.getBornTimestamp());
// 10 BORNHOST
ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
this.byteBuf.writeBytes(bornHostBytes.array());
// 11 STORETIMESTAMP
this.byteBuf.writeLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
this.byteBuf.writeBytes(storeHostBytes.array());
// 13 RECONSUMETIMES
this.byteBuf.writeInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.byteBuf.writeInt(bodyLength);
if (bodyLength > 0)
this.byteBuf.writeBytes(msgInner.getBody());
// 16 TOPIC
this.byteBuf.writeByte((byte) topicLength);
this.byteBuf.writeBytes(topicData);
// 17 PROPERTIES
this.byteBuf.writeShort((short) propertiesLength);
if (propertiesLength > 0)
this.byteBuf.writeBytes(propertiesData);
return null;
}
consumeQueue 的消息結構
有了commitlog 了,我們還需要建構一個consumeQueue 來作為索引才能夠供消費者使用,這個建構的過程是異步的,也就是資料儲存到commitlog 成功後就可以直接傳回給用戶端了,伺服器背景會啟動一個線程一直在拉取commitlog 的消息出來進行建構consumeQueue.
從上圖我們可以看出consumeQueue 每一條記錄的長度是固定的,固定為20.為啥要固定呢? 消費者來consumeQueue 取消息的時候,拿到是consumeQueue的小标,而不是真實的offset,這個時候如果是定長才可能通過資料小标快速定位到消息的位置進行擷取消息,如果是變長是沒有辦法實作的
tagsCode 的設計
消息的tag 是可選的,并且每個消息的tag長度還不一定相等,如果要維持consumeQueue 的記錄是固定為20 的話,那就意味着不能直接存放tag 的内容,這裡巧妙的設定為tag 的hashCode 。因為hash 是可能出現沖突的,意味着在服務端進行過濾消息的話可能會出現誤判,把不該由消費者消費的消息發送給了消費者,是以消費者需要在本地做二次等值過濾,畢竟hash 沖突的可能性沒有那麼高,是以犧牲一點網絡上的帶寬換來讀取消息的性能是值得的,從這裡可以看出,架構上總是有取舍的,不是完美的。
String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
if (tags != null && tags.length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; }
return tags.hashCode();
}
另外還有一個很有意思的是延遲消息,tagsCode 存放的是消息的延遲時間,這樣就可以不用回到commitlog 就可以知道這個消息該什麼時候投放給消費者,大大提高了性能
// Timing message processing
{
/**
* 延遲消息,消息隊裡的tagsCode 存在的是延遲的時間,這樣在拿消息出來的時候可以快速的判斷這個消息是否已經到了
*/
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}
if (delayLevel > 0) {
//這裡對延遲消息做了個特殊處理,把延遲時間放到tagCode,牛逼,支援消費的時候快速過濾
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
Long time = this.delayLevelTable.get(delayLevel);
if (time != null) {
return time + storeTimestamp;
}
return storeTimestamp + 1000;
}
來看下consumeQueue 的建構源碼,通過一個背景線程從commitlog 擷取消息進行建構
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
// 這裡用一個doNext 變量來控制兩個for 循環的結束(裡面還嵌套一個for 循環),這個倒是可以借鑒下
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 從commitLog 中找出未放到消息隊列的消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 對消息做了校驗和處理延遲消息,tagCode 放的是延遲時間,友善快速處理,真是牛逼
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 這裡才是重頭戲,抓住重點
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
// long輪詢 原來是在這裡實時推送消息的
// 因為用戶端隻會從主讀取消息不會從從讀取消息
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
}
// TODO-willJo:2021/9/24 這裡當機,上面已經實時推送消息,恢複的時候豈不是就重複推送消息了?用戶端會和自己消費的offSet 對比吧?恩,幂等還是很有必要
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.add(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
if (multiQueue) {
multiDispatchLmqQueue(request, maxRetries);
}
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
零拷貝技術
普通讀寫
read(file, tmp_buf, len);
write(socket, tmp_buf, len);
從上圖可以看到,總共發生4次上下文切換和4次資料copy
- 通過系統調用讀取資料的時候,發生第一次上下文切換,從使用者态進入到核心态。緊接着發生第一次資料拷貝,通過DMA 把硬碟的資料copy 到核心的buffer
- 讀操作傳回,這個時候發生第二次上下文切換,從核心态進入到使用者态。緊接着發生第二次資料拷貝是CPU 把核心buffer 中的資料copy 到使用者空間的buffer,
- 通過系統調用寫資料的時候,發生第三次上下文切換,從使用者态進入到核心态。緊接着發生第三次資料拷貝,通過CPU從使用者空間的buffer copy 到socket buffer.
- 寫操作傳回,發生了第四次上下文切換,從核心态進入到使用者态。注意實際上資料還是沒有發送出去,這一步是沒有辦法保證的。,而是異步通過DMA 把socket buffer copy 資料到協定棧發送出去
nmap+write 優化
tmp_buf = mmap(file, len);
write(socket, tmp_buf, len);
從上圖可以看到,總共發生4次上下文切換和3次資料copy
- 通過nmap系統調用讀取資料的時候,發生第一次上下文切換,從使用者态進入到核心态。緊接着發生第一次資料拷貝,通過DMA 把硬碟的資料copy 到核心的buffer
- 讀操作傳回,這個時候發生第二次上下文切換,從核心态進入到使用者态。由于nmap 的實作是使用者态和核心态是共享緩存的,是以是不需要進行資料copy.
- 通過系統調用寫資料的時候,發生第三次上下文切換,從使用者态進入到核心态。緊接着發生第三次資料拷貝,通過CPU從使用者空間的buffer copy 到socket buffer.
- 寫操作傳回,發生了第四次上下文切換,從核心态進入到使用者态。注意實際上資料還是沒有發送出去,這一步是沒有辦法保證的。,而是異步通過DMA 把socket buffer copy 資料到協定棧發送出去
sendfile+DMA gather copy 優化
sendfile(socket, file, len);
從上圖可以看到,總共發生2次上下文切換和2次資料copy
- 通過sendfile系統調用讀取資料的時候,發生第一次上下文切換,從使用者态進入到核心态。緊接着發生第一次資料拷貝,通過DMA 把硬碟的資料copy 到核心的buffer
- 這個時候是沒有資料 copy 到socket buffer 的,拷貝過去的隻是一些檔案描述符和資料長度,是非常輕量的。 sendfile 傳回成功,發生第二次上下文切換,從核心态,切換到使用者态,另外異步通過DMA 把socket buffer copy 資料到協定棧發送出去
RocketMq 使用nmap+write 進行優化
commitlog 目錄下面挂載的檔案,在RocketMq 對應的類是MappedFile,我們先看看這個類是怎麼建立和初始化的。 當在寫入消息的時候,需要判斷該檔案是否存在
if (null == mappedFile || mappedFile.isFull()) {
// 檔案寫滿後,會寫新的檔案,建立新的檔案的時候必定會有毛刺,其實rocketmq 是會提前配置設定檔案來解決這個問題的,并且會在新的檔案每一頁新0
// 和上鎖(mlock)來確定新檔案在虛拟記憶體中,不至于導緻中斷回盤帶來的性能影響,确實有點牛逼
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
mappedFile 初始化
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
/**
* 這裡通過mmap 的技術,來減少使用者态和核心态的資料拷貝,進而提成性能(kafka 其實用的是sendfile 技術)
* 記憶體映射檔案的實際檔案寫入時機可能是作業系統定期調用,髒頁過大,程式主動調用byteBuffer.force,是以後面一個flush 失敗的時候不理其實作業系統也會刷進去
*/
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
建立一個nmap 的代價是很高的,并且剛建立的檔案還沒有進行加載到記憶體裡面,依然是在磁盤。這種剛好一個消息進來的時候,檔案已經寫滿了,再去建立一個新的檔案,就會出現毛刺,是以RocketMq 采用了提前預熱的方式,通過開一個背景線程去掃描mappedFile,發現到達一個配置門檻值的時候就去預熱下下一個檔案
// pre write mappedFile 這裡提前預熱檔案
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
// 每一個頁都是寫個0
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
//使用mlock 盡量鎖住虛拟記憶體在實體記憶體中
this.mlock();
}
我們可以看到預熱的方式就是往檔案裡面寫0,并且使用了mlock 把記憶體鎖住在實體記憶體,還是很牛逼的。
為什麼RocketMq 用nmap 而不是sendfile
這裡要明确一點是,sendfile 是不經過使用者空間,直接把資料從檔案系統通過socket 發送出去的了,但是RocketMq的消費有時候需要根據消息的内容做messageFilter的過濾确認後才能傳回給使用者,也就意味着這個資料需要經過使用者态處理,是以是沒有辦法使用sendfile 。但是也不是說不能優化,如果沒有messageFilter 過濾的是否就意味着可以直接使用sendfile 了呢
org.apache.rocketmq.store.DefaultMessageStore#getMessage
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
// 這裡把消息取出來後還要進行一層過濾
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}