天天看點

高并發系統 隊列 減庫存 不知道誰依賴“我”的處理結果、不關心其他系統如何處理後續業務

聊聊高并發系統之隊列術

隊列在資料結構中是一種線性表,從一端插入資料,然後從另一端删除資料。本文目的不是講解各種隊列算法,而是在應用層面講述使用隊列能解決哪些場景問題。

在我開發過的系統中,不是所有的業務都必須實時處理、不是所有的請求都必須實時回報結果給使用者、不是所有的請求/處理都必須100%處理成功、不知道誰依賴“我”的處理結果、不關心其他系統如何處理後續業務、不需要強一緻性,隻需保證最終一緻性即可、想要保證資料處理的有序性;此時你應該考慮使用隊列來解決這些問題。在實際開發時我們經常使用隊列進行異步處理、系統解耦、資料同步、流量削峰、緩沖、限流等。

應用場景

異步處理:使用隊列的一個主要原因是進行異步處理,比如使用者注冊成功後需要發送注冊成功郵件/新使用者積分/優惠券等等、緩存過期時先傳回老的資料,然後異步更新緩存、異步寫日志等;通過異步處理,可以提升主流程響應速度,而非主流程/非重要業務可以異步集中處理,這樣還可以将任務聚合然後批量處理;是以可以使用消息隊列/任務隊列來進行異步處理。

系統解耦:比如使用者成功支付完成訂單後,需要通知生産配貨系統、發票系統、庫存系統、推薦系統、搜尋系統、風控系統等進行業務處理;而未來需要添加/支援哪些業務是不清楚的,而且這些業務處理不需要實時處理、不需要強一緻,隻需要最終一緻性即可,是以可以通過消息隊列/任務隊列進行系統解耦。

資料同步:比如想把Mysql變更的資料同步到Redis、或者将Mysql資料同步到Mongodb、或者機房間資料同步、或者主從資料同步等,此時可以考慮使用如databus、canal、otter。使用資料總線隊列進行資料同步的好處是可以保證資料修改的有序性。

流量削峰:系統瓶頸一般在資料庫上,比如扣減庫存、下單等;此時可以考慮使用隊列将變更請求暫時放入隊列,通過緩存+隊列暫存的方式将資料庫流量削峰;還有如秒殺系統,下單服務會是該系統的瓶頸,此時會使用隊列進行排隊和限流,進而保護下單服務。通過隊列暫存或者隊列限流來削峰。

比如減庫存,可以考慮這樣設計:

高并發系統 隊列 減庫存 不知道誰依賴“我”的處理結果、不關心其他系統如何處理後續業務

直接在Redis中扣減,然後記錄下扣減日志(FIFO隊列),通過Worker去同步到DB。

實際隊列的應用場景還是非常多的,本文列舉了筆者遇到過比較多的場景。

緩沖區隊列

典型的如Log4j的日志緩沖區,當我們使用log4j記錄日志時,可以配置位元組緩沖區,位元組緩存區滿時會立即同步到磁盤(flush操作)。Log4j使用BufferedWriter實作的;此模式不是異步寫,在緩沖區滿的時候還是會阻塞主線程。如果需要異步模式可以使用AsyncAppender,然後通過bufferSize控制日志事件緩沖區大小。

通過緩沖區隊列可以實作:批量處理、異步處理。

任務隊列

使用任務隊列将一些不需要與主線程同步執行的任務扔到任務隊列異步處理即可;筆者用的最多的是線程池任務隊列(預設LinkedBlockingQueue)和Disruptor任務隊列(RingBuffer)。如刷資料時,将任務扔到隊列異步處理即可,處理成功後再異步通知使用者;還有如删除SKU操作,使用者請求時直接将任務分解并扔到隊列,異步處理,處理成功後異步通知使用者即可;還有如查詢聚合,将多個可并行處理的任務扔到隊列然後等待最慢的一個傳回。如果使用的是記憶體任務隊列請記住可能存在系統重新開機等問題造成的資料丢失。

通過任務隊列可以實作:異步處理、任務分解/聚合處理。

注:JDK7提供了ExecutorService的新的實作ForkJoinPool,其提供了Work-stealing機制,可以更好地提升并發效率。

在使用Executors.newFixedThreadPool時,其沒有設定隊列大小(預設Integer.MAX_VALUE),如果有大量任務被緩存到LinkedBlockingQueue中等待線程執行,會出現GC慢等問題,造成系統響應慢甚至OOM。是以在使用線程池時候,要指定隊列大小并設定合理的RejectedExecutionHandler;要記錄請求來源的參數友善定位引發問題的源頭。

消息隊列

筆者所在公司使用的是自研的JMQ;開源的有ActiveMQ、Kafka、Redis。使用消息隊列存儲各業務資料,其他系統根據需要訂閱即可。常見的模式是:點對點(一個消息隻有一個消費者)、釋出訂閱(一個消息可以有多個消費者);而常用的是釋出訂閱模式。

比如使用者注冊成功、修改商品資料、訂單狀态變更等都應該将變更發送到消息隊列,進而其他系統根據需要訂閱該消息,然後按照自己的需求進行業務邏輯開發。

在添加新功能時,消息消費者隻需要訂閱該消息,然後開發相應的業務邏輯,消息生産者根本不關心你怎麼使用消息和你做什麼業務處理。

