為什麼使用消息隊列?
什麼業務場景,這個業務場景有個什麼技術挑戰,如果不用MQ可能會很麻煩,但是你現在用了MQ之後帶給了你很多的好處。消息隊列的常見使用場景,其實場景有很多,但是比較核心的有3個:解耦、異步、削峰。
解耦:
A系統發送個資料到BCD三個系統,接口調用發送,那如果E系統也要這個資料呢?那如果C系統現在不需要了呢?現在A系統又要發送第二種資料了呢?而且A系統要時時刻刻考慮BCDE四個系統如果挂了咋辦?要不要重發?我要不要把消息存起來?
你需要去考慮一下你負責的系統中是否有類似的場景,就是一個系統或者一個子產品,調用了多個系統或者子產品,互相之間的調用很複雜,維護起來很麻煩。但是其實這個調用是不需要直接同步調用接口的,如果用MQ給他異步化解耦,也是可以的,你就需要去考慮在你的項目裡,是不是可以運用這個MQ去進行系統的解耦。
異步:
A系統接收一個請求,需要在自己本地寫庫,還需要在BCD三個系統寫庫,自己本地寫庫要30ms,BCD三個系統分别寫庫要300ms、450ms、200ms。最終請求總延時是30 + 300 + 450 + 200 = 980ms,接近1s,異步後,BCD三個系統分别寫庫的時間,A系統就不再考慮了。
削峰:
每天0點到16點,A系統風平浪靜,每秒并發請求數量就100個。結果每次一到16點~23點,每秒并發請求數量突然會暴增到1萬條。但是系統最大的處理能力就隻能是每秒鐘處理1000個請求啊。怎麼辦?需要我們進行流量的削峰,讓系統可以平緩的處理突增的請求。
消息隊列有什麼優點和缺點?
優點上面已經說了,就是在特殊場景下有其對應的好處,解耦、異步、削峰。
缺點呢?
系統可用性降低
系統引入的外部依賴越多,越容易挂掉,本來你就是A系統調用BCD三個系統的接口就好了,ABCD四個系統好好的,沒啥問題,你偏加個MQ進來,萬一MQ挂了怎麼辦?MQ挂了,整套系統崩潰了,業務也就停頓了。
系統複雜性提高
硬生生加個MQ進來,怎麼保證消息沒有重複消費?怎麼處理消息丢失的情況?怎麼保證消息傳遞的順序性?
一緻性問題
A系統處理完了直接傳回成功了,人都以為你這個請求就成功了;但是問題是,要是BCD三個系統那裡,BD兩個系統寫庫成功了,結果C系統寫庫失敗了,你這資料就不一緻了。
是以消息隊列實際是一種非常複雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉。
常見消息隊列的比較

