天天看點

基于Spring接口,內建Caffeine+Redis兩級緩存

作者:碼農參上
原創:微信公衆号 碼農參上,歡迎分享,轉載請保留出處。

在上一篇文章Redis+Caffeine兩級緩存,讓通路速度縱享絲滑中,我們介紹了3種整合Caffeine和Redis作為兩級緩存使用的方法,雖然說能夠實作功能,但實作手法還是太粗糙了,并且遺留了一些問題沒有處理。本文将在上一篇的基礎上,圍繞兩個方面進行進一步的改造:

  • JSR107定義了緩存使用規範,spring中提供了基于這個規範的接口,是以我們可以直接使用spring中的接口進行Caffeine和Redis兩級緩存的整合改造
  • 在分布式環境下,如果一台主機的本地緩存進行修改,需要通知其他主機修改本地緩存,解決分布式環境下本地緩存一緻性問題

好了,在明确了需要的改進問題後,下面我們開始正式修改。

改造

在上篇文章的v3版本中,我們使用自定義注解的方式實作了兩級緩存通過一個注解管理的功能。本文我們換一種方式,直接通過擴充spring提供的接口來實作這個功能,在進行整合之前,我們需要簡單了解一下JSR107緩存規範。

JSR107 規範

在JSR107緩存規範中定義了5個核心接口,分别是CachingProvider,CacheManager,Cache, Entry和Expiry,參考下面這張圖,可以看到除了Entry和Expiry以外,從上到下都是一對多的包含關系。

基于Spring接口,內建Caffeine+Redis兩級緩存

從上面這張圖我們可以看出,一個應用可以建立并管理多個CachingProvider,同樣一個CachingProvider也可以管理多個CacheManager,緩存管理器CacheManager中則維護了多個Cache。

Cache是一個類似Map的資料結構,Entry就是其中存儲的每一個key-value資料對,并且每個Entry都有一個過期時間Expiry。而我們在使用spring內建第三方的緩存時,隻需要實作Cache和CacheManager這兩個接口就可以了,下面分别具體來看一下。

Cache

spring中的Cache接口規範了緩存元件的定義,包含了緩存的各種操作,實作具體緩存操作的管理。例如我們熟悉的RedisCache、EhCacheCache等,都實作了這個接口。

基于Spring接口,內建Caffeine+Redis兩級緩存

在Cache接口中,定義了get、put、evict、clear等方法,分别對應緩存的存入、取出、删除、清空操作。不過我們這裡不直接使用Cache接口,上面這張圖中的AbstractValueAdaptingCache是一個抽象類,它已經實作了Cache接口,是spring在Cache接口的基礎上幫助我們進行了一層封裝,是以我們直接繼承這個類就可以。

繼承AbstractValueAdaptingCache抽象類後,除了建立Cache的構造方法外,還需要實作下面的幾個方法:

// 在緩存中實際執行查找的操作,父類的get()方法會調用這個方法
protected abstract Object lookup(Object key);

// 通過key擷取緩存值,如果沒有找到,會調用valueLoader的call()方法
public <T> T get(Object key, Callable<T> valueLoader);

// 将資料放入緩存中
public void put(Object key, Object value);

// 删除緩存
public void evict(Object key);

// 清空緩存中所有資料
public void clear();

// 擷取緩存名稱,一般在CacheManager建立時指定
String getName();

// 擷取實際使用的緩存
Object getNativeCache();
           

因為要整合RedisTemplate和Caffeine的Cache,是以這些都需要在緩存的構造方法中傳入,除此之外構造方法中還需要再傳出緩存名稱cacheName,以及在配置檔案中實際配置的一些緩存參數。先看一下構造方法的實作:

public class DoubleCache extends AbstractValueAdaptingCache {
    private String cacheName;
    private RedisTemplate<Object, Object> redisTemplate;
    private Cache<Object, Object> caffeineCache;
    private DoubleCacheConfig doubleCacheConfig;

    protected DoubleCache(boolean allowNullValues) {
        super(allowNullValues);
    }

