利用java鎖機制來解決高并發情況下,緩存未命中,大量請求到達資料庫,進而引起資料庫崩潰的情形。
關于緩存的使用,個人經驗還是比較欠缺,對于緩存在應用系統中的使用也隻是前幾個月在公司實習的時候,簡單的使用過,且使用的都是人家把架構搭建好的,至于緩存在并發情況下會産生的一系列問題都已經被架構處理好了,我所做的隻是set和get,至于使用時緩存在并發情況下到底會出現什麼樣的問題,該如何去解決和避免這些問題,沒有去深究。
秉着“學而時習之”的态度(T_T自己太懶,厚着臉皮),這兩天在鼓搗redis,至于redis的基本使用還是挺簡單的,今天要說的是我在這個過程中看到網上部落格一直提的關于緩存使用的各種問題,看到好多前輩在高并發下使用緩存都踩了不少大坑,總結他人的經驗也是給自己以後警醒。今天這篇部落格隻講我對一個問題的了解與思路的想法,并不會去羅列緩存在各種場景下各種解決方案以及各種解決方案之間的優劣,我沒有實際解決緩存問題的經驗,不敢妄自下結論。以下是個人學習過程的記錄,希望各路大俠交流學習。
場景描述:高并發情況緩存未命中進而通路資料庫造成壓力陡增崩潰
最終解決方案:java中利用讀寫鎖處理并發并發情形