消息的重複
原因
第一類原因
消息發送端應用的消息重複發送,有以下幾種情況。
l 消息發送端發送消息給消息中間件,消息中間件收到消息并成功存儲,而這時消息中間件出現了問題,導緻應用端沒有收到消息發送成功的傳回因而進行重試産生了重複。
l 消息中間件因為負載高響應變慢,成功把消息存儲到消息存儲中後,傳回“成功”這個結果時逾時。
l 消息中間件将消息成功寫入消息存儲,在傳回結果時網絡出現問題,導緻應用發送端重試,而重試時網絡恢複,由此導緻重複。
可以看到,通過消息發送端産生消息重複的主要原因是消息成功進入消息存儲後,因為各種原因使得消息發送端沒有收到“成功”的傳回結果,并且又有重試機制,因而導緻重複。
第二類原因
消息到達了消息存儲,由消息中間件進行向外的投遞時産生重複,有以下幾種情況。
l 消息被投遞到消息接收者應用進行處理,處理完畢後應用出問題了,消息中間件不知道消息處理結果,會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理完畢後網絡出現問題了,消息中間件沒有收到消息處理結果,會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理時間比較長,消息中間件因為消息逾時會再次投遞。
l 消息被投遞到消息接收者應用進行處理,處理完畢後消息中間件出問題了,沒能收到消息結果并處理,會再次投遞
l 消息被投遞到消息接收者應用進行處理,處理完畢後消息中間件收到結果但是遇到消息存儲故障,沒能更新投遞狀态,會再次投遞。
可以看到,在投遞過程中産生的消息重複接收主要是因為消息接收者成功處理完消息後,消息中間件不能及時更新投遞狀态造成的。
如何解決重複消費
那麼有什麼辦法可以解決呢?主要是要求消息接收者來處理這種重複的情況,也就是要求消息接收者的消息處理是幂等操作。
什麼是幂等性?
對于消息接收端的情況,幂等的含義是采用同樣的輸入多次調用處理函數,得到同樣的結果。例如,一個SQL操作
update stat_table set count= 10 where id =1
這個操作多次執行,id等于1的記錄中的 count字段的值都為10,這個操作就是幂等的,我們不用擔心這個操作被重複。
再來看另外一個SQL操作
update stat_table set count= count +1 where id= 1;
這樣的SQL操作就不是幂等的,一旦重複,結果就會産生變化。
常見辦法
是以應對消息重複的辦法是,使消息接收端的處理是一個幂等操作。這樣的做法降低了消息中間件的整體複雜性,不過也給使用消息中間件的消息接收端應用帶來了一定的限制和門檻。
1. MVCC:
多版本并發控制,樂觀鎖的一種實作,在生産者發送消息時進行資料更新時需要帶上資料的版本号,消費者去更新時需要去比較持有資料的版本号,版本号不一緻的操作無法成功。例如部落格點贊次數自動+1的接口:
public boolean addCount(Long id, Long version);
update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一個version隻有一次執行成功的機會,一旦失敗了生産者必須重新擷取資料的最新版本号再次發起更新。
2. 去重表:
利用資料庫表單的特性來實作幂等,常用的一個思路是在表上建構唯一性索引,保證某一類資料一旦執行完畢,後續同樣的請求不再重複處理了(利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那麼就不再處理這條消息。)
以電商平台為例子,電商平台上的訂單id就是最适合的token。當使用者下單時,會經曆多個環節,比如生成訂單,減庫存,減優惠券等等。每一個環節執行時都先檢測一下該訂單id是否已經執行過這一步驟,對未執行的請求,執行操作并緩存結果,而對已經執行過的id,則直接傳回之前的執行結果,不做任何操作。這樣可以在最大程度上避免操作的重複執行問題,緩存起來的執行結果也能用于事務的控制等。
消息的可靠性傳輸
ActiveMQ
要保證消息的可靠性,除了消息的持久化,還包括兩個方面,一是生産者發送的消息可以被ActiveMQ收到,二是消費者收到了ActiveMQ發送的消息。
生産者
非持久化又不在事務中的消息,可能會有消息的丢失。為保證消息可以被ActiveMQ收到,我們應該采用事務消息或持久化消息。
消費者
對消息的确認有4種機制
1、 AUTO_ACKNOWLEDGE = 1 自動确認
2、 CLIENT_ACKNOWLEDGE = 2 用戶端手動确認
3、 DUPS_OK_ACKNOWLEDGE = 3 自動批量确認
4、 SESSION_TRANSACTED = 0 事務送出并确認
ACK_MODE描述了Consumer與broker确認消息的方式(時機),比如當消息被Consumer接收之後,Consumer将在何時确認消息。是以ack_mode描述的不是producer于broker之間的關系,而是customer于broker之間的關系。
對于broker而言,隻有接收到ACK指令,才會認為消息被正确的接收或者處理成功了,通過ACK,可以在consumer與Broker之間建立一種簡單的“擔保”機制.
AUTO_ACKNOWLEDGE
自動确認
“同步”(receive)方法傳回message給消息時會立即确認。
在"異步"(messageListener)方式中,将會首先調用listener.onMessage(message),如果onMessage方法正常結束,消息将會正常确認。如果onMessage方法異常,将導緻消費者要求ActiveMQ重發消息。
CLIENT_ACKNOWLEDGE :
用戶端手動确認,這就意味着AcitveMQ将不會“自作主張”的為你ACK任何消息,開發者需要自己擇機确認。
我們可以在目前消息處理成功之後,立即調用message.acknowledge()方法來"逐個"确認消息,這樣可以盡可能的減少因網絡故障而導緻消息重發的個數;當然也可以處理多條消息之後,間歇性的調用acknowledge方法來一次确認多條消息,減少ack的次數來提升consumer的效率,不過需要自行權衡。
DUPS_OK_ACKNOWLEDGE
類似于AUTO_ACK确認機制,為自動批量确認而生,而且具有“延遲”确認的特點,ActiveMQ會根據内部算法,在收到一定數量的消息自動進行确認。在此模式下,可能會出現重複消息,什麼時候?當consumer故障重新開機後,那些尚未ACK的消息會重新發送過來。
SESSION_TRANSACTED
當session使用事務時,就是使用此模式。當決定事務中的消息可以确認時,必須調用session.commit()方法,commit方法将會導緻目前session的事務中所有消息立即被确認。在事務開始之後的任何時機調用rollback(),意味着目前事務的結束,事務中所有的消息都将被重發。當然在commit之前抛出異常,也會導緻事務的rollback。
RabbitMQ
(1)生産者弄丢了資料
生産者将資料發送到RabbitMQ的時候,可能資料就在半路給搞丢了,因為網絡啥的問題,都有可能。此時可以選擇用RabbitMQ提供的事務功能,就是生産者發送資料之前開啟RabbitMQ事務(channel.txSelect),然後發送消息,如果消息沒有成功被RabbitMQ接收到,那麼生産者會收到異常報錯,此時就可以復原事務(channel.txRollback),然後重試發送消息;如果收到了消息,那麼可以送出事務(channel.txCommit)。但是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,因為太耗性能。
是以一般來說,如果要確定RabbitMQ的消息别丢,可以開啟confirm模式,在生産者那裡設定開啟confirm模式之後,你每次寫的消息都會配置設定一個唯一的id,然後如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在記憶體裡維護每個消息id的狀态,如果超過一定時間還沒接收到這個消息的回調,那麼你可以重發。
事務機制和cnofirm機制最大的不同在于,事務機制是同步的,你送出一個事務之後會阻塞在那兒,但是confirm機制是異步的,你發送個消息之後就可以發送下一個消息,然後那個消息RabbitMQ接收了之後會異步回調你一個接口通知你這個消息接收到了。
是以一般在生産者這塊避免資料丢失,都是用confirm機制的。
(2)RabbitMQ弄丢了資料
就是RabbitMQ自己弄丢了資料,這個你必須開啟RabbitMQ的持久化,就是消息寫入之後會持久化到磁盤,哪怕是RabbitMQ自己挂了,恢複之後會自動讀取之前存儲的資料,一般資料不會丢。除非極其罕見的是,RabbitMQ還沒持久化,自己就挂了,可能導緻少量資料會丢失的,但是這個機率較小。
設定持久化有兩個步驟,第一個是建立queue和交換器的時候将其設定為持久化的,這樣就可以保證RabbitMQ持久化相關的中繼資料,但是不會持久化queue裡的資料;第二個是發送消息的時候将消息的deliveryMode設定為2,就是将消息設定為持久化的,此時RabbitMQ就會将消息持久化到磁盤上去。必須要同時設定這兩個持久化才行,RabbitMQ哪怕是挂了,再次重新開機,也會從磁盤上重新開機恢複queue,恢複這個queue裡的資料。
而且持久化可以跟生産者那邊的confirm機制配合起來,隻有消息被持久化到磁盤之後,才會通知生産者ack了,是以哪怕是在持久化到磁盤之前,RabbitMQ挂了,資料丢了,生産者收不到ack,你也是可以自己重發的。
哪怕是你給RabbitMQ開啟了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,但是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ挂了,就會導緻記憶體裡的一點點資料會丢失。
(3)消費端弄丢了資料
RabbitMQ如果丢失了資料,主要是因為你消費的時候,剛消費到,還沒處理,結果程序挂了,比如重新開機了,那麼就尴尬了,RabbitMQ認為你都消費了,這資料就丢了。
這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關閉RabbitMQ自動ack,可以通過一個api來調用就行,然後每次你自己代碼裡確定處理完的時候,再程式裡ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認為你還沒處理完,這個時候RabbitMQ會把這個消費配置設定給别的consumer去處理,消息是不會丢的。
Kafka
(1)消費端弄丢了資料
唯一可能導緻消費者弄丢資料的情況,就是說,你那個消費到了這個消息,然後消費者那邊自動送出了offset,讓kafka以為你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你自己就挂了,此時這條消息就丢咯。
大家都知道kafka會自動送出offset,那麼隻要關閉自動送出offset,在處理完之後自己手動送出offset,就可以保證資料不會丢。但是此時确實還是會重複消費,比如你剛處理完,還沒送出offset,結果自己挂了,此時肯定會重複消費一次,自己保證幂等性就好了。
生産環境碰到的一個問題,就是說我們的kafka消費者消費到了資料之後是寫到一個記憶體的queue裡先緩沖一下,結果有的時候,你剛把消息寫入記憶體queue,然後消費者會自動送出offset。
然後此時我們重新開機了系統,就會導緻記憶體queue裡還沒來得及處理的資料就丢失了
(2)kafka弄丢了資料
這塊比較常見的一個場景,就是kafka某個broker當機,然後重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些資料沒有同步,結果此時leader挂了,然後選舉某個follower成leader之後,他不就少了一些資料?這就丢了一些資料啊。
是以此時一般是要求起碼設定如下4個參數:
給這個topic設定replication.factor參數:這個值必須大于1,要求每個partition必須有至少2個副本。
在kafka服務端設定min.insync.replicas參數:這個值必須大于1,這個是要求一個leader至少感覺到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確定leader挂了還有一個follower吧。
在producer端設定acks=all:這個是要求每條資料,必須是寫入所有replica之後,才能認為是寫成功了。
在producer端設定retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裡了。
(3)生産者會不會弄丢資料
如果按照上述的思路設定了ack=all,一定不會丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之後,才認為本次寫成功了。如果沒滿足這個條件,生産者會自動不斷的重試,重試無限次。
消息的順序性
從根本上說,異步消息是不應該有順序依賴的。在MQ上估計是沒法解決。要實作嚴格的順序消息,簡單且可行的辦法就是:保證生産者 - MQServer - 消費者是一對一對一的關系。
ActiveMQ
1、通過進階特性consumer獨有消費者(exclusive consumer)
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
當在接收資訊的時候,有多個獨占消費者的時候,隻有一個獨占消費者可以接收到消息。
獨占消息就是在有多個消費者同時消費一個queue時,可以保證隻有一個消費者可以消費消息,這樣雖然保證了消息的順序問題,不過也帶來了一個問題,就是這個queue的所有消息将隻會在這一個主消費者上消費,其他消費者将閑置,達不到負載均衡配置設定,而實際業務我們可能更多的是這樣的場景,比如一個訂單會發出一組順序消息,我們隻要求這一組消息是順序消費的,而訂單與訂單之間又是可以并行消費的,不需要順序,因為順序也沒有任何意義,有沒有辦法做到呢?可以利用activemq的另一個進階特性之messageGroup
2、利用Activemq的進階特性:messageGroups
Message Groups特性是一種負載均衡的機制。在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那麼broker會檢查是否有某個consumer擁有這個message group。如果沒有,那麼broker會選擇一個consumer,并将它關聯到這個message group。此後,這個consumer會接收這個message group的所有消息,直到:Consumer被關閉。Message group被關閉,通過發送一個消息,并設定這個消息的JMSXGroupSeq為-1
bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
bytesMessage.setIntProperty("JMSXGroupSeq", -1);
如上圖所示,同一個queue中,擁有相同JMSXGroupID的消息将發往同一個消費者,解決順序問題,不同分組的消息又能被其他消費者并行消費,解決負載均衡的問題。
RabbitMQ
如果有順序依賴的消息,要保證消息有一個hashKey,類似于資料庫表分區的的分區key列。保證對同一個key的消息發送到相同的隊列。A使用者産生的消息(包括建立消息和删除消息)都按A的hashKey分發到同一個隊列。隻需要把強相關的兩條消息基于相同的路由就行了,也就是說經過m1和m2的在路由表裡的路由是一樣的,那自然m1會優先于m2去投遞。而且一個queue隻對應一個consumer。
Kafka
一個topic,一個partition,一個consumer,内部單線程消費
如何解決消息隊列的延時以及過期失效問題?
rabbitmq,rabbitmq是可以設定過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間,而又沒有設定死信隊列機制,就會被rabbitmq給清理掉,這個資料就沒了。
ActiveMQ則通過更改配置,支援消息的定時發送。
有幾百萬消息持續積壓幾小時怎麼解決?
發生了線上故障,幾千萬條資料在MQ裡積壓很久。是修複consumer的問題,讓他恢複消費速度,然後等待幾個小時消費完畢?這是個解決方案。不過有時候我們還會進行臨時緊急擴容。
一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條。1000多萬條,是以如果積壓了幾百萬到上千萬的資料,即使消費者恢複了,也需要大概1小時的時間才能恢複過來。
一般這個時候,隻能操作臨時緊急擴容了,具體操作步驟和思路如下:
先修複consumer的問題,確定其恢複消費速度,然後将現有cnosumer都停掉。
建立一個topic,partition是原來的10倍,臨時建立好原先10倍或者20倍的queue數量。然後寫一個臨時的分發資料的consumer程式,這個程式部署上去消費積壓的資料,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue。
接着臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的資料。
這種做法相當于是臨時将queue資源和consumer資源擴大10倍,以正常的10倍速度來消費資料。
等快速消費完積壓資料之後,再恢複原先部署架構,重新用原先的consumer機器來消費消息。
Kafka是如何實作高性能的?
宏觀架構層面利用Partition實作并行處理
Kafka中每個Topic都包含一個或多個Partition,不同Partition可位于不同節點。同時Partition在實體上對應一個本地檔案夾,每個Partition包含一個或多個Segment,每個Segment包含一個資料檔案和一個與之對應的索引檔案。在邏輯上,可以把一個Partition當作一個非常長的數組,可通過這個“數組”的索引(offset)去通路其資料。
一方面,由于不同Partition可位于不同機器,是以可以充分利用叢集優勢,實作機器間的并行處理。另一方面,由于Partition在實體上對應一個檔案夾,即使多個Partition位于同一個節點,也可通過配置讓同一節點上的不同Partition置于不同的disk drive上,進而實作磁盤間的并行處理,充分發揮多磁盤的優勢。
利用多磁盤的具體方法是,将不同磁盤mount到不同目錄,然後在server.properties中,将log.dirs設定為多目錄(用逗号分隔)。Kafka會自動将所有Partition盡可能均勻配置設定到不同目錄也即不同目錄(也即不同disk)上。
Partition是最小并發粒度,Partition個數決定了可能的最大并行度。。
ISR實作可用性與資料一緻性的動态平衡
常用資料複制及一緻性方案
Master-Slave
- RDBMS的讀寫分離即為典型的Master-Slave方案
- 同步複制可保證強一緻性但會影響可用性
- 異步複制可提供高可用性但會降低一緻性
WNR
- 主要用于去中心化的分布式系統中。
- N代表總副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少要讀取的副本數
- 當W+R>N時,可保證每次讀取的資料至少有一個副本擁有最新的資料
- 多個寫操作的順序難以保證,可能導緻多副本間的寫操作順序不一緻。Dynamo通過向量時鐘保證最終一緻性
Paxos及其變種
- Google的Chubby,Zookeeper的原子廣播協定(Zab),RAFT等
基于ISR的資料複制方案
Kafka的資料複制是以Partition為機關的。而多個備份間的資料複制,通過Follower向Leader拉取資料完成。從一這點來講,Kafka的資料複制方案接近于上文所講的Master-Slave方案。不同的是,Kafka既不是完全的同步複制,也不是完全的異步複制,而是基于ISR的動态複制方案。
ISR,也即In-sync Replica。每個Partition的Leader都會維護這樣一個清單,該清單中,包含了所有與之同步的Replica(包含Leader自己)。每次資料寫入時,隻有ISR中的所有Replica都複制完,Leader才會将其置為Commit,它才能被Consumer所消費。
這種方案,與同步複制非常接近。但不同的是,這個ISR是由Leader動态維護的。如果Follower不能緊“跟上”Leader,它将被Leader從ISR中移除,待它又重新“跟上”Leader後,會被Leader再次加加ISR中。每次改變ISR後,Leader都會将最新的ISR持久化到Zookeeper中。
由于Leader可移除不能及時與之同步的Follower,故與同步複制相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。
ISR中的所有Follower都包含了所有Commit過的消息,而隻有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處于同步狀态,進而與異步複制方案相比提高了資料一緻性。
ISR可動态調整,極限情況下,可以隻包含Leader,極大提高了可容忍的當機的Follower的數量。與Majority Quorum方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。
具體實作層面高效使用磁盤特性和作業系統特性
将寫磁盤的過程變為順序寫
Kafka的整個設計中,Partition相當于一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些資料,并且不删除已經消費的資料,進而避免了随機寫磁盤的過程。
由于磁盤有限,不可能儲存所有資料,實際上作為消息系統Kafka也沒必要儲存所有資料,需要删除舊的資料。而這個删除過程,并非通過使用“讀-寫”模式去修改檔案,而是将Partition分為多個Segment,每個Segment對應一個實體檔案,通過删除整個檔案的方式去删除Partition内的資料。這種方式清除舊資料的方式,也避免了對檔案的随機寫操作。
在存儲機制上,使用了Log Structured Merge Trees(LSM) 。
注:Log Structured Merge Trees(LSM),谷歌 “BigTable” 的論文,中提出,LSM是目前被用在許多産品的檔案結構政策:HBase, Cassandra, LevelDB, SQLite,Kafka。LSM被設計來提供比傳統的B+樹或者ISAM更好的寫操作吞吐量,通過消去随機的本地更新操作來達到這個目标。這個問題的本質還是磁盤随機操作慢,順序讀寫快。這二種操作存在巨大的差距,無論是磁盤還是SSD,而且快至少三個數量級。
充分利用Page Cache
使用Page Cache的好處如下
- I/O Scheduler會将連續的小塊寫組裝成大塊的實體寫進而提高性能
- I/O Scheduler會嘗試将一些寫操作重新按順序排好,進而減少磁盤頭的移動時間
- 充分利用所有空閑記憶體(非JVM記憶體)。如果使用應用層Cache(即JVM堆記憶體),會增加GC負擔
- 讀操作可直接在Page Cache内進行。如果消費和生産速度相當,甚至不需要通過實體磁盤(直接通過Page Cache)交換資料
- 如果程序重新開機,JVM内的Cache會失效,但Page Cache仍然可用
Broker收到資料後,寫磁盤時隻是将資料寫入Page Cache,并不保證資料一定完全寫入磁盤。從這一點看,可能會造成機器當機時,Page Cache内的資料未寫入磁盤進而造成資料丢失。但是這種丢失隻發生在機器斷電等造成作業系統不工作的場景,而這種場景完全可以由Kafka層面的Replication機制去解決。如果為了保證這種情況下資料不丢失而強制将Page Cache中的資料Flush到磁盤,反而會降低性能。也正因如此,Kafka雖然提供了flush.messages和flush.ms兩個參數将Page Cache中的資料強制Flush到磁盤,但是Kafka并不建議使用。
如果資料消費速度與生産速度相當,甚至不需要通過實體磁盤交換資料,而是直接通過Page Cache交換資料。同時,Follower從Leader Fetch資料時,也可通過Page Cache完成。
注:Page Cache,又稱pcache,其中文名稱為頁高速緩沖存儲器,簡稱頁高緩。page cache的大小為一頁,通常為4K。在linux讀寫檔案時,它用于緩存檔案的邏輯内容,進而加快對磁盤上映像和資料的通路。 是Linux作業系統的一個特色。
支援多Disk Drive
Broker的log.dirs配置項,允許配置多個檔案夾。如果機器上有多個Disk Drive,可将不同的Disk挂載到不同的目錄,然後将這些目錄都配置到log.dirs裡。Kafka會盡可能将不同的Partition配置設定到不同的目錄,也即不同的Disk上,進而充分利用了多Disk的優勢。
零拷貝
Kafka中存在大量的網絡資料持久化到磁盤(Producer到Broker)和磁盤檔案通過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。
傳統模式下的四次拷貝與四次上下文切換
以将磁盤檔案通過網絡發送為例。傳統模式下,一般使用如下僞代碼所示的方法先将檔案資料讀入記憶體,然後通過Socket将記憶體中的資料發送出去。
buffer = File.readSocket.send(buffer)
這一過程實際上發生了四次資料拷貝。首先通過系統調用将檔案資料讀入到核心态Buffer(DMA拷貝),然後應用程式将記憶體态Buffer資料讀入到使用者态Buffer(CPU拷貝),接着使用者程式通過Socket發送資料時将使用者态Buffer資料拷貝到核心态Buffer(CPU拷貝),最後通過DMA拷貝将資料拷貝到NIC Buffer。同時,還伴随着四次上下文切換。
而Linux 2.4+核心通過sendfile系統調用,提供了零拷貝。資料通過DMA拷貝到核心态Buffer後,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少資料拷貝外,因為整個讀檔案-網絡發送由一個sendfile調用完成,整個過程隻有兩次上下文切換,是以大大提高了性能。
從具體實作來看,Kafka的資料傳輸通過Java NIO的FileChannel的transferTo和transferFrom方法實作零拷貝。
注: transferTo和transferFrom并不保證一定能使用零拷貝。實際上是否能使用零拷貝與作業系統相關,如果作業系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則并不能通過這兩個方法本身實作零拷貝。
減少網絡開銷批處理
批處理是一種常用的用于提高I/O性能的方式。對Kafka而言,批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。
Kafka 的send方法并非立即将消息發送出去,而是通過batch.size和linger.ms控制實際發送頻率,進而實作批量發送。
由于每次網絡傳輸,除了傳輸消息本身以外,還要傳輸非常多的網絡協定本身的一些内容(稱為Overhead),是以将多條消息合并到一起傳輸,可有效減少網絡傳輸的Overhead,進而提高了傳輸效率。
資料壓縮降低網絡負載
Kafka從0.7開始,即支援将資料壓縮後再傳輸給Broker。除了可以将每條消息單獨壓縮然後傳輸外,Kafka還支援在批量發送時,将整個Batch的消息一起壓縮後傳輸。資料壓縮的一個基本原理是,重複資料越多壓縮效果越好。是以将整個Batch的資料一起壓縮能更大幅度減小資料量,進而更大程度提高網絡傳輸效率。
Broker接收消息後,并不直接解壓縮,而是直接将消息以壓縮後的形式持久化到磁盤。Consumer Fetch到資料後再解壓縮。是以Kafka的壓縮不僅減少了Producer到Broker的網絡傳輸負載,同時也降低了Broker磁盤操作的負載,也降低了Consumer與Broker間的網絡傳輸量,進而極大得提高了傳輸效率,提高了吞吐量。
高效的序列化方式
Kafka消息的Key和Payload(或者說Value)的類型可自定義,隻需同時提供相應的序列化器和反序列化器即可。是以使用者可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實際網絡傳輸和磁盤存儲的資料規模,進而提高吞吐率。這裡要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。