    public DoubleCache(String cacheName,RedisTemplate<Object, Object> redisTemplate,
                       Cache<Object, Object> caffeineCache,
                       DoubleCacheConfig doubleCacheConfig){
        super(doubleCacheConfig.getAllowNull());
        this.cacheName=cacheName;
        this.redisTemplate=redisTemplate;
        this.caffeineCache=caffeineCache;
        this.doubleCacheConfig=doubleCacheConfig;
    }
    //...
}
           

抽象父類的構造方法中隻有一個boolean類型的參數allowNullValues,表示是否允許緩存對象為null。除此之外,AbstractValueAdaptingCache中還定義了兩個包裝方法來配合這個參數進行使用,分别是toStoreValue和fromStoreValue,特殊用途是用于在緩存null對象時進行包裝、以及在擷取時進行解析并傳回。

我們之後會在CacheManager中調用後面這個自己實作的構造方法,來執行個體化Cache對象,參數中DoubleCacheConfig是使用@ConfigurationProperties讀取的yml配置檔案封裝的資料對象,會在後面使用。

當一個方法添加了@Cacheable注解時,執行時會先調用父類AbstractValueAdaptingCache中的get(key)方法,它會再調用我們自己實作的lookup方法。在實際執行查找操作的lookup方法中,我們的邏輯仍然是先查找Caffeine、沒有找到時再查找Redis:

@Override
protected Object lookup(Object key) {
    // 先從caffeine中查找
    Object obj = caffeineCache.getIfPresent(key);
    if (Objects.nonNull(obj)){
        log.info("get data from caffeine");
        return obj;
    }

    //再從redis中查找
    String redisKey=this.name+":"+ key;
    obj = redisTemplate.opsForValue().get(redisKey);
    if (Objects.nonNull(obj)){
        log.info("get data from redis");
        caffeineCache.put(key,obj);
    }
    return obj;
}
           

如果lookup方法的傳回結果不為null,那麼就會直接傳回結果給調用方。如果傳回為null時,就會執行原方法,執行完成後調用put方法,将資料放入緩存中。接下來我們實作put方法:

@Override
public void put(Object key, Object value) {
    if(!isAllowNullValues() && Objects.isNull(value)){
        log.error("the value NULL will not be cached");
        return;
    }

    //使用 toStoreValue(value) 包裝,解決caffeine不能存null的問題
    caffeineCache.put(key,toStoreValue(value));

    // null對象隻存在caffeine中一份就夠了,不用存redis了
    if (Objects.isNull(value))
        return;

    String redisKey=this.cacheName +":"+ key;
    Optional<Long> expireOpt = Optional.ofNullable(doubleCacheConfig)
            .map(DoubleCacheConfig::getRedisExpire);
    if (expireOpt.isPresent()){
        redisTemplate.opsForValue().set(redisKey,toStoreValue(value),
                expireOpt.get(), TimeUnit.SECONDS);
    }else{
        redisTemplate.opsForValue().set(redisKey,toStoreValue(value));
    }
}
           

上面我們對于是否允許緩存空對象進行了判斷,能夠緩存空對象的好處之一就是可以避免緩存穿透。需要注意的是,Caffeine中是不能直接緩存null的,是以可以使用父類提供的toStoreValue()方法,将它包裝成一個NullValue類型。在取出對象時,如果是NullValue,也不用我們自己再去調用fromStoreValue()将這個包裝類型還原,父類的get方法中已經幫我們做好了。

另外,上面在put方法中緩存空對象時,隻在Caffeine緩存中一份即可,可以不用在Redis中再存一份。

緩存的删除方法evict()和清空方法clear()的實作就比較簡單了,直接删除一條或全部資料即可:

@Override
public void evict(Object key) {
    redisTemplate.delete(this.cacheName +":"+ key);
    caffeineCache.invalidate(key);
}

@Override
public void clear() {
    Set<Object> keys = redisTemplate.keys(this.cacheName.concat(":*"));
    for (Object key : keys) {
        redisTemplate.delete(String.valueOf(key));
    }
    caffeineCache.invalidateAll();
}
           