高并發系統 隊列 減庫存 不知道誰依賴“我”的處理結果、不關心其他系統如何處理後續業務

同步調用,添加什麼新功能都需要到使用者系統提需求。其中一個服務出現問題了,整個服務就不可用了。

高并發系統 隊列 減庫存 不知道誰依賴“我”的處理結果、不關心其他系統如何處理後續業務

消息隊列,使用者系統隻需要釋出使用者注冊成功的消息即可,相關系統訂閱該消息,然後執行相關的業務邏輯。相關服務出問題不影響到注冊主流程。

通過消息隊列可以實作:異步處理、系統解耦。

請求隊列

請求隊列是指如在Web環境下對使用者請求排隊,進而進行一些特殊控制:流量控制、請求分級、請求隔離;如将請求按照功能劃分到不同的隊列,進而使得不同的隊列出現問題後互相不影響;還可以對請求分級,一些重要請求可以優先處理(發展到一定程度應将功能實體分離);還有伺服器處理能力有限,在接近伺服器瓶頸時需要考慮限流,最簡單的限流時丢棄處理不了的請求,此時可以使用隊列進行流量控制。

資料總線隊列

一般消息隊列中的消息都是業務次元的,比如業務鍵或者業務狀态等,比如哪個SKU變更了,而有些訂閱者需要再查一遍來擷取最新的修改資料(比如緩存同步);通過現有的消息隊列方式的缺點是很難隻進行修改部分的推送和保證資料有序性。而此種場景比較适合使用資料總線隊列實作。如資料庫資料修改後需要同步資料到緩存,或者需要将一個機房資料同步到另一個機房,隻是資料次元的同步,此時應該使用資料總線隊列如canal、otter、databus;使用資料總線隊列的好處是可以保證資料的有序性。

混合隊列

曾介紹過該方式的隊列,使用混合隊列來解決實際問題。

高并發系統 隊列 減庫存 不知道誰依賴“我”的處理結果、不關心其他系統如何處理後續業務

此處MQ是使用京東自研的JMQ,消息是可靠持久化存儲的;應用會按照不同的次元釋出消息到JMQ;下遊應用接收到該消息後會放入到Redis,使用Redis List來存儲這些任務;應用将Redis消息消費處理後,會按照不同的次元聚合商品消息然後再次發送出去。

使用Redis隊列的主要原因是想提升消息堆積能力和并發處理能力。另外在使用Redis建構消息隊列時需要考慮網絡抖動造成的消息丢失問題,因為Redis是沒有復原事務的,或者說是确認機制。我們使用如下方式防止消息丢失:

try {      
id = queueRedis.opsForList().rightPopAndLeftPush(queueName, processingQueueName);      
} catch (Exception e) {      
//發生了網絡異常,需要把processing中的id再放回到waiting queue中
    String msg = queueName + " to " + processingQueueName + " rpoplpush error";      
LOG.error(msg, e);      
//報警代碼      
}      

而對于失敗我們會進行重試三次,重試失敗後放入失敗隊列,而失敗隊列是具有防重功能的(從本地隊列和失敗隊列排重),使用的是Redis Lua腳本實作:

static EventQueueScript ADD_TO_FAIL_QUEUE_REDIS_SCRIPT = new EventQueueScript(      
"redis.call('lrem', KEYS[1], 1, ARGV[1]) redis.call('lrem', KEYS[2], 1, ARGV[1]) return redis.call('lpush', KEYS[2], ARGV[1])");      

Redis作者Antirez開發的記憶體分布式消息隊列Disque是未來更好的記憶體消息隊列選擇。

其他

優先級隊列:在實際開發時肯定有些任務是緊急的,此時應該優先處理緊急的任務;是以請考慮對隊列進行分級。

副本隊列:在進行一些系統重構或者上新的功能時,如果沒有足夠的信心保證業務邏輯正确,可以考慮存儲一份隊列的副本(比如1小時、1天的),進而當業務出現問題時可以對這些消息進行回放。

鏡像隊列:每個隊列不會無限制訂閱數量,一定會有一個極限的;當到達極限時請考慮使用鏡像隊列方式解決該問題。

隊列并發數:不同隊列實作,隊列服務端并發連接配接數是不一樣的;一定不是增大隊列并發連接配接數消費能力也随着增加;也不會因為增加了消費伺服器消費并發能力也随着增加,需要根據實際情況來設定合理的并發連接配接數。

推還是拉:消息體内容不是越全越好,需要根據具體業務設計消息體;如有些系統依賴商品變更消息(隻有一個SKU)、有些系統依賴商品狀态消息(SKU、狀态)、有些系統依賴商品屬性變更消息(SKU、變更的屬性)等,如果讓所有系統都消費商品變更消息,那麼這些系統都會調用商品查詢服務拉一下最新的商品資訊然後進行處理。是以要根據實際情況來決定是使用推送方式(将系統需要的所有資訊推過去)還是拉取方式(隻推送ID,然後再查一遍)。

消息合并:如果消息寫入量非常大,應該考慮将消息合并寫,可以"寫應用本地磁盤隊列"-->“同步本地磁盤隊列到消息中間件”;同步時可以根據需求制定同步政策,如1秒同步1次。

繼續閱讀