我的上家公司是做餐飲系統的,每天中午和晚上用餐高峰期,系統的并發量不容小觑。為了保險起見,公司規定各部門都要在吃飯的時間輪流值班,防止出現線上問題時能夠及時處理。
我當時在後廚顯示系統團隊,該系統屬于訂單的下遊業務。使用者點完菜下單後,訂單系統會通過發kafka消息給我們系統,系統讀取消息後,做業務邏輯處理,持久化訂單和菜品資料,然後展示到劃菜用戶端。這樣廚師就知道哪個訂單要做哪些菜,有些菜做好了,就可以通過該系統出菜。系統自動通知服務員上菜,如果服務員上完菜,修改菜品上菜狀态,使用者就知道哪些菜已經上了,哪些還沒有上。這個系統可以大大提高後廚到使用者的效率。

事實證明,這一切的關鍵是消息中間件:kafka,如果它有問題,将會直接影響到後廚顯示系統的功能。
接下來,我跟大家一起聊聊使用kafka兩年時間踩過哪些坑?
最近無意間獲得一份BAT大廠大佬寫的刷題筆記,一下子打通了我的任督二脈,越來越覺得算法沒有想象中那麼難了。
BAT大佬寫的刷題筆記,讓我offer拿到手軟
剛開始我們系統的商戶很少,為了快速實作功能,我們沒想太多。既然是走消息中間件kafka通信,訂單系統發消息時将訂單詳細資料放在消息體,我們後廚顯示系統隻要訂閱topic,就能擷取相關消息資料,然後處理自己的業務即可。
不過這套方案有個關鍵因素:要保證消息的順序。
為什麼呢?
訂單有很多狀态,比如:下單、支付、完成、撤銷等,不可能下單的消息都沒讀取到,就先讀取支付或撤銷的消息吧,如果真的這樣,資料不是會産生錯亂?
好吧,看來保證消息順序是有必要的。
我們都知道kafka的topic是無序的,但是一個topic包含多個partition,每個partition内部是有序的。
如此一來,思路就變得清晰了:隻要保證生産者寫消息時,按照一定的規則寫到同一個partition,不同的消費者讀不同的partition的消息,就能保證生産和消費者消息的順序。
我們剛開始就是這麼做的,同一個商戶編号的消息寫到同一個partition,topic中建立了4個partition,然後部署了4個消費者節點,構成消費者組,一個partition對應一個消費者節點。從理論上說,這套方案是能夠保證消息順序的。
一切規劃得看似“天衣無縫”,我們就這樣”順利“上線了。
該功能上線了一段時間,剛開始還是比較正常的。
但是,好景不長,很快就收到使用者投訴,說在劃菜用戶端有些訂單和菜品一直看不到,無法劃菜。
我定位到了原因,公司在那段時間網絡經常不穩定,業務接口時不時報逾時,業務請求時不時會連不上資料庫。
這種情況對順序消息的打擊,可以說是毀滅性的。
為什麼這麼說?
假設訂單系統發了:”下單“、”支付“、”完成“ 三條消息。
而”下單“消息由于網絡原因我們系統處理失敗了,而後面的兩條消息的資料是無法入庫的,因為隻有”下單“消息的資料才是完整的資料,其他類型的消息隻會更新狀态。
加上,我們當時沒有做失敗重試機制,使得這個問題被放大了。問題變成:一旦”下單“消息的資料入庫失敗,使用者就永遠看不到這個訂單和菜品了。
那麼這個緊急的問題要如何解決呢?
最開始我們的想法是:在消費者處理消息時,如果處理失敗了,立馬重試3-5次。但如果有些請求要第6次才能成功怎麼辦?不可能一直重試呀,這種同步重試機制,會阻塞其他商戶訂單消息的讀取。
顯然用上面的這種同步重試機制在出現異常的情況,會嚴重影響消息消費者的消費速度,降低它的吞吐量。
如此看來,我們不得不用異步重試機制了。
如果用異步重試機制,處理失敗的消息就得儲存到重試表下來。
但有個新問題立馬出現:隻存一條消息如何保證順序?
存一條消息的确無法保證順序,假如:”下單“消息失敗了,還沒來得及異步重試。此時,”支付“消息被消費了,它肯定是不能被正常消費的。
此時,”支付“消息該一直等着,每隔一段時間判斷一次,它前面的消息都有沒有被消費?
如果真的這麼做,會出現兩個問題:
”支付“消息前面隻有”下單“消息,這種情況比較簡單。但如果某種類型的消息,前面有N多種消息,需要判斷多少次呀,這種判斷跟訂單系統的耦合性太強了,相當于要把他們系統的邏輯搬一部分到我們系統。
影響消費者的消費速度
這時有種更簡單的方案浮出水面:消費者在處理消息時,先判斷該訂單号在重試表有沒有資料,如果有則直接把目前消息儲存到重試表。如果沒有,則進行業務處理,如果出現異常,把該消息儲存到重試表。
後來我們用elastic-job建立了失敗重試機制,如果重試了7次後還是失敗,則将該消息的狀态标記為失敗,發郵件通知開發人員。
終于由于網絡不穩定,導緻使用者在劃菜用戶端有些訂單和菜品一直看不到的問題被解決了。現在商戶頂多偶爾延遲看到菜品,比一直看不菜品好太多。
随着銷售團隊的市場推廣,我們系統的商戶越來越多。随之而來的是消息的數量越來越大,導緻消費者處理不過來,經常出現消息積壓的情況。對商戶的影響非常直覺,劃菜用戶端上的訂單和菜品可能半個小時後才能看到。一兩分鐘還能忍,半個消息的延遲,對有些暴脾氣的商戶哪裡忍得了,馬上投訴過來了。我們那段時間經常接到商戶投訴說訂單和菜品有延遲。
雖說,加伺服器節點就能解決問題,但是按照公司為了省錢的慣例,要先做系統優化,是以我們開始了消息積壓問題解決之旅。
雖說kafka号稱支援百萬級的TPS,但從producer發送消息到broker需要一次網絡IO,broker寫資料到磁盤需要一次磁盤IO(寫操作),consumer從broker擷取消息先經過一次磁盤IO(讀操作),再經過一次網絡IO。
一次簡單的消息從生産到消費過程,需要經過2次網絡IO和2次磁盤IO。如果消息體過大,勢必會增加IO的耗時,進而影響kafka生産和消費的速度。消費者速度太慢的結果,就會出現消息積壓情況。
除了上面的問題之外,消息體過大,還會浪費伺服器的磁盤空間,稍不注意,可能會出現磁盤空間不足的情況。
此時,我們已經到了需要優化消息體過大問題的時候。
如何優化呢?
我們重新梳理了一下業務,沒有必要知道訂單的中間狀态,隻需知道一個最終狀态就可以了。
如此甚好,我們就可以這樣設計了:
訂單系統發送的消息體隻用包含:id和狀态等關鍵資訊。
後廚顯示系統消費消息後,通過id調用訂單系統的訂單詳情查詢接口擷取資料。
後廚顯示系統判斷資料庫中是否有該訂單的資料,如果沒有則入庫,有則更新。
果然這樣調整之後,消息積壓問題很長一段時間都沒再出現。
還真别高興的太早,有天中午又有商戶投訴說訂單和菜品有延遲。我們一查kafka的topic竟然又出現了消息積壓。
但這次有點詭異,不是所有partition上的消息都有積壓,而是隻有一個。
剛開始,我以為是消費那個partition消息的節點出了什麼問題導緻的。但是經過排查,沒有發現任何異常。
這就奇怪了,到底哪裡有問題呢?
後來,我查日志和資料庫發現,有幾個商戶的訂單量特别大,剛好這幾個商戶被分到同一個partition,使得該partition的消息量比其他partition要多很多。
這時我們才意識到,發消息時按商戶編号路由partition的規則不合理,可能會導緻有些partition消息太多,消費者處理不過來,而有些partition卻因為消息太少,消費者出現空閑的情況。
為了避免出現這種配置設定不均勻的情況,我們需要對發消息的路由規則做一下調整。
我們思考了一下,用訂單号做路由相對更均勻,不會出現單個訂單發消息次數特别多的情況。除非是遇到某個人一直加菜的情況,但是加菜是需要花錢的,是以其實同一個訂單的消息數量并不多。
調整後按訂單号路由到不同的partition,同一個訂單号的消息,每次到發到同一個partition。
調整後,消息積壓的問題又有很長一段時間都沒有再出現。我們的商戶數量在這段時間,增長的非常快,越來越多了。
在高并發的場景中,消息積壓問題,可以說如影随形,真的沒辦法從根本上解決。表面上看,已經解決了,但後面不知道什麼時候,就會冒出一次,比如這次:
有天下午,産品過來說:有幾個商戶投訴過來了,他們說菜品有延遲,快查一下原因。
這次問題出現得有點奇怪。
首先這個時間點就有點奇怪,平常出問題,不都是中午或者晚上用餐高峰期嗎?怎麼這次問題出現在下午?
根據以往積累的經驗,我直接看了kafka的topic的資料,果然上面消息有積壓,但這次每個partition都積壓了十幾萬的消息沒有消費,比以往加壓的消息數量增加了幾百倍。這次消息積壓得極不尋常。
我趕緊查服務監控看看消費者挂了沒,還好沒挂。又查服務日志沒有發現異常。這時我有點迷茫,碰運氣問了問訂單組下午發生了什麼事情沒?他們說下午有個促銷活動,跑了一個JOB批量更新過有些商戶的訂單資訊。
這時,我一下子如夢初醒,是他們在JOB中批量發消息導緻的問題。怎麼沒有通知我們呢?實在太坑了。
雖說知道問題的原因了,倒是眼前積壓的這十幾萬的消息該如何處理呢?
此時,如果直接調大partition數量是不行的,曆史消息已經存儲到4個固定的partition,隻有新增的消息才會到新的partition。我們重點需要處理的是已有的partition。
直接加服務節點也不行,因為kafka允許同組的多個partition被一個consumer消費,但不允許一個partition被同組的多個consumer消費,可能會造成資源浪費。
看來隻有用多線程處理了。
為了緊急解決問題,我改成了用線程池處理消息,核心線程和最大線程數都配置成了50。
調整之後,果然,消息積壓數量不斷減少。
但此時有個更嚴重的問題出現:我收到了報警郵件,有兩個訂單系統的節點down機了。
不久,訂單組的同僚過來找我說,我們系統調用他們訂單查詢接口的并發量突增,超過了預計的好幾倍,導緻有2個服務節點挂了。他們把查詢功能單獨整成了一個服務,部署了6個節點,挂了2個節點,再不處理,另外4個節點也會挂。訂單服務可以說是公司最核心的服務,它挂了公司損失會很大,情況萬分緊急。
為了解決這個問題,隻能先把線程數調小。
幸好,線程數是可以通過zookeeper動态調整的,我把核心線程數調成了8個,核心線程數改成了10個。
後面,運維把訂單服務挂的2個節點重新開機後恢複正常了,以防萬一,再多加了2個節點。為了確定訂單服務不會出現問題,就保持目前的消費速度,後廚顯示系統的消息積壓問題,1小時候後也恢複正常了。
後來,我們開了一次複盤會,得出的結論是:
訂單系統的批量操作一定提前通知下遊系統團隊。
下遊系統團隊多線程調用訂單查詢接口一定要做壓測。
這次給訂單查詢服務敲響了警鐘,它作為公司的核心服務,應4. 對高并發場景做的不夠好,需要做優化。
對消息積壓情況加監控。
順便說一下,對于要求嚴格保證消息順序的場景,可以将線程池改成多個隊列,每個隊列用單線程處理。
為了防止後面再次出現消息積壓問題,消費者後面就一直用多線程處理消息。
但有天中午我們還是收到很多報警郵件,提醒我們kafka的topic消息有積壓。我們正在查原因,此時産品跑過來說:又有商戶投訴說菜品有延遲,趕緊看看。這次她看起來有些不耐煩,确實優化了很多次,還是出現了同樣的問題。
在外行看來:為什麼同一個問題一直解決不了?
其實技術心裡的苦他們是不知道的。
表面上問題的症狀是一樣的,都是出現了菜品延遲,他們知道的是因為消息積壓導緻的。但是他們不知道深層次的原因,導緻消息積壓的原因其實有很多種。這也許是使用消息中間件的通病吧。
我沉默不語,隻能硬着頭皮定位原因了。
後來我查日志發現消費者消費一條消息的耗時長達2秒。以前是500毫秒,現在怎麼會變成2秒呢?
奇怪了,消費者的代碼也沒有做大的調整,為什麼會出現這種情況呢?
查了一下線上菜品表,單表資料量竟然到了幾千萬,其他的劃菜表也是一樣,現在單表儲存的資料太多了。
我們組梳理了一下業務,其實菜品在用戶端隻展示最近3天的即可。
這就好辦了,我們服務端存着多餘的資料,不如把表中多餘的資料歸檔。于是,DBA幫我們把資料做了歸檔,隻保留最近7天的資料。
如此調整後,消息積壓問題被解決了,又恢複了往日的平靜。
别高興得太早了,還有其他的問題,比如:報警郵件經常報出資料庫異常: <code>Duplicate entry '6' for key 'PRIMARY'</code>,說主鍵沖突。
出現這種問題一般是由于有兩個以上相同主鍵的sql,同時插入資料,第一個插入成功後,第二個插入的時候會報主鍵沖突。表的主鍵是唯一的,不允許重複。
我仔細檢查了代碼,發現代碼邏輯會先根據主鍵從表中查詢訂單是否存在,如果存在則更新狀态,不存在才插入資料,沒得問題。
這種判斷在并發量不大時,是有用的。但是如果在高并發的場景下,兩個請求同一時刻都查到訂單不存在,一個請求先插入資料,另一個請求再插入資料時就會出現主鍵沖突的異常。
解決這個問題最正常的做法是:<code>加鎖</code>。
我剛開始也是這樣想的,加資料庫悲觀鎖肯定是不行的,太影響性能。加資料庫樂觀鎖,基于版本号判斷,一般用于更新操作,像這種插入操作基本上不會用。
剩下的隻能用分布式鎖了,我們系統在用redis,可以加基于redis的分布式鎖,鎖定訂單号。
但後面仔細思考了一下:
加分布式鎖也可能會影響消費者的消息處理速度。
消費者依賴于redis,如果redis出現網絡逾時,我們的服務就悲劇了。
是以,我也不打算用分布式鎖。
而是選擇使用mysql的<code>INSERT INTO ...ON DUPLICATE KEY UPDATE</code>文法:
它會先嘗試把資料插入表,如果主鍵沖突的話那麼更新字段。
把以前的insert語句改造之後,就沒再出現過主鍵沖突問題。
不久之後的某天,又收到商戶投訴說下單後,在劃菜用戶端上看得到訂單,但是看到的菜品不全,有時甚至訂單和菜品資料都看不到。
這個問題跟以往的都不一樣,根據以往的經驗先看kafka的topic中消息有沒有積壓,但這次并沒有積壓。
再查了服務日志,發現訂單系統接口傳回的資料有些為空,有些隻傳回了訂單資料,沒傳回菜品資料。
這就非常奇怪了,我直接過去找訂單組的同僚。他們仔細排查服務,沒有發現問題。這時我們不約而同的想到,會不會是資料庫出問題了,一起去找DBA。果然,DBA發現資料庫的主庫同步資料到從庫,由于網絡原因偶爾有延遲,有時延遲有3秒。
如果我們的業務流程從發消息到消費消息耗時小于3秒,調用訂單詳情查詢接口時,可能會查不到資料,或者查到的不是最新的資料。
這個問題非常嚴重,會導緻直接我們的資料錯誤。
為了解決這個問題,我們也加了重試機制。調用接口查詢資料時,如果傳回資料為空,或者隻傳回了訂單沒有菜品,則加入重試表。
調整後,商戶投訴的問題被解決了。
kafka消費消息時支援三種模式:
at most once模式 最多一次。保證每一條消息commit成功之後,再進行消費處理。消息可能會丢失,但不會重複。
at least once模式 至少一次。保證每一條消息處理成功之後,再進行commit。消息不會丢失,但可能會重複。
exactly once模式 精确傳遞一次。将offset作為唯一id與消息同時處理,并且保證處理的原子性。消息隻會處理一次,不丢失也不會重複。但這種方式很難做到。
kafka預設的模式是<code>at least once</code>,但這種模式可能會産生重複消費的問題,是以我們的業務邏輯必須做幂等設計。
而我們的業務場景儲存資料時使用了INSERT INTO ...ON DUPLICATE KEY UPDATE文法,不存在時插入,存在時更新,是天然支援幂等性的。
我們當時線上環境分為:pre(預釋出環境) 和 prod(生産環境),兩個環境共用同一個資料庫,并且共用同一個kafka叢集。
需要注意的是,在配置kafka的topic的時候,要加字首用于區分不同環境。pre環境的以pre_開頭,比如:pre_order,生産環境以prod_開頭,比如:prod_order,防止消息在不同環境中串了。
但有次運維在pre環境切換節點,配置topic的時候,配錯了,配成了prod的topic。剛好那天,我們有新功能上pre環境。結果悲劇了,prod的有些消息被pre環境的consumer消費了,而由于消息體做了調整,導緻pre環境的consumer處理消息一直失敗。
其結果是生産環境丢了部分消息。不過還好,最後生産環境消費者通過重置offset,重新讀取了那一部分消息解決了問題,沒有造成太大損失。
除了上述問題之外,我還遇到過:
kafka的consumer使用自動确認機制,導緻cpu使用率100%。
kafka叢集中的一個broker節點挂了,重新開機後又一直挂。
這兩個問題說起來有些複雜,我就不一一列舉了,有興趣的朋友可以關注我的公衆号,加我的微信找我私聊。
非常感謝那兩年使用消息中間件kafka的經曆,雖說遇到過挺多問題,踩了很多坑,走了很多彎路,但是實打實的讓我積累了很多寶貴的經驗,快速成長了。
其實kafka是一個非常優秀的消息中間件,我所遇到的絕大多數問題,都并非kafka自身的問題(除了cpu使用率100%是它的一個bug導緻的之外)。