擷取緩存cacheName和實際緩存的方法實作:

@Override
public String getName() {
    return this.cacheName;
}
@Override
public Object getNativeCache() {
    return this;
}
           

最後,我們再來看一下帶有兩個參數的get方法,為什麼把這個方法放到最後來說呢,因為如果我們隻是使用注解來管理緩存的話,那麼這個方法不會被調用到,簡單看一下實作:

@Override
public <T> T get(Object key, Callable<T> valueLoader) {
    ReentrantLock lock=new ReentrantLock();
    try{
        lock.lock();//加鎖
        Object obj = lookup(key);
        if (Objects.nonNull(obj)){
            return (T)obj;
        }
        //沒有找到
        obj = valueLoader.call();
        put(key,obj);//放入緩存
        return (T)obj;
    }catch (Exception e){
        log.error(e.getMessage());
    }finally {
        lock.unlock();
    }
    return null;
}
           

方法的實作比較容易了解,還是先調用lookup方法尋找是否已經緩存了對象,如果沒有找到那麼就調用Callable中的call方法進行擷取,并在擷取完成後存入到緩存中去。至于這個方法如何使用,具體代碼我們放在後面使用這一塊再看。

需要注意的是,這個方法的接口注釋中強調了需要我們自己來保證方法同步,是以這裡使用了ReentrantLock進行了加鎖操作。到這裡,Cache的實作就完成了,下面我們接着看另一個重要的接口CacheManager。

CacheManager

從名字就可以看出,CacheManager是一個緩存管理器,它可以被用來管理一組Cache。在上一篇文章的v2版本中,我們使用的CaffeineCacheManager就實作了這個接口,除此之外還有RedisCacheManager、EhCacheCacheManager等也都是通過這個接口實作。

下面我們要自定義一個類實作CacheManager接口,管理上面實作的DoubleCache作為spring中的緩存使用。接口中需要實作的方法隻有下面兩個:

//根據cacheName擷取Cache執行個體,不存在時進行建立
Cache getCache(String name);

//傳回管理的所有cacheName
Collection<String> getCacheNames();
           

在自定義的緩存管理器中,我們要使用ConcurrentHashMap維護一組不同的Cache,再定義一個構造方法,在參數中傳入已經在spring中配置好的RedisTemplate,以及相關的緩存配置參數:

public class DoubleCacheManager implements CacheManager {
    Map<String, Cache> cacheMap = new ConcurrentHashMap<>();
    private RedisTemplate<Object, Object> redisTemplate;
    private DoubleCacheConfig dcConfig;

    public DoubleCacheManager(RedisTemplate<Object, Object> redisTemplate,
                              DoubleCacheConfig doubleCacheConfig) {
        this.redisTemplate = redisTemplate;
        this.dcConfig = doubleCacheConfig;
    }
    //...
}
           

然後實作getCache方法,邏輯很簡單,先根據name從Map中查找對應的Cache,如果找到則直接傳回,這個參數name就是上一篇文章中提到的cacheName,CacheManager根據它實作不同Cache的隔離。

如果沒有根據名稱找到緩存的話,那麼建立一個DoubleCache對象,并放入Map中。這裡使用的ConcurrentHashMap的putIfAbsent()方法放入,避免重複建立Cache以及造成Cache内資料的丢失。具體代碼如下:

@Override
public Cache getCache(String name) {
    Cache cache = cacheMap.get(name);
    if (Objects.nonNull(cache)) {
        return cache;
    }

    cache = new DoubleCache(name, redisTemplate, createCaffeineCache(), dcConfig);
    Cache oldCache = cacheMap.putIfAbsent(name, cache);
    return oldCache == null ? cache : oldCache;
}
           

在上面建立DoubleCache對象的過程中,需要先建立一個Caffeine的Cache對象作為參數傳入,這一過程主要是根據實際項目的配置檔案中的具體參數進行初始化,代碼如下:

private com.github.benmanes.caffeine.cache.Cache createCaffeineCache(){
    Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder();
    Optional<DoubleCacheConfig> dcConfigOpt = Optional.ofNullable(this.dcConfig);
    dcConfigOpt.map(DoubleCacheConfig::getInit)
            .ifPresent(init->caffeineBuilder.initialCapacity(init));
    dcConfigOpt.map(DoubleCacheConfig::getMax)
            .ifPresent(max->caffeineBuilder.maximumSize(max));
    dcConfigOpt.map(DoubleCacheConfig::getExpireAfterWrite)
            .ifPresent(eaw->caffeineBuilder.expireAfterWrite(eaw,TimeUnit.SECONDS));
    dcConfigOpt.map(DoubleCacheConfig::getExpireAfterAccess)
            .ifPresent(eaa->caffeineBuilder.expireAfterAccess(eaa,TimeUnit.SECONDS));
    dcConfigOpt.map(DoubleCacheConfig::getRefreshAfterWrite)
            .ifPresent(raw->caffeineBuilder.refreshAfterWrite(raw,TimeUnit.SECONDS));
    return caffeineBuilder.build();
}
           

getCacheNames方法很簡單,直接傳回Map的keySet就可以了,代碼如下:

@Override
public Collection<String> getCacheNames() {
    return cacheMap.keySet();
}
           

配置&使用

在application.yml檔案中配置緩存的參數,代碼中使用@ConfigurationProperties接收到DoubleCacheConfig類中:

doublecache:
  allowNull: true
  init: 128
  max: 1024
  expireAfterWrite: 30  #Caffeine過期時間
  redisExpire: 60      #Redis緩存過期時間
           

配置自定義的DoubleCacheManager作為預設的緩存管理器:

@Configuration
public class CacheConfig {
    @Autowired
    DoubleCacheConfig doubleCacheConfig;

    @Bean
    public DoubleCacheManager cacheManager(RedisTemplate<Object,Object> redisTemplate,
                                           DoubleCacheConfig doubleCacheConfig){
        return new DoubleCacheManager(redisTemplate,doubleCacheConfig);
    }
}
           

Service中的代碼還是老樣子,不需要在代碼中手動操作緩存,隻要直接在方法上使用@Cache相關注解即可:

@Service @Slf4j
@AllArgsConstructor
public class OrderServiceImpl implements OrderService {
    private final OrderMapper orderMapper;

    @Cacheable(value = "order",key = "#id")
    public Order getOrderById(Long id) {
        Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper<Order>()
                .eq(Order::getId, id));
        return myOrder;
    }

    @CachePut(cacheNames = "order",key = "#order.id")
    public Order updateOrder(Order order) {
        orderMapper.updateById(order);
        return order;
    }

    @CacheEvict(cacheNames = "order",key = "#id")
    public void deleteOrder(Long id) {
        orderMapper.deleteById(id);
    }
    
    //沒有注解,使用get(key,callable)方法
    public Order getOrderById2(Long id) {
        DoubleCacheManager cacheManager = SpringContextUtil.getBean(DoubleCacheManager.class);
        Cache cache = cacheManager.getCache("order");
        Order order =(Order) cache.get(id, (Callable<Object>) () -> {
            log.info("get data from database");
            Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper<Order>()
                    .eq(Order::getId, id));
            return myOrder;
        });
        return order;
    }    
}
           

注意最後這個沒有添加任何注解的方法,隻有以這種方式調用時才會執行我們在DoubleCache中自己實作的get(key,callable)方法。到這裡,基于JSR107規範和spring接口的兩級緩存改造就完成了,下面我們看一下遺漏的第二個問題。

分布式環境改造

前面我們說了,在分布式環境下,可能會存在各個主機上一級緩存不一緻的問題。當一台主機修改了本地緩存後,其他主機是沒有感覺的,仍然保持了之前的緩存,那麼這種情況下就可能取到髒資料。既然我們在項目中已經使用了Redis,那麼就可以使用它的釋出/訂閱功能來使各個節點的緩存進行同步。

定義消息體

