Redis的常見配置
spring.redis.pool.max-active=8 # 連接配接池最大連接配接數(使用負值表示沒有限制)
spring.redis.pool.max-wait=-1 # 連接配接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.pool.max-idle=8 # 連接配接池中的最大空閑連接配接
spring.redis.pool.min-idle=0 # 連接配接池中的最小空閑連接配接
spring.redis.timeout=0 # 接逾時時間(毫秒)
Redis記憶體淘汰機制
簡介:記憶體的淘汰機制的初衷是為了更好地使用記憶體,用一定的緩存miss來換取記憶體的使用效率。我們可以通過配置redis.conf中的maxmemory這個值來開啟記憶體淘汰功能,maxmemory為0的時候表示我們對Redis的記憶體使用沒有限制。
Redis提供了下面幾種淘汰政策供使用者選擇,其中預設的政策為noeviction政策:
noeviction:不做任何的清理工作,在redis的記憶體超過限制之後,所有的寫入操作都會傳回錯誤;但是讀操作都能正常的進行
-
-
- volatile-ttl:在設定了過期時間的鍵空間中,具有更早過期時間的key優先移除
- allkeys-lru:在主鍵空間中,優先移除最近未使用的key
- volatile-lru:在設定了過期時間的鍵空間中,優先移除最近未使用的key
- allkeys-random:在主鍵空間中,随機移除某個key
- volatile-random:在設定了過期時間的鍵空間中,随機移除某個key
- allkeys-lfu:從主鍵空間中,選擇某段時間之内使用頻次最少的鍵值對清除
- volatile-lfu:從設定了過期時間的鍵空間中,選擇某段時間之内使用頻次最小的鍵值對清除掉
- 這裡補充一下主鍵空間和設定了過期時間的鍵空間,舉個例子,假設我們有一批鍵存儲在Redis中,則有那麼一個哈希表用于存儲這批鍵及其值,如果這批鍵中有一部分設定了過期時間,那麼這批鍵還會被存儲到另外一個哈希表中,這個哈希表中的值對應的是鍵被設定的過期時間。設定了過期時間的鍵空間為主鍵空間的子集。
- 淘汰政策的選擇可以通過下面的配置指定:# maxmemory-policy noeviction
-
- Redis的高可用
- Redis-Sentinell叢集機制
- Redis-Sentinel是在master-slave機制上加入監控機制哨兵Sentinel實作的。
- Sentinel主要功能就是為Redis Master-Slave叢集提供:
- 監控(Monitoring): Sentinel以每秒鐘一次的頻率向它所知的Master,Slave以及其他Sentinel執行個體發送一個PING
- 提醒(Notification): 當被監控的某個 Redis 伺服器出現問題時, Sentinel 可以通過 API 向管理者或者其他應用程式發送通知。
- 自動故障遷移(Automatic failover): 當一個主伺服器不能正常工作時, Sentinel 會開始一次自動故障遷移操作, 它會将失效主伺服器的其中一個從伺服器更新為新的主伺服器, 并讓失效主伺服器的其他從伺服器改為複制新的主伺服器; 當用戶端試圖連接配接失效的主伺服器時, 叢集也會向用戶端傳回新主伺服器的位址, 使得叢集可以使用新主伺服器代替失效伺服器。
- PS:寫操作隻能在主伺服器中,從伺服器不允許寫,從伺服器隻允許讀
- PS:當主伺服器挂掉後,從伺服器中進行投票機制選舉出主伺服器
- PS:哨兵模式最好是鍊式結構,這樣隻有一個從伺服器向主伺服器發送同步指令的請求,其他從伺服器向各自上遊從伺服器發送請求
- Redis-Sentinel原理(執行步驟)
- 從資料庫向主資料庫發送sync指令。
- 主資料庫接收sync指令後,執行BGSAVE指令(儲存快照),建立一個RDB檔案,在建立RDB檔案期間的指令将儲存在緩沖區中。
- 當主資料庫執行完BGSAVE時,會向從資料庫發送RDB檔案,而從資料庫會接收并載入該檔案。
- 主資料庫将緩沖區的所有寫指令發給從伺服器執行。
- 以上處理完之後,之後主資料庫每執行一個寫指令,都會将被執行的寫指令發送給從資料庫。
- Redis-Sentinel的缺點
- 是資料庫容量受到實體記憶體的限制,不能用作海量資料的高性能讀寫,是以Redis适合的場景主要局限在較小資料量的高性能操作和運算上。
- 隻能主庫寫,從庫不能寫,不能水準擴容
- Redis-Cluster叢集
- 工作原理:
- 用戶端與Redis節點直連,不需要中間的Proxy層,直接連接配接任意一個Master節點
- 根據公式計算映射到相應的節點
- 優點:
- 無需Sentinel哨兵監控,如果Master挂了,内部會自動将Slave切換Master
- 可以水準的擴容
- 支援自動化遷移,當出現某個Slave當機了,那麼就隻有Master,萬一Master在當機了呢?針對這種情況,如果說其他Master有多餘的Slave,叢集自動把多餘的Slave遷移到沒有Slave的Master
- redis cluster 為了保證資料的高可用性,加入了主從模式,一個主節點對應一個或多個從節點,主節點提供資料存取,從節點則是從主節點拉取資料備份,當這個主節點挂掉後,就會有這個從節點選取一個來充當主節點,進而保證叢集不會挂掉。
- PS:叢集有ABC三個主節點, 如果這3個節點都沒有加入從節點,如果B挂掉了,我們就無法通路整個叢集了。A和C的slot也無法通路。
- PS:是以我們在叢集建立的時候,一定要為每個主節點都添加了從節點, 比如像這樣, 叢集包含主節點A、B、C, 以及從節點A1、B1、C1, 那麼即使B挂掉系統也可以繼續正确工作。
- PS:B1節點替代了B節點,是以Redis叢集将會選擇B1節點作為新的主節點,叢集将會繼續正确地提供服務。 當B重新開啟後,它就會變成B1的從節點。
- PS:不過需要注意,如果節點B和B1同時挂了,Redis叢集就無法繼續正确地提供服務了。
- 缺點
- 批量操作是個坑(批量操作是分批次發送的,再加上根據算法不同的key會進入不同的片段)
- 資源隔離性較差,容易出現互相影響的情況
- 工作原理:
- Redis-Sentinell叢集機制
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5CZhNjM5YWY4EWZ1E2Y1MWN0ITYwATNiZmZyEmZyYDZ58CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
- 隻使用單個sentinel程序來監控redis叢集是不可靠的,當sentinel程序宕掉後(sentinel本身也有單點問題,single-point-of-failure)整個叢集系統将無法按照預期的方式運作。是以有必要将sentinel叢集,這樣有幾個好處:
- 即使有一些sentinel程序宕掉了,依然可以進行redis叢集的主備切換;
- 如果隻有一個sentinel程序,如果這個程序運作出錯,或者是網絡堵塞,那麼将無法實作redis叢集的主備切換(單點問題);
- 如果有多個sentinel,redis的用戶端可以随意地連接配接任意一個sentinel來獲得關于redis叢集中的資訊。
- Redis持久化
- RDB 持久化(原理是将 Reids 在記憶體中的資料庫記錄定時 dump 到磁盤上的 RDB 持久化)
- RDB 持久化是指在指定的時間間隔内将記憶體中的資料集快照寫入磁盤,實際操作過程是 fork 一個子程序,先将資料集寫入臨時檔案,寫入成功後,再替換之前的檔案,用二進制壓縮存儲。
- AOF持久化(原理是将 Reids 的記錄檔以追加的方式寫入檔案)
- AOF 持久化以日志的形式記錄伺服器所處理的每一個寫、删除操作,查詢操作不會記錄,以文本的方式記錄,可以打開檔案看到詳細的操作記錄。
- 二者優缺點
- RDB 存在哪些優勢呢?
- 一旦采用該方式,那麼你的整個 Redis 資料庫将隻包含一個檔案,這對于檔案備份而言是非常完美的。比如,你可能打算每個小時歸檔一次最近 24 小時的資料,同時還要每天歸檔一次最近 30 天的資料。通過這樣的備份政策,一旦系統出現災難性故障,我們可以非常容易的進行恢複。
- 對于災難恢複而言,RDB 是非常不錯的選擇。因為我們可以非常輕松的将一個單獨的檔案壓縮後再轉移到其它存儲媒體上。
- 性能最大化。對于 Redis 的服務程序而言,在開始持久化時,它唯一需要做的隻是 fork 出子程序,之後再由子程序完成這些持久化的工作,這樣就可以極大的避免服務程序執行 IO 操作了。
- 相比于 AOF 機制,如果資料集很大,RDB 的啟動效率會更高。
- RDB 又存在哪些劣勢呢?
- 如果你想保證資料的高可用性,即最大限度的避免資料丢失,那麼 RDB 将不是一個很好的選擇。因為系統一旦在定時持久化之前出現當機現象,此前沒有來得及寫入磁盤的資料都将丢失。
- 由于 RDB 是通過 fork 子程序來協助完成資料持久化工作的,是以,如果當資料集較大時,可能會導緻整個伺服器停止服務幾百毫秒,甚至是 1 秒鐘。
- AOF 的優勢有哪些呢?
- 該機制可以帶來更高的資料安全性,即資料持久性。Redis 中提供了 3 中同步政策,即每秒同步、每修改同步和不同步。事實上,每秒同步也是異步完成的,其效率也是非常高的,所差的是一旦系統出現當機現象,那麼這一秒鐘之内修改的資料将會丢失。而每修改同步,我們可以将其視為同步持久化,即每次發生的資料變化都會被立即記錄到磁盤中。可以預見,這種方式在效率上是最低的。至于無同步,無需多言,我想大家都能正确的了解它。
- 由于該機制對日志檔案的寫入操作采用的是 append 模式,是以在寫入過程中即使出現當機現象,也不會破壞日志檔案中已經存在的内容。然而如果我們本次操作隻是寫入了一半資料就出現了系統崩潰問題,不用擔心,在 Redis 下一次啟動之前,我們可以通過 redis-check-aof 工具來幫助我們解決資料一緻性的問題。
- 如果日志過大,Redis 可以自動啟用 rewrite 機制。即 Redis 以 append 模式不斷的将修改資料寫入到老的磁盤檔案中,同時 Redis 還會建立一個新的檔案用于記錄此期間有哪些修改指令被執行。是以在進行 rewrite 切換時可以更好的保證資料安全性。
- AOF 包含一個格式清晰、易于了解的日志檔案用于記錄所有的修改操作。事實上,我們也可以通過該檔案完成資料的重建。
- AOF 的劣勢有哪些呢?
- 對于相同數量的資料集而言,AOF 檔案通常要大于 RDB 檔案。RDB 在恢複大資料集時的速度比 AOF 的恢複速度要快。
- 根據同步政策的不同,AOF 在運作效率上往往會慢于 RDB。總之,每秒同步政策的效率是比較高的,同步禁用政策的效率和 RDB 一樣高效。
- 二者選擇的标準,就是看系統是願意犧牲一些性能,換取更高的緩存一緻性(aof),還是願意寫操作頻繁的時候,不啟用備份來換取更高的性能,待手動運作 save 的時候,再做備份(rdb)。rdb 這個就更有些 eventually consistent 的意思了。
- RDB 存在哪些優勢呢?
- RDB 持久化(原理是将 Reids 在記憶體中的資料庫記錄定時 dump 到磁盤上的 RDB 持久化)
- 緩存穿透
- 正常情況:是先查詢緩存,緩存有的不查詢資料庫
- 異常情況:查詢一個資料庫不存在的資料,會一直穿透緩存,導緻DB挂掉
- 解決緩存穿透:
- 方法一:參數校驗,不合法的不僅如此資料庫查詢
- 方法二:當我們從資料庫找不到的時候,我們也将這個空對象設定到緩存裡邊去。下次再請求的時候,就可以從緩存裡邊擷取了。這種情況我們一般會将空對象設定一個較短的過期時間
- 并發量不算高緩存與資料庫雙寫一緻問題
- 錯誤辦法:先更新資料庫,在删除緩存。這樣如果出現更新資料庫成功,删除緩存失敗,就會出現資料庫和緩存不一緻
- 正确辦法:先删除緩存,在更新資料庫,這樣及時出現删除緩存成功,資料庫更新失敗,下次讀取緩存沒有,在讀取資料庫,立刻反寫緩存,也不會出現資料庫和緩存不一緻的問題
- 為什麼是删除緩存而不是更新緩存呢
- 因為在某些場景,更新緩存并不僅僅是更新某個字段,而是需要更新資料庫中某張表的字段,然後和其他幾張表聯合計算出來的,這種情況如果我們删除緩存,下次計算走現有邏輯重新計算放入緩存,簡單高效。如果更新就需要在這一次更新的時候還需要增加計算的邏輯。删除更新可以看成一次懶加載,下次需要用到的時候在加載資料
- 并發量很高(一天上億并發)的緩存與資料庫雙寫一緻問題
- 先删除緩存,在更新資料庫,在更改資料庫但是沒有更新完的時候,這是又來了一個讀請求,這時候查詢資料庫,計入緩存,這時候資料庫更新完成,出現不一緻的情況了。
- 解決辦法一個隊列對應一個工作線程,每個工作線程串行拿到對應的操作,然後一條一條執行。這樣的話,一個資料變更操作,先删除緩存,然後更新資料庫,但是資料庫還沒有完成更新,此時進來一個讀請求過來,沒有讀到緩存,那麼先将這次請求發送到隊列中,此時隊列中會積壓,然後串行執行每個請求。
- 這裡有一個優化點,一個隊列中,其實多個更新緩存請求串在一起是沒有意義的(可以多個相同的業務id),如果發現隊列中已經包含,不需要放入隊列中。等待前面操作完成即可。操作完成之後放入緩存
- 高并發可能衍生的問題就是讀請求長時阻塞,大量更新請求讀不到緩存,積壓到隊列,直接導緻很多查詢直接通路資料庫
- 第二個問題多個服務,必須保證更新操作和緩存操作,都通過nginx伺服器路由到相同的服務執行個體上,比如同一個商品,可以通過某個參數hash路由,使他進入同一台機器上。
- 先删除緩存,在更新資料庫,在更改資料庫但是沒有更新完的時候,這是又來了一個讀請求,這時候查詢資料庫,計入緩存,這時候資料庫更新完成,出現不一緻的情況了。
- Redis支援的五種資料結構以及應用場景
- String
- set key value 【ex seconds】【px millionseconds】【nx | xx】
- set wzc 911024 ex 50(設定這個值有效時間是50秒)
- set wzc 911024 px 50(設定這個值有效時間是50毫秒)
- set wzc 911024 nx(設定的這個key不存在,才會成功)分布式鎖
- set wzc 911024 xx(設定的這個key存在,才會成功)更新操作
- get key(擷取值)
- mset key val key2 val key3 val(批量設定)
- mget key key2 key3(批量擷取)
- incr key(自增),decr key(自減),incrby key increment(指定數字自增),decrby key decrement(指定數字自減)
- 四種情況
- 如果key對應的value不是整數,會報錯
- 如果key對應的value是整數,則傳回自增或自減後的結果
- 如果key不存在,設定這個key的value是0并自增或自減1(指定了increment或decrement除外),傳回自增後的結果
- 若key對應的value值自增後超過integer的最大值也就是2的64次方,也會報錯
- 四種情況
- strlen key(擷取字元串長度)
- 應用場景
- 緩存個人資料資訊,緩存權限資訊
- session共享,token
- 正常計數,比如點贊數,收藏數incresby,decreaseby
- 設定驗證碼有效期setex
- 通路限制
- set key value 【ex seconds】【px millionseconds】【nx | xx】
- String
//60秒内短信驗證碼
使用者請求了我們的發送短信驗證碼服務
先從換從裡取出這個使用者對應的key(可以是電話号碼拼接點東西)
mKey = redisCli.get(key);
if(mKey != null){
如果緩存中存在了,就是已經請求過驗證碼了,比如我系統參數設定的至少60秒發送一次短信
那就代表60之内再次請求了,傳回false阻止
return false;
}else{
否則就是第一次請求短信驗證碼或者等了60再次請求的驗證碼
發送驗證碼
sendMsg()
放入redis緩存并設定失效時間60秒
redisCli.set(key,value,60)
}
//一天内限制五次
try {
//先判斷Redis中是否有該key值
if (RedisHelper.exists(key)) {
//取出通路次數
int times = Integer.parseInt(RedisHelper.get(key));
//判斷通路是否大于最大次數
if (times >= maxTimes) {
return true;
}
//若不大于則将通路次數加1
RedisHelper.incr(key, 1L);
} else {
//若沒有則建立并設定失效時間
RedisHelper.set(key, "1", days, TimeUnit.DAYS);
}
}
catch (Exception e) {
throw new Exception
return false;
}
- hash
- hmset user:001 name "李三" age 18 birthday "20010101" (key是user:001,value是map)
- hmset user:001 age 30
- 應用場景:購物車ID,map(filed是name,count)
- list
- 常用指令:
- RPUSH key value [value ...]:從隊列的右邊入隊一個元素或多個元素,複雜度O(1)。将所有指定的值插入存于key的清單的尾部(從右側插入)。如果key不存在,那麼PUSH之前,會先自動建立一個空的清單。如果key對應的值不是一個list的話,則會傳回一個錯誤。如果同時push多個值的話,值會依次從左到右PUSH從尾部進入list。
- 常用指令:
redis> rpush queue a
(integer) 1
redis> rpush queue b
(integer) 2
redis> rpush queue c
(integer) 3
redis> rpush queue d e f
(integer) 6
redis> lrange queue 0 -1
1) "a"
2) "b"
3) "c"
4) "d"
5) "e"
6) "f"
-
- LPUSH key value [value ...]
- 從隊列的左邊入隊一個或多個元素,複雜度O(1)。這個指令和RPUSH幾乎一樣,隻是插入節點的方向相反了,是從list的頭部(左側)進行插入的。
- RPUSHX key value
- 從隊列的右邊入隊一個元素,僅隊列存在時有效。當隊列不存在時,不進行任何操作。
- LPUSH key value [value ...]
# 之前queue已經存在,且有a、b、c、d、e、f、g這6個元素
redis> rpushx queue z
(integer) 7
redis> del queue
(integer) 1
redis> rpushx queue z
(integer) 0
可以看出,最開始rpushx向queue中新增了一個節點,但當我們删掉了queue時,再rpushx,就沒有插入成功(傳回值為0)。
-
- LPUSHX key value
- 從隊列的左邊入隊一個元素,僅隊列存在時有效。當隊列不存在時,不進行任何操作。
- LPOP:從隊列的左邊出隊一個元素,複雜度O(1)。如果list為空,則傳回nil。
- LPUSHX key value
redis> del queue
(integer) 0
redis> rpush queue a b c d e f
(integer) 6
redis> lrange queue 0 -1
1) "a"
2) "b"
3) "c"
4) "d"
5) "e"
6) "f"
redis> lpop queue
"a"
redis> lpop queue
"b"
redis> lrange queue 0 -1
1) "c"
2) "d"
3) "e"
4) "f"
redis> rpop queue
"f"
redis> rpop queue
"e"
redis> lrange queue 0 -1
1) "c"
2) "d"
-
- RPOP key
- 從隊列的右邊出隊一個元素,複雜度O(1)。如果list為空,則傳回nil。
- BLPOP key [key ...] timeout
- 删除,并獲得該清單中的第一進制素,或阻塞,直到有一個可用。這是LPOP的阻塞版本。在LPOP的時候,如果隊列中沒有值,則會傳回一個nil。而BLPOP則會等待一段時間,如果list中有值(等待的時候,被添加的),則傳回對應值;如果在給定時間内仍沒有得到結果,則傳回nil。
- RPOP key
redis> lrange queue 0 -1
1) "c"
2) "d"
redis> BLPOP queue 1
1) "queue"
2) "c"
redis> BLPOP queue 1
1) "queue"
2) "d"
redis> BLPOP queue 1
(nil)
(1.10s)
redis> LPOP queue
(nil)
我們仍接着上面的實驗繼續,這時queue裡面隻有2個元素了,我們使用BLPOP取值,前兩次都成功地得到了值,效果和LPOP一樣。但第三次的時候,由于list已經為空,但是BLPOP并沒有立刻傳回nil,而是阻塞了一點時間(timeout的時間),之後才傳回了nil。最後,我們試驗了一下LPOP,證明了LPOP是立刻傳回結果的。
timeout表示等待的時間,機關是秒。當設為0時,表示永遠阻塞,非0時,表示等待的最長時間。
要注意的是,LBPOP支援多個key,也就是說可以同時監聽多個list,并按照key的順序,依次檢查list是否為空,如果不為空,則傳回最優先的list中的值。如果都為空,則阻塞,直到有一個list不為空,那麼傳回這個list對應的值。
-
- BRPOP key [key ...] timeout
- 删除,并獲得該清單中的最後一個元素,或阻塞,直到有一個可用。參考BLPOP。
- RPOPLPUSH source destination
- 删除清單中的最後一個元素,将其追加到另一個清單。這個指令可以原子性地傳回并删除source對應的清單的最後一個元素(尾部元素),并将鈣元素放入destination對應的清單的第一個元素位置(清單頭部)。
- BRPOP key [key ...] timeout
redis> rpush q1 1 2 3 4 5
(integer) 5
redis> lrange q1 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
redis> rpoplpush q1 q2
"5"
redis> rpoplpush q1 q2
"4"
redis> lrange q1 0 -1
1) "1"
2) "2"
3) "3"
redis> lrange q2 0 -1
1) "4"
2) "5"
我們簡單的看一下上述的例子,首先我們初始化一個q1,内容是{1, 2, 3, 4, 5}。這是q2沒有定義,可以了解是一個空的list {}。
之後使用rpoplpush,從q1右邊pop出一個元素5,然後在q2左側push進。則現在的q1為{1, 2, 3, 4},q2為{5}。
再進行一次rpoplpush,從q1右邊pop出一個元素4,然後在q2左側push進。則現在的q1為{1, 2, 3},q2為{4, 5}。
-
- BRPOPLPUSH source destination timeout
- 彈出一個清單的值,将它推到另一個清單,并傳回它;或阻塞,直到有一個可用。RPOPLPUSH的阻塞版本。timeout的機關是秒,當timeout為0的時候,表示無限期阻塞。
- LLEN key
- 獲得隊列(List)的長度
- LRANGE key start stop
- 從清單中擷取指定傳回的元素。我們在前面用到了很多次。LRANGE可以擷取list的指定範圍的值。範圍用start和stop表示。負數表示從右向左數。需要注意的是,超出範圍的下标不會産生錯誤:如果start>end,會得到空清單,如果end超過隊尾,則Redis會将其當做清單的最後一個元素。
- LINDEX key index
- 擷取一個元素,通過其索引清單
- 我們之前介紹的操作都是對list的兩端進行的,是以算法複雜度都隻有O(1)。而這個操作是指定位置來進行的,每次操作,list都得找到對應的位置,是以算法複雜度為O(N)。list的下表是從0開始的,index為負的時候是從右向左數。-1表示最後一個元素。當下标超出的時候,會傳回nul。是以不用像操作數組一樣擔心範圍越界的情況。
- LSET key index value
- 設定隊列裡面一個元素的值
- 這個操作和LINDEX類似,隻不過LINDEX是讀,而LSET是寫。下标的用法和LINDX相同。不過當index越界的時候,這裡會報異常。
- LREM key count value
- 從清單中删除元素
- 該指令用于從key對應的list中,移除前count次出現 的值為value的元素。count參數有三種情況:
- count > 0: 表示從頭向尾(左到右)移除值為value的元素。
- count < 0: 表示從尾向頭(右向左)移除值為value的元素。
- count = 0: 表示移除所有值為value的元素。
- LTRIM key start stop
- 修剪到指定範圍内的清單
- 這個指令和LRANGE十分相似,LRANGE會将指定範圍的元素傳回給用戶端,而LTRIM會對list進行修剪,使其隻包含指定範圍的元素。start和stop表示範圍。超出範圍的下标不會産生錯誤:如果start>end,會得到空清單,如果end超過隊尾,則Redis會将其當做清單的最後一個元素。
- LINSERT key BEFORE|AFTER pivot value
- 在清單中的另一個元素之前或之後插入一個元素
- 該指令将value插入值key對應的清單的基準值pivot的前面或是後面。
- 當key不存在時,這個list被視為空清單,任何操作都不會發生。
- 當key存在,但儲存的不是list,則會報error。
- 該指令會傳回修改之後的list的長度,如果找不到pivot,則會傳回-1。
- BRPOPLPUSH source destination timeout
sadd <key> <value1> <value2>
将多個元素加入到key中,重複值忽略
smembers <key>
取出該集合的所有值
sismember <key> <value>
判斷集合key中是否有該value值 有就1 沒有0
scard <key>
傳回該集合的元素個數
srem <key> <value1> <value2>
删除集合中的某個元素
spop <key>
随機吐出該集合一個值
srandmember <key> <n>
随機從集合中取出n個值,不會從集合中删除
smove <key1> <key2> <value>
将key1中的value 移動到key2 中
sinter <key1> <key2>
傳回兩個集合的交集元素
sunion <key1> <key2>
傳回兩個集合的并集
1. “共同好友清單”
社交類應用中,擷取兩個人或多個人的共同好友,兩個人或多個人共同關注的微網誌這樣類似的功能,用 MySQL 的話操作很複雜,可以把每個人的好友 id 存到集合中,擷取共同好友的操作就可以簡單到一個取交集的指令就搞定
// 這裡為了友善閱讀,把 id 替換成姓名
sadd user:wade james melo paul kobe
sadd user:james wade melo paul kobe
sadd user:paul wade james melo kobe
sadd user:melo wade james paul kobe
// 擷取 wade 和 james 的共同好友
sinter user:wade user:james
/* 輸出:
* 1) "kobe"
* 2) "paul"
* 3) "melo"
*/
// 擷取香蕉四兄弟的共同好友
sinter user:wade user:james user:paul user:melo
/* 輸出:
* 1) "kobe"
*/
/*
類似的需求還有很多 , 必須把每個标簽下的文章 id 存到集合中,可以很容易的求出幾個不同标簽下的共同文章;
把每個人的愛好存到集合中,可以很容易的求出幾個人的共同愛好。
*/
2.抽獎
# 添加使用者
sadd key {userId1,userId2}
# 擷取對應的集合的所有成員
smembers key
# 從集合的右側(尾部)移除一個成員,并将其傳回
spop(key2)
# 從key2對應的集合中随機擷取 numbers 個元素
srandmember(name, numbers)
zadd<key><score1><value1><score2><value2>
将一個或多個元素以及score加入zset
zrange<key><start><stop> withscore
傳回下标在區間内的集合,帶有score
zrangebyscore <key> <min> <max>[withscore] [limit offset count]
傳回key中 score介于min和max中的成員,升序排列
zrevrangerbyscore <key> <min> <max> [withscore] [limit offset count]
降序
zincrby <key> <increment> <value>
在key集合中的value上增加increment
zrem <key> <value>
删除key集合下的指定元素
zcount <key> <min><max>
統計 區間内的元素個數
zcard <key>
擷取集合中的元素個數
zrank <key><value>
查詢value在key中的排名,從0開始
// 用元素的分數(score)表示與好友的親密度
zadd user:kobe 80 james 90 wade 85 melo 90 paul
// 根據“親密度”給好友排序
zrevrange user:kobe 0 -1
/**
* 輸出:
* 1) "wade"
* 2) "paul"
* 3) "melo"
* 4) "james"
*/
// 增加好友的親密度
zincrby user:kobe 15 james
// 再次根據“親密度”給好友排序
zrevrange user:kobe 0 -1
/**
* 輸出:
* 1) "james"
* 2) "wade"
* 3) "paul"
* 2) "melo"
*/
//類似的需求還出現在根據文章的閱讀量或點贊量對文章清單排序
//使用者登入次數
//将登入次數和使用者統一存儲在一個sorted set裡
zadd login:login_times 5 1
zadd login:login_times 1 2
zadd login:login_times 2 3
//當使用者登入時,對該使用者的登入次數自增1
ret = r.zincrby("login:login_times", 1, uid)
//那麼如何獲得登入次數最多的使用者呢,逆序排列取得排名前N的使用者
ret = r.zrevrange("login:login_times", 0, N-1)
防止連續點贊
String key = "document-collect-" + userId;
Set<ZSetOperations.TypedTuple<String>> tuples = new HashSet<>();
ZSetOperations.TypedTuple<String> tuple0 = new DefaultTypedTuple<String>(String.valueOf(itemId) + "-" + String.valueOf(sourceType), 1d);
tuples.add(tuple0);
Long i = vo.add(key, tuples);
redisTemplate.expire(key, 1, TimeUnit.HOURS);
//大于0,才會進入操作
if (i > 0) {
}else{
}
-
- 應用場景
- 評論系統:我們在看完一條微網誌之後,常常會評論一番,或者看看其他人的吐槽。每條評論的記錄都是按照時間順序排序的。我們讀的時候也是這個順序。這時,隊列就是一個很好的存儲結構。每送出一次評論,都向list的末尾添加一個新的節點。
- 并行轉串行:使用者每時每刻都可能送出請求,而且請求的頻率經常變化。這時,背景的程式不可能立刻響應每一個使用者的請求,尤其是請求特别占資源的服務的時候。我們需要一個排隊系統。根據使用者的請求時間,将使用者的請求放入隊列中,背景程式依次從隊列中擷取任務,處理并将結果傳回到結果隊列。通過隊列,我們将并行的請求轉換成串行的任務隊列,之後依次處理(當然背景的程式也可以多核并行處理)。
- 消息清單:兩個老師,一個學生
- A老師發送消息 給學生
- lpush msg::studnet_001 100(消息編号100)
- B老師發送消息
- lpush msg::studnet_001 200(消息編号200)
- 假如想拿最近的10條消息就可以執行如下指令(最新的消息一定在list的最左邊)
- lrange msg::studnet_001 0 9 #下标從0開始,閉區間都包含
- 需求作業清單查詢學生最新的作業
- 在Redis中我們的根據時間查詢的最新清單每個ID使用了常駐緩存,這是一直更新的。但是需要限制不能超過5000個ID,是以我們的擷取ID函數會一直詢問Redis。隻有在start/count參數超出了這個範圍的時候,才需要去通路資料庫。
- bitmap
- 文法:redis.setbit(key,offset,1)
- 文法:redis.getbit(key,offset)
- 文法:redis.bitCount(key)
- 文法:jedis.bitop(BitOP. AND , destkey, key1, key2);//求邏輯并
- 文法:jedis.bitop(BitOP.OR, destkey, key1, key2);//求邏輯或
- 文法:jedis.bitop(BitOP. XOR , destkey, key1, key2);//異或:不相同是1,相同時0,最後合計
- 應用場景
- 統計日活量:
- RedisUtils.setbit("userlogin2019-01-10", 1L, "1");
- RedisUtils.bitCount("userlogin2019-01-10");
- 統計一首歌 每天被多少使用者聽
- RedisUtils.setbit("musicId2019-01-10", userId, "1");
- RedisUtils.bitCount("musicId2019-01-10");
- 連續兩天登入的使用者
- RedisUtils.bitopAnd("activeUserlogin2019-01-09/10", "userlogin2019-01-10", "userlogin2019-01-09");
- 統計日活量:
- Redis分布式鎖實作原理(悲觀鎖)
- Redis因為是單線程的,是以本身沒有鎖的概念。是以分布式鎖的實作原理是往Redis當中寫入一個key(調用方法setnx),寫入成功相當于擷取鎖成功傳回1,擷取鎖成功;寫入失敗也即是setnx方法傳回0,擷取鎖失敗。注意鎖的失效時間,否則容易造成死鎖。
- String requestid = UUID.randomUUID().toString().trim().replaceAll("-", "");
- String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
- System.out.println(result);
- if ("OK".equals(result)) {
- return true;
- }
- NX代表隻有key不存在才會成功
- PX代表在過期時間後會自動釋放
- 使用随機值的原因是如果某個擷取到鎖的用戶端阻塞了很長時間,導緻它擷取到的鎖自動釋放。此時可能其他用戶端已經擷取到鎖,如果直接删除就會出現問題,是以需要随機值
- Redis因為是單線程的,是以本身沒有鎖的概念。是以分布式鎖的實作原理是往Redis當中寫入一個key(調用方法setnx),寫入成功相當于擷取鎖成功傳回1,擷取鎖成功;寫入失敗也即是setnx方法傳回0,擷取鎖失敗。注意鎖的失效時間,否則容易造成死鎖。
- redis樂觀鎖
- multi:開啟redis事務,置用戶端為事務态
- exec:送出事務,執行從multi到此之前的指令隊列,置用戶端為非事務态
- discard:取消事務,置用戶端為非事務态
- watch:監視鍵值對,作用送出事務exec時,事務是否發生變化,發生變化,事務取消
- 應用場景
jedis.watch(key);
Transaction transaction = jedis.multi();
transaction.set(key, String.valueOf(prdNum - 1));
List<Object> result = transaction.exec();
if (result == null || result.isEmpty()) {
System.out.println("悲劇了,顧客:" + clientName + "沒有搶到商品");// 可能是watch-key被外部修改,或者是資料操作被駁回
} else {
jedis.sadd(clientList, clientName);// 搶到商品記錄一下
System.out.println("好高興,顧客:" + clientName + "搶到商品");
break;
}