一、少量請求下緩存的簡單使用
首先,我們知道使用緩存的基本流程,首先根據key查詢緩存,查詢成功直接傳回,查詢失敗(緩存未命中),則查詢資料庫得到結果寫入緩存再傳回。根據前面的場景假設和緩存使用邏輯,請看下面的一段代碼:
1 /**
2 * 根據商品id(也是主鍵)查詢商品庫存記錄
3 */
4 public GoodsStock selectByPrimaryKey(Integer id) {
5 GoodsStock result;
6 //緩存中查找
7 String goodsStockJsonStr = RedisCache.get(id);
8
9 //緩存中查找成功
10 if(!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
11 logger.info("=====query from cache=====");
12 return JSONObject.parseObject(goodsStockJsonStr,GoodsStock.class);
13 }
14 //沒有命中緩存,資料庫中查找,并将結果寫入緩存
15 logger.info("=====query from DB=====");
16 result = goodsStockMapper.selectByPrimaryKey(id);
17 //查詢結果寫入緩存
18 RedisCache.set(id, JSONArray.toJSONString(result));
19 return result;
20 }
以上代碼運作結果,第一次運作緩存中參照失敗是從資料庫中查找,後面每次運作查找相同的id,都是從緩存中得到(這裡咱先不讨論緩存的失效時間之類),隻查詢了資料庫一次,由于每次運作都是單個請求,這段代碼沒有任何問題,現在在多線程下測試這個查詢服務,看看會出現什麼情況:
1 /**
2 * 10個線程并發調用服務
3 */
4 @Test
5 public void testMultiThreadQuery() throws Exception{
6 for(int i = 0; i < 10; i++) {
7 new Thread(new QueryTask()).start();
8 countDownLatch.countDown(); //啟動線程達到10個時,10個線程同時執行查詢
9 }
10 Thread.sleep(5000);
11 }
12
13 private class QueryTask implements Runnable {
14 @Override
15 public void run() {
16 try {
17 countDownLatch.await();
18 } catch (InterruptedException e) {
19 e.printStackTrace();
20 }
21 GoodsStock goodsStock = goodsStockService.selectByPrimaryKey(GOODS_ID);
22 }
23 }
運作前我們先将緩存清空,讓服務請求緩存是出現緩存未命中的情況,正常情況是隻要有一個請求查詢出現緩存未命中,那麼就回去查詢資料庫,查詢成功後将結果寫入緩存,這樣後續的請求再查詢統一記錄時,就應該直接從緩存傳回,而不再去查詢資料庫。我們來看看運作結果,運作結果也在預料之中。
1918 [Thread-12] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-5] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-13] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-8] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-7] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-9] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-6] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-4] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-10] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
1918 [Thread-11] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
我們可以看到,10個同時到達的請求基本上都是去查詢的資料庫,這點很好了解,因為10個請求同時到達,同時查詢緩存,同時發現緩存沒命中,同時去查資料庫。在這種情況下,本來後面的請求應該讀取緩存進而達到減輕資料庫壓力的效果,然而在前面這麼多“同時”的情形下,緩存失去了它原有的效果。如果這裡不隻10個請求同時到達,而是在類似秒殺場景下同時有成千上萬個請求到達,那麼資料庫肯定不能承受之重直至崩潰。這種場景就很類似于高并發情況下的緩存擊穿(緩存擊穿是指在高并發情況下,大量請求查詢一個并不存在的資料,由于資料不存在,肯定會出現緩存不命中,然後去查詢資料庫,然後導緻資料庫崩潰。)
既然我們清楚得知道問題出現在同時查詢資料庫這裡,那麼很容易就想到利用鎖機制,隻讓一個請求去查詢資料庫。
二、高并發情況下緩存使用
利用java提供的鎖機制,讓所有請求到達查詢服務時,若緩存沒有命中,就去競争一把鎖,得到鎖的請求才去查詢資料庫,并将查詢結果寫回緩存,後面的請求就直接從緩存中讀取,并發情況下改進代碼如下:
1 /**
2 * 根據商品id(也是主鍵)查詢商品庫存記錄
3 */
4 public GoodsStock selectByPrimaryKey(Integer id) {
5 GoodsStock result;
6 //緩存中查找
7 String goodsStockJsonStr = RedisCache.get(id);
8
9 //緩存中查找成功
10 if(!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
11 logger.info("=====query from cache=====");
12 return JSONObject.parseObject(goodsStockJsonStr,GoodsStock.class);
13 }
14 //沒有命中緩存,這裡加鎖去資料庫中查找,并将結果寫入緩存
15 //後續獲得鎖的線程會直接從緩存中讀取,而不再是通路資料庫
16 synchronized(this) {
17 goodsStockJsonStr = RedisCache.get(id);
18 if(!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
19 logger.info("=====query from cache=====");
20 return JSONObject.parseObject(goodsStockJsonStr,GoodsStock.class);
21 }
22 logger.info("=====query from DB=====");
23 result = goodsStockMapper.selectByPrimaryKey(id);
24 //查詢結果寫入緩存
25 RedisCache.set(id, JSONArray.toJSONString(result));
26 }
27 return result;
28 }
這裡,我們對緩存未命中查詢資料庫的部分進行加鎖進行同步處理,同步代碼塊中再查詢了一次緩存,這樣就保證了同時到達但未獲得鎖的線程後面會直接讀取緩存中的資料而不再通路資料庫。進而大量減少了同一時刻對資料庫的通路量。
我們看看運作結果,可以發現,隻有第一次查詢是從資料庫中查詢,後續查詢全來自緩存:
1 1907 [Thread-11] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
2 2550 [Thread-12] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
3 2578 [Thread-8] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
4 2579 [Thread-7] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
5 2580 [Thread-10] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
6 2581 [Thread-13] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
7 2581 [Thread-5] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
8 2581 [Thread-4] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
9 2582 [Thread-6] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
10 2582 [Thread-9] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
至此,上面提到的在并發的情況查詢緩存的問題基本上可以解決,但是我們都知道,在java中sychronized屬于重量級鎖,讀寫鎖更适合這樣的場景。
三、高并發情況下緩存使用,利用讀寫鎖提高效率
這個地方為甚麼加上讀寫鎖的性能就更高些,這裡涉及到java中的鎖機制問題,就不展開寫,待後面研究清楚再另外單獨記錄。
1 /**
2 * 根據商品id(也是主鍵)查詢商品庫存記錄
3 */
4 public GoodsStock selectByPrimaryKey(Integer id) {
5 GoodsStock result;
6 readWriteLock.readLock().lock();//添加讀鎖
7 try {
8 //緩存中查找
9 String goodsStockJsonStr = RedisCache.get(id);
10 //緩存中查找成功
11 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
12 logger.info("=====query from cache=====");
13 result = JSONObject.parseObject(goodsStockJsonStr, GoodsStock.class);
14 } else {
15 //若緩存讀取失敗,則需要去資料庫中查詢
16 readWriteLock.readLock().unlock();//釋放讀鎖
17 readWriteLock.writeLock().lock();//添加寫鎖
18 try {
19 goodsStockJsonStr = RedisCache.get(id);
20 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
21 logger.info("=====query from cache=====");
22 return JSONObject.parseObject(goodsStockJsonStr, GoodsStock.class);
23 }
24 logger.info("=====query from DB=====");
25 result = goodsStockMapper.selectByPrimaryKey(id);
26 //查詢結果寫入緩存
27 RedisCache.set(id, JSONArray.toJSONString(result));
28 } finally {
29 readWriteLock.writeLock().unlock();
30 readWriteLock.readLock().lock();
31 }
32 }
33 } finally {
34 readWriteLock.readLock().unlock();
35 }
36 return result;
37 }
這個地方補充一下,從上面的代碼我們可以看到,其實整個查詢方法,主要的業務代碼隻有一行:
1 result = goodsStockMapper.selectByPrimaryKey(id);
剩餘的其他代碼都是無關于業務的其他處理,我們在業務中應該盡量将非業務的代碼抽離出來包裝,使真正的業務代碼簡單高效。對于類似以上這種場景,我們可以使用模闆方法,在此簡單補充一下:
查詢業務的模闆方法:
1 /**
2 * 并發處理的緩存查詢模闆方法
3 * @param queryKey 查詢鍵值
4 * @param expire 緩存過期時間
5 * @param unit 時間機關
6 * @param typeReference 傳入泛型類型的類對象
7 * @param cacheLoadable 業務回調類
8 * @param <T>
9 * @return
10 */
11 public <T> T queryByCache(String queryKey, long expire, TimeUnit unit,
12 TypeReference<T> typeReference, CacheLoadable<T> cacheLoadable) {
13 T result;
14 readWriteLock.readLock().lock();//添加讀鎖
15 try {
16 //緩存中查找
17 String goodsStockJsonStr = RedisCache.get(queryKey);
18 //緩存中查找成功
19 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
20 logger.info("=====query from cache=====");
21 result = JSONObject.parseObject(goodsStockJsonStr, typeReference);
22 } else {
23 //若緩存讀取失敗,則需要去資料庫中查詢
24 readWriteLock.readLock().unlock();//釋放讀鎖
25 readWriteLock.writeLock().lock();//添加寫鎖
26 try {
27 goodsStockJsonStr = RedisCache.get(queryKey);
28 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) {
29 logger.info("=====query from cache=====");
30 return JSONObject.parseObject(goodsStockJsonStr, typeReference);
31 }
32 logger.info("=====query from DB=====");
33 //這裡調用業務傳入的回調方法,真正處理業務的地方隻有這一行
34 result = cacheLoadable.load();
35 RedisCache.set(queryKey, JSONArray.toJSONString(result));
36 } finally {
37 readWriteLock.writeLock().unlock();
38 readWriteLock.readLock().lock();
39 }
40 }
41 } finally {
42 readWriteLock.readLock().unlock();
43 }
44 return result;
45 }
然後我們再業務使用的時候,隻需要像如下調用即可:
1 public GoodsStock queryByTemplate(Integer id) {
2 return cacheServiceTemplate.queryByCache(String.valueOf(id), 0, null,
3 new TypeReference<GoodsStock>() {}, new CacheLoadable<GoodsStock>() {
4 @Override
5 public GoodsStock load() {
6 return goodsStockMapper.selectByPrimaryKey(id);
7 }
8 });
9 }
四、總結
文章中完整源碼:https://github.com/Gonjan/javaPractice/tree/master/src
寫到最後,這篇文章也沒多少幹貨,其實就是将自己動手實踐的一部分記錄下來而已,看來畢竟是部落格寫得太少,沒有啥章法,想到哪兒寫到哪,比較亂,寫着寫着就跑偏了(T_T真是哭死),還需多多練習才行。
By--- Gonjan
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,