- 本地緩存:單體應用時沒有什麼問題,但是當微服務叢集的時候,就會出現資料不一緻性以及每次還需要重複查詢的問題。
- 分布式緩存:可以很好的解決本地緩存的問題,使用緩存中間件。
SpringBoot使用Redis.
1、引入redis-starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、簡單配置redis
redis:
port: 6379
host: 121.43.234.157
3、使用springboot配置好的 StringRedisTemplate 模闆.
項目中自測使用:
//TODO 如果使用低版本的redis, 對redis進行壓力測試時,會産生堆外記憶體洩漏異常: OutOfDirectMemoryError
// 1)springboot2.0以後預設使用lettuce作為操作redis的用戶端.它使用netty進行網絡通信.
// 2)lettuce的bug導緻netty堆外記憶體溢出.netty如果沒有指定堆外記憶體,預設使用-Xmlx300m
//可以通過 -Dio.netty.maxDirectMemory 隻去調大堆外記憶體.
//解決方案:不能使用-Dio.netty.maxDirectMemory隻去調大堆外記憶體.
//1)更新 lettuce用戶端. 2)切換使用jedis
//redisTemplate:
// lettuce、jedis操作redis的底層用戶端,Spring再次封裝了redisTemplate.
/**
*
* 使用redis作為緩存
*
* @return
*/
public Map<String, List<Catelog2Vo>> getCatelogJson() {
String catelogJSON = redisTemplate.opsForValue().get("catelogJSON");
//如果redis中沒有就從資料庫查詢并将查詢結果放入redis中.
if (StringUtils.isEmpty(catelogJSON)) {
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonFromDB();
//存入轉為json存儲的好處:json是跨言,跨平台相容
redisTemplate.opsForValue().set("catelogJSON", JSON.toJSONString(jsonFromDB));
return jsonFromDB;
}
//如果redis中有,就轉為我們指定對象
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
// JSONObject jsonObject = JSON.parseObject(catelogJSON);
return stringListMap;
}
高并發下緩存失效問題
緩存穿透:
- 指查詢一個一定不存在的資料,由于緩存是不命中的,将去查詢資料庫,但是資料庫也無此記錄,我們沒有将這次查詢的null寫入緩存,這将導緻這個不存在的資料每次請求都要到存儲層的去查詢,失去了緩存的意義。
- 風險:利用不存在的資料進行攻擊,資料庫瞬時壓力增大,最終導緻崩潰。
- 解決:将查詢到的null結果緩存,并加入短暫過期時間。布隆過濾器、mvc攔截器
緩存雪崩:
- 緩存雪崩是指我們設定緩存時key采用了全部相同的過期時間,導緻緩存在某一時刻同時失效,請求全部轉發到DB,DB瞬時壓力過重雪崩。
- 解決:
- 再原來失效時間的基礎上增加一個随機值,比如1-5分鐘随機,這樣每一個緩存的過期時間的重複率就會降低,就很難引發集體失效的事件。
- 設定熱點資料永遠不過期。
- 出現雪崩:降級 熔斷
- 事後:利用 redis 持久化機制儲存的資料盡快恢複緩存
緩存擊穿:
- 對于一些設定了過期時間的key,如果這些key可能會在某些時間點被超高并發的通路,是一種非常"熱點"的資料。但是這個key恰巧在大量請求到來之前正好失效,那麼所有對這個key的資料查詢全部落到了DB,我們稱之為緩存擊穿。
- 加鎖:大量并發隻讓一個線程去查詢,其它線程等待,查詢到以後釋放鎖,其它人獲得鎖之後,先查詢緩存中的資料,如果有就不用再去查詢DB。
通過本地鎖實作緩存
public Map<String, List<Catelog2Vo>> getCatelogJson() {
String catelogJSON = redisTemplate.opsForValue().get("catelogJSON");
//如果redis中沒有就從資料庫查詢并将查詢結果放入redis中.
if (StringUtils.isEmpty(catelogJSON)) {
System.out.println("緩存未命中....");
/**
* 解決緩存穿透、緩存雪崩、緩存擊穿
*
*/
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonLocalLock();
return jsonFromDB;
}
System.out.println("緩存成功命中....");
//如果redis中有,就轉為我們指定對象
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return stringListMap;
}
/**
* 本地鎖(适用于單個執行個體,分布式環境下不再适用)
* @return
*/
public Map<String, List<Catelog2Vo>> getCatelogJsonLocalLock() {
/**
* 加鎖:解決緩存擊穿問題(适用單體項目)
* 隻要是同一把鎖就可以鎖住需要鎖住這個鎖的所有線程
* synchronized (this):對于SpringBoot容器中所有元件都是單例的,是以使用this可以鎖住
*/
//TODO 本地鎖:synchronized、JUC(Lock),本地鎖隻可以鎖住目前服務的所有線程,但是分布式下是多個執行個體,是以要想鎖住所有就必須使用分布式鎖.
synchronized (this) {
return getCatelogJsonFromDB();
}
}
/**
* 從資料庫查詢資料
* @return
*/
private Map<String, List<Catelog2Vo>> getCatelogJsonFromDB() {
//加鎖之後還需要再次判斷緩存中是否有緩存
String catelogJSON = redisTemplate.opsForValue().get("catelogJSON");
if (!StringUtils.isEmpty(catelogJSON)) {
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return stringListMap;
}
System.out.println("查詢了資料庫.."+Thread.currentThread().getName());
//擷取所有的節點資訊
List<CategoryEntity> levels = baseMapper.selectList(null);
//擷取所有的一級分類節點
List<CategoryEntity> level1Categorys = getParent_cid(levels, 0L);
Map<String, List<Catelog2Vo>> collect1 = null;
if (level1Categorys != null) {
collect1 = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//還需要繼續封裝, 封裝父節點下面的子節點
List<CategoryEntity> category2List = getParent_cid(levels, v.getCatId());
List<Catelog2Vo> c2List = null;
if (category2List != null) {
c2List = category2List.stream().map(c2 -> {
Catelog2Vo c2Vo = new Catelog2Vo(c2.getCatId().toString(), c2.getName(), v.getCatId().toString(), null);
List<Catalog3Vo> collect = null;
//需要繼續封裝 孫子節點資料
List<CategoryEntity> c3List = getParent_cid(levels, c2.getCatId());
if (c3List != null) {
collect = c3List.stream().map(c3 -> {
Catalog3Vo catalog3Vo = new Catalog3Vo(c3.getCatId().toString(), c3.getName(), c2.getCatId().toString());
return catalog3Vo;
}).collect(Collectors.toList());
}
c2Vo.setCatalog3List(collect);
return c2Vo;
}).collect(Collectors.toList());
}
return c2List;
}));
}
//存入轉為json存儲的好處:json是跨言,跨平台相容
redisTemplate.opsForValue().set("catelogJSON", JSON.toJSONString(collect1), 1, TimeUnit.DAYS);
return collect1;
}
通過上述代碼及圖分析可知,當服務為單體時,即不是分布式環境,本地鎖可以解決緩存擊穿問題。在分布式環境下,如果使用本地鎖,有多少個服務還是要去查詢多少次DB.是以并沒有徹底的解決問題,還需要分布式鎖來實作。
Redis 分布式鎖
實作原理
SET key value [EX seconds] [PX milliseconds] [NX|XX]
代碼實作:
/**
* Redis 分布鎖 基本原理 SET key value [EX seconds] [PX milliseconds] [NX|XX]
* 主要邏輯:加鎖的原理性操作、删鎖的原子性操作
*
* @return
*/
public Map<String, List<Catelog2Vo>> getCatelogJsonRedisLock() {
//分布式鎖,占鎖
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "locak");
//占鎖成功
if (hasLock) {
//執行業務邏輯
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonFromDB();
//删除鎖
redisTemplate.delete("lock");
return jsonFromDB;
} else {
//采用自旋的方式
return getCatelogJsonRedisLock();
}
}
其中占鎖的方法 setIfAbsent()使用的是 SETNX.
代碼此時還存在問題,當執行完業務邏輯業務代碼異常或系統當機此時解鎖代碼并沒有執行,就會造成死鎖。
解決方法:設定鎖的有效時間即自動過期,即使沒有删除,到期後也會自動删除。即
設定key自動過期:
/**
* Redis 分布鎖 基本原理 SET key value [EX seconds] [PX milliseconds] [NX|XX]
* 主要邏輯:加鎖的原理性操作、删鎖的原子性操作
*
* @return
*/
public Map<String, List<Catelog2Vo>> getCatelogJsonRedisLock() {
//分布式鎖,占鎖
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "locak");
//占鎖成功
if (hasLock) {
//設定key值自動過期時間
redisTemplate.expire("lock",300,TimeUnit.SECONDS);
//執行業務邏輯
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonFromDB();
//删除鎖
redisTemplate.delete("lock");
return jsonFromDB;
} else {
//采用自旋的方式
return getCatelogJsonRedisLock();
}
}
設定看key值自動過期時間之後還是存在問題:
過期時間和占鎖必須是原子的
。如果不是原子的話,有可能在設定過期時間之前系統當機,就又會有死鎖問題發生。
解決方法:将占鎖和設定過期時間為原子操作。redis文法支援 set key value EX seconds NX.即
public Map<String, List<Catelog2Vo>> getCatelogJsonRedisLock() {
//分布式鎖,占鎖
// Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "locak");
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "locak", 300, TimeUnit.SECONDS);
//占鎖成功
if (hasLock) {
//設定key值自動過期時間
// redisTemplate.expire("lock",300,TimeUnit.SECONDS);
//執行業務邏輯
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonFromDB();
//删除鎖
redisTemplate.delete("lock");
return jsonFromDB;
} else {
//采用自旋的方式
return getCatelogJsonRedisLock();
}
}
此時還存在問題: 如果我們直接删除鎖的話,可能會由于業務執行時間過長,而此時鎖又已經自動過期,我們直接執行删除操作,可能會把别人的鎖給删除了。
解決:占鎖的時候,指定一個UUID作為value值,删除鎖之前進行判斷操作。即:
/**
* Redis 分布鎖 基本原理 SET key value [EX seconds] [PX milliseconds] [NX|XX]
* 主要邏輯:加鎖的原理性操作、删鎖的原子性操作
*
* @return
*/
public Map<String, List<Catelog2Vo>> getCatelogJsonRedisLock() {
String s = UUID.randomUUID().toString();
//分布式鎖,占鎖
// Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "locak");
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", s, 300, TimeUnit.SECONDS);
//占鎖成功
if (hasLock) {
//設定key值自動過期時間
// redisTemplate.expire("lock",300,TimeUnit.SECONDS);
//執行業務邏輯
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonFromDB();
//删除鎖
String value = redisTemplate.opsForValue().get("lock");
if (value.equals(s)){
redisTemplate.delete("lock");
}
return jsonFromDB;
} else {
//采用自旋的方式
return getCatelogJsonRedisLock();
}
}
雖然進行了value值的判斷,但是删除key仍然不是原子操作,有可能我們在擷取value值傳回的路上此時key值自動到期,我們還是會删除錯誤。 解決:将删除操作也變為原子操作. 調用 execute()方法.
/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.RedisOperations#execute(org.springframework.data.redis.core.script.RedisScript, java.util.List, java.lang.Object[])
*/
@Override
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return scriptExecutor.execute(script, keys, args);
}
public Map<String, List<Catelog2Vo>> getCatelogJsonRedisLock() {
String s = UUID.randomUUID().toString();
//分布式鎖,占鎖
// Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "locak");
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", s, 300, TimeUnit.SECONDS);
//占鎖成功
if (hasLock) {
//設定key值自動過期時間
// redisTemplate.expire("lock",300,TimeUnit.SECONDS);
//執行業務邏輯
Map<String, List<Catelog2Vo>> jsonFromDB = getCatelogJsonFromDB();
//删除鎖
String value = redisTemplate.opsForValue().get("lock");
// if (value.equals(s)){
// redisTemplate.delete("lock");
// }
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return " +
"redis.call('del', KEYS[1]) else return 0 end";
//方法參數 : 腳本和傳回值類型, 參數,根據key擷取的value值 ,原子操作腳本删除
Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), value);
return jsonFromDB;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//采用自旋的方式
return getCatelogJsonRedisLock();
}
}
此時,還有鎖得自動續期。即當業務執行過程中,鎖得有限期時間到了,怎麼保證程式可以繼續執行下去。最簡單的辦法,将業務邏輯 try cayth.
自己手寫分布式緩存邏輯 代碼過于啰嗦且還沒有更好的解決鎖的自動續期問題,是以又引出了
Redisson
,它提供了分布式鎖等其它有關分布式的内容.
概述: 官網
Redisson是一個在Redis的基礎上實作的Java駐記憶體資料網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),進而讓使用者能夠将精力更集中地放在處理業務邏輯上。
- 環境搭建
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
使用配置
Redisson 通過RedissonClient 調用.
@Configuration
public class MyredissonConfig {
@Value("${spring.redis.host}")
private String ipAddr;
@Bean(destroyMethod="shutdown")
RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + ipAddr + ":6379");
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
-
可重入鎖(Reentrant Lock)
基于Redis的Redisson分布式可重入鎖RLock Java對象實作了java.util.concurrent.locks.Lock接口。
Redisson内部提供了一個監控鎖的看門狗,它的作用是在Redisson執行個體被關閉前,不斷的延長鎖的有效期。預設情況下,看門狗的檢查鎖的逾時時間是30秒鐘。另外Redisson還通過加鎖的方法提供了leaseTime的參數來指定加鎖的時間。超過這個時間後鎖便自動解開了,此時不會自動續時,是以要注意指定加鎖的時間與業務處理的時間。是以可以通過可重入鎖的看門狗機制解決redis鎖的自動過期時間。
@GetMapping("/test")
@ResponseBody
public String test() throws InterruptedException {
//擷取一把鎖,隻要鎖得名字一樣就是同一把鎖
RLock lock = redisson.getLock("my-lock");
//加鎖
//lock.lock(); //阻塞式等待 RLock 内部提供了一個監控鎖的看門狗,它的作用是在Redisson執行個體被關閉前,不斷的延長鎖的有效期。
//鎖的自動續期,會自動給鎖續上新的時間30s,不用擔心業務時間太長,鎖自動過期。
//加鎖的業務隻要運作完成,就不會給目前鎖續期,即使不手動解鎖,鎖會在預設時間到期後自動删除.
//*******************************
//如果我們傳遞了鎖的逾時時間就給redis發送逾時腳本 預設逾時時間就是我們指定的
//如果我們未指定,就使用 30 * 1000 [LockWatchdogTimeout] Rlock看門狗的預設時間 隻要占鎖成功 就會啟動一個定時任務 任務就是重新給鎖設定過期時間
// 這個時間還是 [LockWatchdogTimeout] 的時間 1/3 看門狗的時間續期一次 續成滿時間
lock.lock(10, TimeUnit.SECONDS); //十秒之後自動解鎖,且不會續期,是以要求 lessTime一定要大于業務執行時間,否則會發生死鎖.
try {
System.out.println("加鎖成功..執行業務邏輯方法..." + Thread.currentThread().getId());
Thread.sleep(20000);
} finally {
//釋放鎖 假設沒有手動釋放鎖,redisson也不會出現死鎖現象.
System.out.println("釋放鎖.." + Thread.currentThread().getId());
lock.unlock();
}
return "test";
}
實戰代碼:
/**
1. 使用 redisson 配置redis分布式鎖
2. */
public Map<String, List<Catelog2Vo>> getCatelogJsonRedissonLock() {
RLock lock = redisson.getLock("catelog-lock");
lock.lock(30000, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> catelogJsonFromDB = null;
try {
catelogJsonFromDB = getCatelogJsonFromDB();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return catelogJsonFromDB;
}
如果傳遞了鎖的逾時時間,就執行腳本,進行占鎖;
如果沒傳遞鎖時間,使用看門狗的時間,占鎖。如果傳回占鎖成功future,調用future.onComplete();
沒異常的話調用scheduleExpirationRenewal(threadId);
重新設定過期時間,定時任務;
看門狗的原理是定時任務:重新給鎖設定過期時間,新的過期時間就是看門狗的預設時間;
鎖時間1/3是定時任務周期,當時間過了預設時間的三分之一 即十秒,就會自動續時間為30s;
-
讀寫鎖(ReadWriteLock)
基于Redis的Redisson分布式可重入讀寫鎖
Java對象實作了RReadWriteLock
java.util.concurrent.locks.ReadWriteLock
接口。其中讀鎖和寫鎖都繼承了RLock接口。
分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處于加鎖狀态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
/**
* 讀寫鎖
* 保證一定可以讀取到最新資料,修改期間,寫鎖是一個排他鎖(互斥鎖、獨享鎖)。讀鎖是一個共享鎖
* 寫鎖沒有釋放之前,讀就必須等待
* 讀+讀: 相當于無鎖,并發讀,隻會在redis中記錄好
* 寫+讀: 需等待寫鎖釋放
* 寫+寫: 阻塞方式
* 讀+寫:有讀鎖,寫也必須要等待
* 隻要有寫鎖存在,都必須要等待.
*/
@GetMapping("/writeLock")
@ResponseBody
public String writeLock() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
//擷取寫鎖
RLock wLock = lock.writeLock();
wLock.lock();
String s = "";
try {
System.out.println("進行寫加鎖.." + Thread.currentThread().getId());
Thread.sleep(10000);
s = UUID.randomUUID().toString();
redisTemplate.opsForValue().set("write-Value", s);
} catch (Exception e) {
} finally {
System.out.println("寫解鎖.." + Thread.currentThread().getId());
wLock.unlock();
}
return s;
}
@GetMapping("/readLock")
@ResponseBody
public String readLock() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
//擷取寫鎖
RLock rLock = lock.readLock();
rLock.lock();
String s = "";
try {
System.out.println("進行讀加鎖.." + Thread.currentThread().getId());
// Thread.sleep(10000);
s = (String) redisTemplate.opsForValue().get("write-Value");
} catch (Exception e) {
} finally {
System.out.println("讀解鎖.." + Thread.currentThread().getId());
rLock.unlock();
}
return s;
}
- 信号量(Semaphore)
/**
* 信号量
* 信号量:也可以用作限流
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
//獲得信号量鎖
RSemaphore semaphore = redisson.getSemaphore("park");
// semaphore.acquire(); //擷取一個信号值,擷取一個值,占一個車位
boolean b = semaphore.tryAcquire();
return "擷取車位 =>" + b;
}
@GetMapping("/go/park")
@ResponseBody
public String gopark() throws InterruptedException {
//獲得信号量鎖
RSemaphore semaphore = redisson.getSemaphore("park");
semaphore.release();
return "park車位加一";
}
- 閉鎖(CountDownLatch)
/**
* 閉鎖 CountDownLatch
*/
@GetMapping("/lockDoor")
@ResponseBody
public String CountDownLatch() throws InterruptedException {
RCountDownLatch countDownLatch = redisson.getCountDownLatch("door");
boolean b = countDownLatch.trySetCount(5);
countDownLatch.await();
return "所有年級全部放假了.可以關閉校門了..";
}
@GetMapping("/go/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id) throws InterruptedException {
RCountDownLatch countDownLatch = redisson.getCountDownLatch("door");
//每次通路一次相當一 放假了一個年級.
countDownLatch.countDown();
return id+"年級放假了..";
}
緩存和資料庫一緻性
分布式保持緩存和資料庫一緻性:
-
雙寫模式:寫資料庫後,寫緩存
問題:
- 失效模式:寫完資料庫後,删緩存,等下次查詢更新緩存 解決方案:
SpringCache 官網
spring從3.1開始定義了Cache、CacheManager接口來統一不同的緩存技術。并支援使用JCache(JSR-107)注解簡化我們的開發。
每次調用需要緩存功能的方法時,spring會檢查檢查指定參數的指定的目标方法是否已經被調用過;如果有就直接從緩存中擷取方法調用後的結果,如果沒有就調用方法并緩存結果後傳回給使用者。下次調用直接從緩存中擷取。
常用注解:
- @Cacheable: Triggers cache population. 觸發将資料儲存到緩存的操作.
- @CacheEvict: Triggers cache eviction.觸發将資料從緩存删除的操作. (失效模式)
-
@CachePut: Updates the cache without interfering with the method
execution.不影響方法執行更新緩存.(雙寫模式)
-
@Caching: Regroups multiple cache operations to be applied on a
method. 組合以上操作
-
@CacheConfig: Shares some common cache-related settings at
class-level. 在類級别共享緩存的相同配置.
- 環境
<!--SpringCache 緩存依賴,簡化緩存開發-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
指定緩存類型并在主配置類上加上注解
@EnableCaching
spring:
cache:
type: redis
redis:
time-to-live: 3600000 #指定緩存的存貨時間 ms
#key-prefix: CACHE_ #key的字首
use-key-prefix: true #是否啟用字首
cache-null-values: true #是否緩存Null值防止緩存 穿透
預設使用jdk進行序列化(可讀性差),預設ttl為-1永不過期,自定義序列化方式需要編寫配置類
@EnableConfigurationProperties(CacheProperties.class) //将配置類添加到容器中
@Configuration
@EnableCaching //開啟緩存
public class MyCacheConfig {
@Bean
public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration
.defaultCacheConfig();
//指定緩存序列化方式為json
config = config.serializeValuesWith(
RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
//設定配置檔案中的各項配置,如過期時間
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
自動配置原理:
CacheAutoConfiguration自動配置了RedisCacheConfiguration.
原理:CacheAutoConfiguration->RedisCacheConfiguration自動的配置了RedisCacheManager,初始化所有緩存->每個緩存決定使用什麼配置,redisCacheConfiguration有就用已有的,沒有就用預設配置->想要修改預設的配置,隻需要自定義
RedisCacheConfiguration即可->就會應用到目前RedisCacheManager管理的所有緩存分區中.
- @Cacheable 緩存中有就從緩存中擷取, 緩存中無就調用方法,并将結果放到緩存中.
開啟本地同步鎖sync = true
/**
* 查詢所有一級分類
*
* @Cacheable 緩存中有就從緩存中擷取, 緩存中無就調用方法,并将結果放到緩存中.
* 緩存的資料值 預設使用jdk序列化(可以通過配置 RedisCacheConfiguration 來設定為Json資料)
* 預設ttl時間 -1 (可以通過配置 RedisCacheConfiguration 來設定)
* key: 裡面預設會解析表達式 字元串用 '' SpEl
* value:【可以當做緩存的分區(按照業務類型去分 )】
*/
@Cacheable(value = {"category"}, key = "#root.method.name",sync = true)
@Override
public List<CategoryEntity> getLevel1Categorys() {
long l = System.currentTimeMillis();
List<CategoryEntity> list = categoryDao.getLevel1Categorys();
System.out.println("總耗時:" + (System.currentTimeMillis() - l));
return list;
}
改造後的查詢三級緩存菜單
/**
* 使用SpringCache 改造後的資料緩存, 開始無需判斷緩存中是否包含,因為 SpirngCache 使用了 本地鎖的方式解決緩存擊穿問題
* sync = true: --- 開啟本地同步鎖
* 都是先去緩存中讀取資料,如果沒有在 查詢,最後在将資料放到緩存裡面去。
*/
@Cacheable(value = "category",key = "#root.methodName",sync = true)
public Map<String, List<Catelog2Vo>> getCatelogJson() {
System.out.println("查詢了資料庫.." + Thread.currentThread().getName());
//擷取所有的節點資訊
List<CategoryEntity> levels = baseMapper.selectList(null);
//擷取所有的一級分類節點
List<CategoryEntity> level1Categorys = getParent_cid(levels, 0L);
Map<String, List<Catelog2Vo>> collect1 = null;
if (level1Categorys != null) {
collect1 = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//還需要繼續封裝, 封裝父節點下面的子節點
List<CategoryEntity> category2List = getParent_cid(levels, v.getCatId());
List<Catelog2Vo> c2List = null;
if (category2List != null) {
c2List = category2List.stream().map(c2 -> {
Catelog2Vo c2Vo = new Catelog2Vo(c2.getCatId().toString(), c2.getName(), v.getCatId().toString(), null);
List<Catalog3Vo> collect = null;
//需要繼續封裝 孫子節點資料
List<CategoryEntity> c3List = getParent_cid(levels, c2.getCatId());
if (c3List != null) {
collect = c3List.stream().map(c3 -> {
Catalog3Vo catalog3Vo = new Catalog3Vo(c3.getCatId().toString(), c3.getName(), c2.getCatId().toString());
return catalog3Vo;
}).collect(Collectors.toList());
}
c2Vo.setCatalog3List(collect);
return c2Vo;
}).collect(Collectors.toList());
}
return c2List;
}));
}
return collect1;
}
- @CacheEvict 緩存失效模式, 觸發就從執行redis中删除操作
-
@Caching(evict = {
@CacheEvict(value = “category”, key = “‘getLevel1Categorys’”),
@CacheEvict(value = “category”, key = “‘getCatelogJson’”)
})
/**
* @CacheEvict 緩存失效模式, 觸發就從執行redis中删除操作
* @CacheEvict(value = "category",allEntries = true) 删除整個分區下面的資料緩存
*/
//@CacheEvict(value = "category", key = "'getLevel1Categorys'")
// @Caching(evict = {
// @CacheEvict(value = "category", key = "'getLevel1Categorys'"),
// @CacheEvict(value = "category", key = "'getCatelogJson'")
// })
@CacheEvict(value = "category",allEntries = true)
@Override
public void updateCascade(CategoryEntity category) {
this.updateById(category);
}
通過Debug分析RedisCache類中的方法,可以知道@Cacheable 使用本地鎖的方式來解決緩存擊穿問題.(先查詢緩存中是否有資料,有就從緩存中拿,沒有就查詢DB,之後将查詢結果放入到緩存.)
SpringCache原理與不足
- 讀模式
- 緩存穿透:查詢一個null的資料。解決方案:将查詢出來的null資料進行緩存,通過
設定.spring.cache.redis.cache-null-values=true
- 緩存擊穿:大量并發同時通路一個剛好過期的緩存資料。解決方案:加鎖,預設sync=false是不加鎖的,誰用sync=true,加本地鎖.
- 緩存雪崩:大量的key同時過期.解決:加随機過期時間。
2.寫模式(緩存與資料庫實時保持一緻):
- 讀寫加鎖.
- 引入Canal,感覺mysql的更新去更新Redis緩存.
- 讀多寫多的資料,直接去查詢資料庫.
3.總結:
正常資料(讀多寫少,即時性,一緻性要求不高的資料,完全可以使用Spring-Cache.
寫模式(隻要緩存的資料有過期時間就足夠了)
特殊資料:特殊設計: