多級緩存
《Redis三 進階篇-2. 多級緩存》
提示: 本材料隻做個人學習參考,不作為系統的學習流程,請注意識别!!!
《Redis三 進階篇-2. 多級緩存》
- 多級緩存
- 《Redis三 進階篇-2. 多級緩存》
- 1. 什麼是多級緩存
- 2. JVM程序緩存
-
- 2.1 導入案例
- 2.2 初識Caffeine
- 2.3 實作JVM程序緩存
-
- 2.3.1 需求
- 2.3.2 實作
- 3. Lua文法入門
-
- 3.0 初識Lua
- 3.1 HelloWorld
- 3.2 變量和循環
-
- 3.2.1 Lua的資料類型
- 3.2.2 聲明變量
- 3.2.3 循環
- 3.3 條件控制、函數
-
- 3.3.1 函數
- 3.3.2 條件控制
- 3.3.3 案例
- 4. 實作多級緩存
-
- 4.1 安裝OpenResty
- 4.2 OpenResty快速入門
-
- 4.2.1 反向代理流程
- 4.2.2 OpenResty監聽請求
- 4.2.3 編寫item.lua
- 4.3 請求參數處理
-
- 4.3.1 擷取參數的API
- 4.3.2 擷取參數并傳回
- 4.4 查詢Tomcat
-
- 4.4.1 發送http請求的API
- 4.4.2 封裝http工具
- 4.4.3 CJSON工具類
- 4.4.4 實作Tomcat查詢
- 4.4.5 基于ID負載均衡
-
- 1)原理
- 2)實作
- 3)測試
- 4.5 Redis緩存預熱
- 4.6 查詢Redis緩存
-
- 4.6.1 封裝Redis工具
- 4.6.2 實作Redis查詢
- 4.7 Nginx本地緩存
-
- 4.7.1 本地緩存API
- 4.7.2 實作本地緩存查詢
- 5. 緩存同步
-
- 5.1 資料同步政策
- 5.2 安裝Canal
-
- 5.2.1 認識Canal
- 5.2.2 安裝Canal
- 5.3 監聽Canal
-
- 5.3.1 引入依賴:
- 5.3.2 編寫配置:
- 5.3.3 修改Item實體類
- 5.3.4 編寫監聽器
1. 什麼是多級緩存
傳統的緩存政策一般是請求到達Tomcat後,先查詢Redis,如果未命中則查詢資料庫,如圖:

存在下面的問題:
- 請求要經過Tomcat處理,Tomcat的性能成為整個系統的瓶頸
- Redis緩存失效時,會對資料庫産生沖擊
多級緩存就是充分利用請求處理的每個環節,分别添加緩存,減輕Tomcat壓力,提升服務性能:
- 浏覽器通路靜态資源時,優先讀取浏覽器本地緩存
- 通路非靜态資源(ajax查詢資料)時,通路服務端
- 請求到達Nginx後,優先讀取Nginx本地緩存
- 如果Nginx本地緩存未命中,則去直接查詢Redis(不經過Tomcat)
- 如果Redis查詢未命中,則查詢Tomcat
- 請求進入Tomcat後,優先查詢JVM程序緩存
- 如果JVM程序緩存未命中,則查詢資料庫
在多級緩存架構中,Nginx内部需要編寫本地緩存查詢、Redis查詢、Tomcat查詢的業務邏輯,是以這樣的nginx服務不再是一個反向代理伺服器,而是一個編寫業務的Web伺服器了。
是以這樣的業務Nginx服務也需要搭建叢集來提高并發,再有專門的nginx服務來做反向代理,如圖:
另外,我們的Tomcat服務将來也會部署為叢集模式:
可見,多級緩存的關鍵有兩個:
- 一個是在nginx中編寫業務,實作nginx本地緩存、Redis、Tomcat的查詢
- 另一個就是在Tomcat中實作JVM程序緩存
其中Nginx程式設計則會用到OpenResty架構結合Lua這樣的語言。
這也是今天課程的難點和重點。
2. JVM程序緩存
為了示範多級緩存的案例,我們先準備一個商品查詢的業務。
2.1 導入案例
參考課前資料的:《案例導入說明.md》
2.2 初識Caffeine
具體Caffeine相關教程,可參考本人另外幾篇部落格。
https://blog.csdn.net/weixin_43695916/article/details/127850545
緩存在日常開發中啟動至關重要的作用,由于是存儲在記憶體中,資料的讀取速度是非常快的,能大量減少對資料庫的通路,減少資料庫的壓力。我們把緩存分為兩類:
- 分布式緩存,例如Redis:
- 優點:存儲容量更大、可靠性更好、可以在叢集間共享
- 缺點:通路緩存有網絡開銷
- 場景:緩存資料量較大、可靠性要求較高、需要在叢集間共享
- 程序本地緩存,例如HashMap、GuavaCache:
- 優點:讀取本地記憶體,沒有網絡開銷,速度更快
- 缺點:存儲容量有限、可靠性較低、無法共享
- 場景:性能要求較高,緩存資料量較小
我們今天會利用Caffeine架構來實作JVM程序緩存。
Caffeine是一個基于Java8開發的,提供了近乎最佳命中率的高性能的本地緩存庫。目前Spring内部的緩存使用的就是Caffeine。GitHub位址:https://github.com/ben-manes/caffeine
Caffeine的性能非常好,下圖是官方給出的性能對比:
可以看到Caffeine的性能遙遙領先!
緩存使用的基本API:
@Test
void testBasicOps() {
// 建構cache對象
Cache<String, String> cache = Caffeine.newBuilder().build();
// 存資料
cache.put("gf", "迪麗熱巴");
// 取資料
String gf = cache.getIfPresent("gf");
System.out.println("gf = " + gf);
// 取資料,包含兩個參數:
// 參數一:緩存的key
// 參數二:Lambda表達式,表達式參數就是緩存的key,方法體是查詢資料庫的邏輯
// 優先根據key查詢JVM緩存,如果未命中,則執行參數二的Lambda表達式
String defaultGF = cache.get("defaultGF", key -> {
// 根據key去資料庫查詢資料
return "柳岩";
});
System.out.println("defaultGF = " + defaultGF);
}
Caffeine既然是緩存的一種,肯定需要有緩存的清除政策,不然的話記憶體總會有耗盡的時候。
Caffeine提供了三種緩存驅逐政策:
- 基于容量:設定緩存的數量上限
// 建立緩存對象 Cache<String, String> cache = Caffeine.newBuilder() .maximumSize(1) // 設定緩存大小上限為 1 .build();
- 基于時間:設定緩存的有效時間
// 建立緩存對象 Cache<String, String> cache = Caffeine.newBuilder() // 設定緩存有效期為 10 秒,從最後一次寫入開始計時 .expireAfterWrite(Duration.ofSeconds(10)) .build();
- 基于引用:設定緩存為軟引用或弱引用,利用GC來回收緩存資料。性能較差,不建議使用。
注意:在預設情況下,當一個緩存元素過期的時候,Caffeine不會自動立即将其清理和驅逐。而是在一次讀或寫操作後,或者在空閑時間完成對失效資料的驅逐。
2.3 實作JVM程序緩存
2.3.1 需求
利用Caffeine實作下列需求:
- 給根據id查詢商品的業務添加緩存,緩存未命中時查詢資料庫
- 給根據id查詢商品庫存的業務添加緩存,緩存未命中時查詢資料庫
- 緩存初始大小為100
- 緩存上限為10000
2.3.2 實作
首先,我們需要定義兩個Caffeine的緩存對象,分别儲存商品、庫存的緩存資料。
在item-service的
com.heima.item.config
包下定義
CaffeineConfig
類:
package com.heima.item.config;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CaffeineConfig {
@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}
然後,修改item-service中的
com.heima.item.web
包下的ItemController類,添加緩存邏輯:
@RestController
@RequestMapping("item")
public class ItemController {
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;
// ...其它略
@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id) {
return itemCache.get(id, key -> itemService.query()
.ne("status", 3).eq("id", key)
.one()
);
}
@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id) {
return stockCache.get(id, key -> stockService.getById(key));
}
}
3. Lua文法入門
Nginx程式設計需要用到Lua語言,是以我們必須先入門Lua的基本文法。
3.0 初識Lua
Lua 是一種輕量小巧的腳本語言,用标準C語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程式中,進而為應用程式提供靈活的擴充和定制功能。官網:https://www.lua.org/
Lua經常嵌入到C語言開發的程式中,例如遊戲開發、遊戲插件等。
Nginx本身也是C語言開發,是以也允許基于Lua做拓展。
3.1 HelloWorld
CentOS7預設已經安裝了Lua語言環境,是以可以直接運作Lua代碼。
1)在Linux虛拟機的任意目錄下,建立一個hello.lua檔案
2)添加下面的内容
3)運作
3.2 變量和循環
學習任何語言必然離不開變量,而變量的聲明必須先知道資料的類型。
3.2.1 Lua的資料類型
Lua中支援的常見資料類型包括:
另外,Lua提供了type()函數來判斷一個變量的資料類型:
3.2.2 聲明變量
Lua聲明變量的時候無需指定資料類型,而是用local來聲明變量為局部變量:
-- 聲明字元串,可以用單引号或雙引号,
local str = 'hello'
-- 字元串拼接可以使用 ..
local str2 = 'hello' .. 'world'
-- 聲明數字
local num = 21
-- 聲明布爾類型
local flag = true
Lua中的table類型既可以作為數組,又可以作為Java中的map來使用。數組就是特殊的table,key是數組角标而已:
-- 聲明數組 ,key為角标的 table
local arr = {'java', 'python', 'lua'}
-- 聲明table,類似java的map
local map = {name='Jack', age=21}
Lua中的數組角标是從1開始,通路的時候與Java中類似:
-- 通路數組,lua數組的角标從1開始
print(arr[1])
Lua中的table可以用key來通路:
-- 通路table
print(map['name'])
print(map.name)
3.2.3 循環
對于table,我們可以利用for循環來周遊。不過數組和普通table周遊略有差異。
周遊數組:
-- 聲明數組 key為索引的 table
local arr = {'java', 'python', 'lua'}
-- 周遊數組
for index,value in ipairs(arr) do
print(index, value)
end
周遊普通table
-- 聲明map,也就是table
local map = {name='Jack', age=21}
-- 周遊table
for key,value in pairs(map) do
print(key, value)
end
3.3 條件控制、函數
Lua中的條件控制和函數聲明與Java類似。
3.3.1 函數
定義函數的文法:
function 函數名( argument1, argument2..., argumentn)
-- 函數體
return 傳回值
end
例如,定義一個函數,用來列印數組:
function printArr(arr)
for index, value in ipairs(arr) do
print(value)
end
end
3.3.2 條件控制
類似Java的條件控制,例如if、else文法:
if(布爾表達式)
then
--[ 布爾表達式為 true 時執行該語句塊 --]
else
--[ 布爾表達式為 false 時執行該語句塊 --]
end
與java不同,布爾表達式中的邏輯運算是基于英文單詞:
3.3.3 案例
需求:自定義一個函數,可以列印table,當參數為nil時,列印錯誤資訊
function printArr(arr)
if not arr then
print('數組不能為空!')
end
for index, value in ipairs(arr) do
print(value)
end
end
4. 實作多級緩存
多級緩存的實作離不開Nginx程式設計,而Nginx程式設計又離不開OpenResty。
4.1 安裝OpenResty
OpenResty® 是一個基于 Nginx的高性能 Web 平台,用于友善地搭建能夠處理超高并發、擴充性極高的動态 Web 應用、Web 服務和動态網關。具備下列特點:
- 具備Nginx的完整功能
- 基于Lua語言進行擴充,內建了大量精良的 Lua 庫、第三方子產品
- 允許使用Lua自定義業務邏輯、自定義庫
官方網站: https://openresty.org/cn/
安裝Lua可以參考課前資料提供的《安裝OpenResty.md》:
4.2 OpenResty快速入門
我們希望達到的多級緩存架構如圖:
其中:
- windows上的nginx用來做反向代理服務,将前端的查詢商品的ajax請求代理到OpenResty叢集
- OpenResty叢集用來編寫多級緩存業務
4.2.1 反向代理流程
現在,商品詳情頁使用的是假的商品資料。不過在浏覽器中,可以看到頁面有發起ajax請求查詢真實商品資料。
這個請求如下:
請求位址是localhost,端口是80,就被windows上安裝的Nginx服務給接收到了。然後代理給了OpenResty叢集:
我們需要在OpenResty中編寫業務,查詢商品資料并傳回到浏覽器。
但是這次,我們先在OpenResty接收請求,傳回假的商品資料。
4.2.2 OpenResty監聽請求
OpenResty的很多功能都依賴于其目錄下的Lua庫,需要在nginx.conf中指定依賴庫的目錄,并導入依賴:
1)添加對OpenResty的Lua子產品的加載
修改
/usr/local/openresty/nginx/conf/nginx.conf
檔案,在其中的http下面,添加下面代碼:
#lua 子產品
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c子產品
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
2)監聽/api/item路徑
修改
/usr/local/openresty/nginx/conf/nginx.conf
檔案,在nginx.conf的server下面,添加對/api/item這個路徑的監聽:
location /api/item {
# 預設的響應類型
default_type application/json;
# 響應結果由lua/item.lua檔案來決定
content_by_lua_file lua/item.lua;
}
這個監聽,就類似于SpringMVC中的
@GetMapping("/api/item")
做路徑映射。
而
content_by_lua_file lua/item.lua
則相當于調用item.lua這個檔案,執行其中的業務,把結果傳回給使用者。相當于java中調用service。
4.2.3 編寫item.lua
1)在
/usr/loca/openresty/nginx
目錄建立檔案夾:lua
2)在
/usr/loca/openresty/nginx/lua
檔案夾下,建立檔案:item.lua
3)編寫item.lua,傳回假資料
item.lua中,利用ngx.say()函數傳回資料到Response中
4)重新加載配置
nginx -s reload
重新整理商品頁面:http://localhost/item.html?id=1001,即可看到效果:
4.3 請求參數處理
上一節中,我們在OpenResty接收前端請求,但是傳回的是假資料。
要傳回真實資料,必須根據前端傳遞來的商品id,查詢商品資訊才可以。
那麼如何擷取前端傳遞的商品參數呢?
4.3.1 擷取參數的API
OpenResty中提供了一些API用來擷取不同類型的前端請求參數:
4.3.2 擷取參數并傳回
在前端發起的ajax請求如圖:
可以看到商品id是以路徑占位符方式傳遞的,是以可以利用正規表達式比對的方式來擷取ID
1)擷取商品id
修改
/usr/loca/openresty/nginx/nginx.conf
檔案中監聽/api/item的代碼,利用正規表達式擷取ID:
location ~ /api/item/(\d+) {
# 預設的響應類型
default_type application/json;
# 響應結果由lua/item.lua檔案來決定
content_by_lua_file lua/item.lua;
}
2)拼接ID并傳回
修改
/usr/loca/openresty/nginx/lua/item.lua
檔案,擷取id并拼接到結果中傳回:
-- 擷取商品id
local id = ngx.var[1]
-- 拼接并傳回
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 21寸托運箱拉杆箱 SALSA AIR系列果綠色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')
3)重新加載并測試
運作指令以重新加載OpenResty配置:
nginx -s reload
重新整理頁面可以看到結果中已經帶上了ID:
4.4 查詢Tomcat
拿到商品ID後,本應去緩存中查詢商品資訊,不過目前我們還未建立nginx、redis緩存。是以,這裡我們先根據商品id去tomcat查詢商品資訊。我們實作如圖部分:
需要注意的是,我們的OpenResty是在虛拟機,Tomcat是在Windows電腦上。兩者IP一定不要搞錯了。
4.4.1 發送http請求的API
nginx提供了内部API用以發送http請求:
local resp = ngx.location.capture("/path",{
method = ngx.HTTP_GET, -- 請求方式
args = {a=1,b=2}, -- get方式傳參數
})
傳回的響應内容包括:
- resp.status:響應狀态碼
- resp.header:響應頭,是一個table
- resp.body:響應體,就是響應資料
注意:這裡的path是路徑,并不包含IP和端口。這個請求會被nginx内部的server監聽并處理。
但是我們希望這個請求發送到Tomcat伺服器,是以還需要編寫一個server來對這個路徑做反向代理:
location /path {
# 這裡是windows電腦的ip和Java服務端口,需要確定windows防火牆處于關閉狀态
proxy_pass http://192.168.150.1:8081;
}
原理如圖:
4.4.2 封裝http工具
下面,我們封裝一個發送Http請求的工具,基于ngx.location.capture來實作查詢tomcat。
1)添加反向代理,到windows的Java服務
因為item-service中的接口都是/item開頭,是以我們監聽/item路徑,代理到windows上的tomcat服務。
修改
/usr/local/openresty/nginx/conf/nginx.conf
檔案,添加一個location:
location /item {
proxy_pass http://192.168.150.1:8081;
}
以後,隻要我們調用
ngx.location.capture("/item")
,就一定能發送請求到windows的tomcat服務。
2)封裝工具類
之前我們說過,OpenResty啟動時會加載以下兩個目錄中的工具檔案:
是以,自定義的http工具也需要放到這個目錄下。
在
/usr/local/openresty/lualib
目錄下,建立一個common.lua檔案:
vi /usr/local/openresty/lualib/common.lua
内容如下:
-- 封裝函數,發送http請求,并解析響應
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 記錄錯誤資訊,傳回404
ngx.log(ngx.ERR, "http請求查詢失敗, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法導出
local _M = {
read_http = read_http
}
return _M
這個工具将read_http函數封裝到_M這個table類型的變量中,并且傳回,這類似于導出。
使用的時候,可以利用
require('common')
來導入該函數庫,這裡的common是函數庫的檔案名。
3)實作商品查詢
最後,我們修改
/usr/local/openresty/lua/item.lua
檔案,利用剛剛封裝的函數庫實作對tomcat的查詢:
-- 引入自定義common工具子產品,傳回值是common中傳回的 _M
local common = require("common")
-- 從 common中擷取read_http這個函數
local read_http = common.read_http
-- 擷取路徑參數
local id = ngx.var[1]
-- 根據id查詢商品
local itemJSON = read_http("/item/".. id, nil)
-- 根據id查詢商品庫存
local itemStockJSON = read_http("/item/stock/".. id, nil)
這裡查詢到的結果是json字元串,并且包含商品、庫存兩個json字元串,頁面最終需要的是把兩個json拼接為一個json:
這就需要我們先把JSON變為lua的table,完成資料整合後,再轉為JSON。
4.4.3 CJSON工具類
OpenResty提供了一個cjson的子產品用來處理JSON的序列化和反序列化。
官方位址: https://github.com/openresty/lua-cjson/
1)引入cjson子產品:
2)序列化:
local obj = {
name = 'jack',
age = 21
}
-- 把 table 序列化為 json
local json = cjson.encode(obj)
3)反序列化:
local json = '{"name": "jack", "age": 21}'
-- 反序列化 json為 table
local obj = cjson.decode(json);
print(obj.name)
4.4.4 實作Tomcat查詢
下面,我們修改之前的item.lua中的業務,添加json處理功能:
-- 導入common函數庫
local common = require('common')
local read_http = common.read_http
-- 導入cjson庫
local cjson = require('cjson')
-- 擷取路徑參數
local id = ngx.var[1]
-- 根據id查詢商品
local itemJSON = read_http("/item/".. id, nil)
-- 根據id查詢商品庫存
local itemStockJSON = read_http("/item/stock/".. id, nil)
-- JSON轉化為lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 組合資料
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化為json 傳回結果
ngx.say(cjson.encode(item))
4.4.5 基于ID負載均衡
剛才的代碼中,我們的tomcat是單機部署。而實際開發中,tomcat一定是叢集模式:
是以,OpenResty需要對tomcat叢集做負載均衡。
而預設的負載均衡規則是輪詢模式,當我們查詢/item/10001時:
- 第一次會通路8081端口的tomcat服務,在該服務内部就形成了JVM程序緩存
- 第二次會通路8082端口的tomcat服務,該服務内部沒有JVM緩存(因為JVM緩存無法共享),會查詢資料庫
- …
你看,因為輪詢的原因,第一次查詢8081形成的JVM緩存并未生效,直到下一次再次通路到8081時才可以生效,緩存命中率太低了。
怎麼辦?
如果能讓同一個商品,每次查詢時都通路同一個tomcat服務,那麼JVM緩存就一定能生效了。
也就是說,我們需要根據商品id做負載均衡,而不是輪詢。
1)原理
nginx提供了基于請求路徑做負載均衡的算法:
nginx根據請求路徑做hash運算,把得到的數值對tomcat服務的數量取餘,餘數是幾,就通路第幾個服務,實作負載均衡。
例如:
- 我們的請求路徑是 /item/10001
- tomcat總數為2台(8081、8082)
- 對請求路徑/item/1001做hash運算求餘的結果為1
- 則通路第一個tomcat服務,也就是8081
隻要id不變,每次hash運算結果也不會變,那就可以保證同一個商品,一直通路同一個tomcat服務,確定JVM緩存生效。
2)實作
修改
/usr/local/openresty/nginx/conf/nginx.conf
檔案,實作基于ID做負載均衡。
首先,定義tomcat叢集,并設定基于路徑做負載均衡:
upstream tomcat-cluster {
hash $request_uri;
server 192.168.150.1:8081;
server 192.168.150.1:8082;
}
然後,修改對tomcat服務的反向代理,目标指向tomcat叢集:
location /item {
proxy_pass http://tomcat-cluster;
}
重新加載OpenResty
nginx -s reload
3)測試
啟動兩台tomcat服務:
同時啟動:
清空日志後,再次通路頁面,可以看到不同id的商品,通路到了不同的tomcat服務:
4.5 Redis緩存預熱
Redis緩存會面臨冷啟動問題:
冷啟動:服務剛剛啟動時,Redis中并沒有緩存,如果所有商品資料都在第一次查詢時添加緩存,可能會給資料庫帶來較大壓力。
緩存預熱:在實際開發中,我們可以利用大資料統計使用者通路的熱點資料,在項目啟動時将這些熱點資料提前查詢并儲存到Redis中。
我們資料量較少,并且沒有資料統計相關功能,目前可以在啟動時将所有資料都放入緩存中。
1)利用Docker安裝Redis
2)在item-service服務中引入Redis依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
3)配置Redis位址
spring:
redis:
host: 192.168.150.101
4)編寫初始化類
緩存預熱需要在項目啟動時完成,并且必須是拿到RedisTemplate之後。
這裡我們利用InitializingBean接口來實作,因為InitializingBean可以在對象被Spring建立并且成員變量全部注入後執行。
package com.heima.item.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化緩存
// 1.查詢商品資訊
List<Item> itemList = itemService.list();
// 2.放入緩存
for (Item item : itemList) {
// 2.1.item序列化為JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查詢商品庫存資訊
List<ItemStock> stockList = stockService.list();
// 4.放入緩存
for (ItemStock stock : stockList) {
// 2.1.item序列化為JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
}
4.6 查詢Redis緩存
現在,Redis緩存已經準備就緒,我們可以再OpenResty中實作查詢Redis的邏輯了。如下圖紅框所示:
當請求進入OpenResty之後:
- 優先查詢Redis緩存
- 如果Redis緩存未命中,再查詢Tomcat
4.6.1 封裝Redis工具
OpenResty提供了操作Redis的子產品,我們隻要引入該子產品就能直接使用。但是為了友善,我們将Redis操作封裝到之前的common.lua工具庫中。
修改
/usr/local/openresty/lualib/common.lua
檔案:
1)引入Redis子產品,并初始化Redis對象
-- 導入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
2)封裝函數,用來釋放Redis連接配接,其實是放入連接配接池
-- 關閉redis連接配接的工具方法,其實是放入連接配接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 連接配接的空閑時間,機關是毫秒
local pool_size = 100 --連接配接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis連接配接池失敗: ", err)
end
end
3)封裝函數,根據key查詢Redis資料
-- 查詢redis的方法 ip和port是redis位址,key是查詢的key
local function read_redis(ip, port, key)
-- 擷取一個連接配接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "連接配接redis失敗 : ", err)
return nil
end
-- 查詢redis
local resp, err = red:get(key)
-- 查詢失敗處理
if not resp then
ngx.log(ngx.ERR, "查詢Redis失敗: ", err, ", key = " , key)
end
--得到的資料為空處理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查詢Redis資料為空, key = ", key)
end
close_redis(red)
return resp
end
4)導出
-- 将方法導出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
完整的common.lua:
-- 導入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
-- 關閉redis連接配接的工具方法,其實是放入連接配接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 連接配接的空閑時間,機關是毫秒
local pool_size = 100 --連接配接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis連接配接池失敗: ", err)
end
end
-- 查詢redis的方法 ip和port是redis位址,key是查詢的key
local function read_redis(ip, port, key)
-- 擷取一個連接配接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "連接配接redis失敗 : ", err)
return nil
end
-- 查詢redis
local resp, err = red:get(key)
-- 查詢失敗處理
if not resp then
ngx.log(ngx.ERR, "查詢Redis失敗: ", err, ", key = " , key)
end
--得到的資料為空處理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查詢Redis資料為空, key = ", key)
end
close_redis(red)
return resp
end
-- 封裝函數,發送http請求,并解析響應
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 記錄錯誤資訊,傳回404
ngx.log(ngx.ERR, "http查詢失敗, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法導出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
4.6.2 實作Redis查詢
接下來,我們就可以去修改item.lua檔案,實作對Redis的查詢了。
查詢邏輯是:
- 根據id查詢Redis
- 如果查詢失敗則繼續查詢Tomcat
- 将查詢結果傳回
1)修改
/usr/local/openresty/lua/item.lua
檔案,添加一個查詢函數:
-- 導入common函數庫
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封裝查詢函數
function read_data(key, path, params)
-- 查詢本地緩存
local val = read_redis("127.0.0.1", 6379, key)
-- 判斷查詢結果
if not val then
ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
-- redis查詢失敗,去查詢http
val = read_http(path, params)
end
-- 傳回資料
return val
end
2)而後修改商品查詢、庫存查詢的業務:
3)完整的item.lua代碼:
-- 導入common函數庫
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 導入cjson庫
local cjson = require('cjson')
-- 封裝查詢函數
function read_data(key, path, params)
-- 查詢本地緩存
local val = read_redis("127.0.0.1", 6379, key)
-- 判斷查詢結果
if not val then
ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
-- redis查詢失敗,去查詢http
val = read_http(path, params)
end
-- 傳回資料
return val
end
-- 擷取路徑參數
local id = ngx.var[1]
-- 查詢商品資訊
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 查詢庫存資訊
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)
-- JSON轉化為lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 組合資料
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化為json 傳回結果
ngx.say(cjson.encode(item))
4.7 Nginx本地緩存
現在,整個多級緩存中隻差最後一環,也就是nginx的本地緩存了。如圖:
4.7.1 本地緩存API
OpenResty為Nginx提供了shard dict的功能,可以在nginx的多個worker之間共享資料,實作緩存功能。
1)開啟共享字典,在nginx.conf的http下添加配置:
# 共享字典,也就是本地緩存,名稱叫做:item_cache,大小150m
lua_shared_dict item_cache 150m;
2)操作共享字典:
-- 擷取本地緩存對象
local item_cache = ngx.shared.item_cache
-- 存儲, 指定key、value、過期時間,機關s,預設為0代表永不過期
item_cache:set('key', 'value', 1000)
-- 讀取
local val = item_cache:get('key')
4.7.2 實作本地緩存查詢
1)修改
/usr/local/openresty/lua/item.lua
檔案,修改read_data查詢函數,添加本地緩存邏輯:
-- 導入共享詞典,本地緩存
local item_cache = ngx.shared.item_cache
-- 封裝查詢函數
function read_data(key, expire, path, params)
-- 查詢本地緩存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地緩存查詢失敗,嘗試查詢Redis, key: ", key)
-- 查詢redis
val = read_redis("127.0.0.1", 6379, key)
-- 判斷查詢結果
if not val then
ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
-- redis查詢失敗,去查詢http
val = read_http(path, params)
end
end
-- 查詢成功,把資料寫入本地緩存
item_cache:set(key, val, expire)
-- 傳回資料
return val
end
2)修改item.lua中查詢商品和庫存的業務,實作最新的read_data函數:
其實就是多了緩存時間參數,過期後nginx緩存會自動删除,下次通路即可更新緩存。
這裡給商品基本資訊設定逾時時間為30分鐘,庫存為1分鐘。
因為庫存更新頻率較高,如果緩存時間過長,可能與資料庫差異較大。
3)完整的item.lua檔案:
-- 導入common函數庫
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 導入cjson庫
local cjson = require('cjson')
-- 導入共享詞典,本地緩存
local item_cache = ngx.shared.item_cache
-- 封裝查詢函數
function read_data(key, expire, path, params)
-- 查詢本地緩存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地緩存查詢失敗,嘗試查詢Redis, key: ", key)
-- 查詢redis
val = read_redis("127.0.0.1", 6379, key)
-- 判斷查詢結果
if not val then
ngx.log(ngx.ERR, "redis查詢失敗,嘗試查詢http, key: ", key)
-- redis查詢失敗,去查詢http
val = read_http(path, params)
end
end
-- 查詢成功,把資料寫入本地緩存
item_cache:set(key, val, expire)
-- 傳回資料
return val
end
-- 擷取路徑參數
local id = ngx.var[1]
-- 查詢商品資訊
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 查詢庫存資訊
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)
-- JSON轉化為lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 組合資料
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化為json 傳回結果
ngx.say(cjson.encode(item))
5. 緩存同步
大多數情況下,浏覽器查詢到的都是緩存資料,如果緩存資料與資料庫資料存在較大差異,可能會産生比較嚴重的後果。
是以我們必須保證資料庫資料、緩存資料的一緻性,這就是緩存與資料庫的同步。
5.1 資料同步政策
緩存資料同步的常見方式有三種:
設定有效期:給緩存設定有效期,到期後自動删除。再次查詢時更新
- 優勢:簡單、友善
- 缺點:時效性差,緩存過期之前可能不一緻
- 場景:更新頻率較低,時效性要求低的業務
同步雙寫:在修改資料庫的同時,直接修改緩存
- 優勢:時效性強,緩存與資料庫強一緻
- 缺點:有代碼侵入,耦合度高;
- 場景:對一緻性、時效性要求較高的緩存資料
**異步通知:**修改資料庫時發送事件通知,相關服務監聽到通知後修改緩存資料
- 優勢:低耦合,可以同時通知多個緩存服務
- 缺點:時效性一般,可能存在中間不一緻狀态
- 場景:時效性要求一般,有多個服務需要同步
而異步實作又可以基于MQ或者Canal來實作:
1)基于MQ的異步通知:
解讀:
- 商品服務完成對資料的修改後,隻需要發送一條消息到MQ中。
- 緩存服務監聽MQ消息,然後完成對緩存的更新
依然有少量的代碼侵入。
2)基于Canal的通知
解讀:
- 商品服務完成商品修改後,業務直接結束,沒有任何代碼侵入
- Canal監聽MySQL變化,當發現變化後,立即通知緩存服務
- 緩存服務接收到canal通知,更新緩存
代碼零侵入
5.2 安裝Canal
5.2.1 認識Canal
Canal [kə’næl],譯意為水道/管道/溝渠,canal是阿裡巴巴旗下的一款開源項目,基于Java開發。基于資料庫增量日志解析,提供增量資料訂閱&消費。GitHub的位址:https://github.com/alibaba/canal
Canal是基于mysql的主從同步來實作的,MySQL主從同步的原理如下:
- 1)MySQL master 将資料變更寫入二進制日志( binary log),其中記錄的資料叫做binary log events
- 2)MySQL slave 将 master 的 binary log events拷貝到它的中繼日志(relay log)
- 3)MySQL slave 重放 relay log 中事件,将資料變更反映它自己的資料
而Canal就是把自己僞裝成MySQL的一個slave節點,進而監聽master的binary log變化。再把得到的變化資訊通知給Canal的用戶端,進而完成對其它資料庫的同步。
5.2.2 安裝Canal
安裝和配置Canal參考課前資料文檔:
5.3 監聽Canal
Canal提供了各種語言的用戶端,當Canal監聽到binlog變化時,會通知Canal的用戶端。
我們可以利用Canal提供的Java用戶端,監聽Canal通知消息。當收到變化的消息時,完成對緩存的更新。
不過這裡我們會使用GitHub上的第三方開源的canal-starter用戶端。位址:https://github.com/NormanGyllenhaal/canal-client
與SpringBoot完美整合,自動裝配,比官方用戶端要簡單好用很多。
5.3.1 引入依賴:
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
5.3.2 編寫配置:
canal:
destination: heima # canal的叢集名字,要與安裝canal時設定的名稱一緻
server: 192.168.150.101:11111 # canal服務位址
5.3.3 修改Item實體類
通過@Id、@Column、等注解完成Item與資料庫表字段的映射:
@Transient//表示不屬于表裡的字段
package com.heima.item.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import javax.persistence.Column;
import java.util.Date;
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名稱
private String title;//商品标題
private Long price;//價格(分)
private String image;//商品圖檔
private String category;//分類名稱
private String brand;//品牌名稱
private String spec;//規格
private Integer status;//商品狀态 1-正常,2-下架
private Date createTime;//建立時間
private Date updateTime;//更新時間
@TableField(exist = false)
@Transient//表示不屬于表裡的字段
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
5.3.4 編寫監聽器
通過實作
EntryHandler<T>
接口編寫監聽器,監聽Canal消息。注意兩點:
- 實作類通過
指定監聽的表資訊@CanalTable("tb_item")
- EntryHandler的泛型是與表對應的實體類
package com.heima.item.canal;
import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;
@Override
public void insert(Item item) {
// 寫資料到JVM程序緩存
itemCache.put(item.getId(), item);
// 寫資料到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
// 寫資料到JVM程序緩存
itemCache.put(after.getId(), after);
// 寫資料到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
// 删除資料到JVM程序緩存
itemCache.invalidate(item.getId());
// 删除資料到redis
redisHandler.deleteItemById(item.getId());
}
}
在這裡對Redis的操作都封裝到了RedisHandler這個對象中,是我們之前做緩存預熱時編寫的一個類,内容如下:
package com.heima.item.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化緩存
// 1.查詢商品資訊
List<Item> itemList = itemService.list();
// 2.放入緩存
for (Item item : itemList) {
// 2.1.item序列化為JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查詢商品庫存資訊
List<ItemStock> stockList = stockService.list();
// 4.放入緩存
for (ItemStock stock : stockList) {
// 2.1.item序列化為JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}