在使用Redis發送消息前,需要先定義一個消息對象。其中的資料包括消息要作用于的Cache名稱、操作類型、資料以及發出消息的源主機辨別:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CacheMassage implements Serializable {
    private static final long serialVersionUID = -3574997636829868400L;

    private String cacheName;
    private CacheMsgType type;  //辨別更新或删除操作
    private Object key;   
    private Object value;
    private String msgSource;   //源主機辨別,用來避免重複操作
}
           

定義一個枚舉來辨別消息的類型,是要進行更新還是删除操作:

public enum CacheMsgType {
    UPDATE,
    DELETE;
}
           

消息體中的msgSource是添加的一個消息源主機的辨別,添加這個是為了避免收到目前主機發送的消息後,再進行重複操作,也就是說收到本機發出的消息直接丢掉什麼都不做就可以了。源主機辨別這裡使用的是主機ip加項目端口的方式,擷取方法如下:

public static String getMsgSource() throws UnknownHostException {
    String host = InetAddress.getLocalHost().getHostAddress();
    Environment env = SpringContextUtil.getBean(Environment.class);
    String port = env.getProperty("server.port");
    return host+":"+port;
}
           

這樣消息體的定義就完成了,之後隻要調用redisTemplate的convertAndSend方法就可以把這個對象釋出到指定的主題上了。

Redis消息配置

要使用Redis的消息監聽功能,需要配置兩項内容:

  • MessageListenerAdapter:消息監聽擴充卡,可以在其中指定自定義的監聽代理類,并且可以自定義使用哪個方法處理監聽邏輯
  • RedisMessageListenerContainer: 一個可以為消息監聽器提供異步行為的容器,并且提供消息轉換和分派等底層功能
@Configuration
public class MessageConfig {
    public static final String TOPIC="cache.msg";
    
    @Bean
    RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter,
                                            RedisConnectionFactory redisConnectionFactory){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(TOPIC));
        return container;
    }
    
    @Bean
    MessageListenerAdapter adapter(RedisMessageReceiver receiver){
        return new MessageListenerAdapter(receiver,"receive");
    }   
}
           

在上面的監聽擴充卡MessageListenerAdapter中,我們傳入了一個自定義的RedisMessageReceiver接收并處理消息,并指定使用它的receive方法來處理監聽到的消息,下面我們就來看看它如何接收消息并消費。

消息消費邏輯

定義一個類RedisMessageReceiver來接收并消費消息,需要在它的方法中實作以下功能:

  • 反序列化接收到的消息,轉換為前面定義的CacheMassage類型對象
  • 根據消息的主機辨別判斷這條消息是不是本機發出的,如果是那麼直接丢棄,隻有接收到其他主機發出的消息才進行處理
  • 使用cacheName得到具體使用的那一個DoubleCache執行個體
  • 根據消息的類型判斷要執行的是更新還是删除操作,調用對應的方法
@Slf4j @Component
@AllArgsConstructor
public class RedisMessageReceiver {
    private final RedisTemplate redisTemplate;
    private final DoubleCacheManager manager;

    //接收通知,進行處理
    public void receive(String message) throws UnknownHostException {
        CacheMassage msg = (CacheMassage) redisTemplate
                .getValueSerializer().deserialize(message.getBytes());
        log.info(msg.toString());

        //如果是本機發出的消息,那麼不進行處理
        if (msg.getMsgSource().equals(MessageSourceUtil.getMsgSource())){
            log.info("收到本機發出的消息,不做處理");
            return;
        }

        DoubleCache cache = (DoubleCache) manager.getCache(msg.getCacheName());
        if (msg.getType()== CacheMsgType.UPDATE) {
            cache.updateL1Cache(msg.getKey(),msg.getValue());
            log.info("更新本地緩存");
        }

        if (msg.getType()== CacheMsgType.DELETE) {
            log.info("删除本地緩存");
            cache.evictL1Cache(msg.getKey());
        }
    }
}
           

在上面的代碼中,調用了DoubleCache中更新一級緩存方法updateL1Cache、删除一級緩存方法evictL1Cache,我們會後面在DoubleCache中進行添加。

修改DoubleCache

在DoubleCache中先添加上面提到的兩個方法,由CacheManager擷取到具體緩存後調用,進行一級緩存的更新或删除操作:

// 更新一級緩存
public void updateL1Cache(Object key,Object value){
    caffeineCache.put(key,value);
}

// 删除一級緩存
public void evictL1Cache(Object key){
    caffeineCache.invalidate(key);
}
           

好了,完事具備隻欠東風,我們要在什麼場合發送消息呢?答案是在DoubleCache中存入緩存的put方法和移除緩存的evict方法中。首先修改put方法,方法中前面的邏輯不變,在最後添加發送消息通知其他節點更新一級緩存的邏輯:

public void put(Object key, Object value) {
 // 省略前面的不變代碼...

    //發送資訊通知其他節點更新一級緩存 
 CacheMassage cacheMassage
   = new CacheMassage(this.cacheName, CacheMsgType.UPDATE,
   key,value, MessageSourceUtil.getMsgSource());
 redisTemplate.convertAndSend(MessageConfig.TOPIC,cacheMassage);
}
           

然後修改evict方法,同樣保持前面的邏輯不變,在最後添加發送消息的代碼:

public void evict(Object key) {
 // 省略前面的不變代碼...

    //發送資訊通知其他節點删除一級緩存   
    CacheMassage cacheMassage
            = new CacheMassage(this.cacheName, CacheMsgType.DELETE,
            key,null, MessageSourceUtil.getMsgSource());
    redisTemplate.convertAndSend(MessageConfig.TOPIC,cacheMassage);
}
           

适配分布式環境的改造工作到此結束,下面進行一下簡單的測試工作。

測試

我們可以用idea的Allow parallel run功能同時啟動兩個一樣的springboot項目,來模拟分布式環境下的兩台主機,注意在啟動參數中添加-Dserver.port參數來啟動到不同端口。

首先測試更新操作,使用接口修改某一個主機的本地緩存,可以看到發出消息的主機在收到消息後,直接丢棄不做任何處理:

基于Spring接口,內建Caffeine+Redis兩級緩存

檢視另一台主機的日志,收到消息并更新了本地緩存:

基于Spring接口,內建Caffeine+Redis兩級緩存

再看一下緩存的删除情況,同樣本地删除後再收到消息不做處理:

基于Spring接口,內建Caffeine+Redis兩級緩存

看另一台主機收到消息後,會删除本地的一級緩存:

基于Spring接口,內建Caffeine+Redis兩級緩存

可以看到,分布式環境下本地緩存通過Redis消息的釋出訂閱機制保證了一級緩存的一緻性。

另外,如果更加嚴謹一些的話,其實還應該處理一下緩存更新失敗的情況,這裡留個坑以後再填。簡單說一下思路,我們應該在代碼中捕獲緩存更新失敗的異常,然後删除二級緩存、本機以及其他主機的一級緩存,再等待下一次通路時直接拉取最新的資料進行緩存。同樣,要想實作緩存失效同時作用于所有單機節點的本地緩存這一功能,也可以使用上面的釋出訂閱來實作。

總結

好了,這次縫縫補補的填坑之旅到這裡就要結束了。可以看到使用基于JSR107規範的spring接口進行修改後,代碼看起來舒服了很多,并且支援直接使用spring的@Cache相關注解。如果想在項目中使用的話,自己封裝一個簡單的starter就可以了,使用起來也非常簡單。

那麼,這次的分享就到這裡,我是Hydra,下篇文章再見。

本文及上一篇文章的示例代碼已合并上傳到了Hydra的Github上,公衆号【碼農參上】背景回複緩存擷取連結,本文代碼在項目的v4 module中,歡迎小夥伴們來給個star啊~
作者簡介,碼農參上,一個熱愛分享的公衆号,有趣、深入、直接,與你聊聊技術。個人微信DrHydra9,歡迎添加好友,進一步交流。