Redis實作分布式鎖的原理
前面講了Redis在實際業務場景中的應用,那麼下面再來了解一下Redisson功能性場景的應用,也就是大家經常使用的分布式鎖的實作場景。
關于分布式鎖的概念,本文就不做描述。
•引入redisson依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency>
•編寫簡單的測試代碼
public class RedissonTest { private static RedissonClient redissonClient; static { Config config=new Config(); config.useSingleServer().setAddress("redis://192.168.221.128:6379"); redissonClient=Redisson.create(config); } public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateOrder"); //最多等待100秒、上鎖10s以後自動解鎖 if(rLock.tryLock(100,10,TimeUnit.SECONDS)){ System.out.println("擷取鎖成功"); } Thread.sleep(2000); rLock.unlock(); redissonClient.shutdown(); } }
Redisson分布式鎖的實作原理
你們會發現,通過redisson,非常簡單就可以實作我們所需要的功能,當然這隻是redisson的冰山一角,redisson最強大的地方就是提供了分布式特性的常用工具類。使得原本作為協調單機多線程并發程式的并發程式的工具包獲得了協調分布式多級多線程并發系統的能力,降低了程式員在分布式環境下解決分布式問題的難度,下面分析一下RedissonLock的實作原理
RedissonLock.tryLock
@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); //通過tryAcquire方法嘗試擷取鎖 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { //表示成功擷取到鎖,直接傳回 return true; } //省略部分代碼....}
tryAcquire
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //leaseTime就是租約時間,就是redis key的過期時間。 if (leaseTime != -1) { //如果設定過期時間 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else {//如果沒設定了過期時間,則從配置中擷取key逾時時間,預設是30s過期 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //當tryLockInnerAsync執行結束後,觸發下面回調 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { //說明出現異常,直接傳回 return; } // lock acquired if (ttlRemaining == null) { //表示第一次設定鎖鍵 if (leaseTime != -1) { //表示設定過逾時時間,更新internalLockLeaseTime,并傳回 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,啟動Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;}
tryLockInnerAsync
通過lua腳本來實作加鎖的操作
1.判斷lock鍵是否存在,不存在直接調用hset存儲目前線程資訊并且設定過期時間,傳回nil,告訴用戶端直接擷取到鎖。2.判斷lock鍵是否存在,存在則将重入次數加1,并重新設定過期時間,傳回nil,告訴用戶端直接擷取到鎖。3.被其它線程已經鎖定,傳回鎖有效期的剩餘時間,告訴用戶端需要等待。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
關于Lua腳本,我們稍後再解釋。
unlock釋放鎖流程
釋放鎖的流程,腳本看起來會稍微複雜一點
1.如果lock鍵不存在,通過
publish
指令發送一個消息表示鎖已經可用。2.如果鎖不是被目前線程鎖定,則傳回nil3.由于支援可重入,在解鎖時将重入次數需要減14.如果計算後的重入次數>0,則重新設定過期時間5.如果計算後的重入次數<=0,則發消息說鎖已經可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
RedissonLock有競争的情況
有競争的情況在redis端的lua腳本是相同的,隻是不同的條件執行不同的redis指令。當通過tryAcquire發現鎖被其它線程申請時,需要進入等待競争邏輯中
1.this.await傳回false,說明等待時間已經超出擷取鎖最大等待時間,取消訂閱并傳回擷取鎖失敗2.this.await傳回true,進入循環嘗試擷取鎖。
繼續看RedissonLock.tryLock後半部分代碼如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//省略部分代碼 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); // 訂閱監聽redis消息,并且建立RedissonLockEntry RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的結果對象,如果subscribe方法調用超過了time,說明已經超過了用戶端設定的最大wait time,則直接傳回false,取消訂閱,不再繼續申請鎖了。 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { //取消訂閱 subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); //表示搶占鎖失敗 return false; //傳回false } try { //判斷是否逾時,如果等待逾時,傳回獲的鎖失敗 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } //通過while循環再次嘗試競争鎖 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //競争鎖,傳回鎖逾時時間 // lock acquired if (ttl == null) { //如果逾時時間為null,說明獲得鎖成功 return true; } //判斷是否逾時,如果逾時,表示擷取鎖失敗 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 通過信号量(共享鎖)阻塞,等待解鎖消息. (減少申請鎖調用的頻率) // 如果剩餘時間(ttl)小于wait time ,就在 ttl 時間内,從Entry的信号量擷取一個許可(除非被中斷或者一直沒有可用的許可)。 // 否則就在wait time 時間範圍内等待可以通過信号量 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新等待時間(最大等待時間-已經消耗的阻塞時間) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { //擷取鎖失敗 acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); //取消訂閱 }// return get(tryLockAsync(waitTime, leaseTime, unit)); }
鎖過期了怎麼辦?
一般來說,我們去獲得分布式鎖時,為了避免死鎖的情況,我們會對鎖設定一個逾時時間,但是有一種情況是,如果在指定時間内目前線程沒有執行完,由于鎖逾時導緻鎖被釋放,那麼其他線程就會拿到這把鎖,進而導緻一些故障。
為了避免這種情況,Redisson引入了一個Watch Dog機制,這個機制是針對分布式鎖來實作鎖的自動續約,簡單來說,如果目前獲得鎖的線程沒有執行完,那麼Redisson會自動給Redis中目标key延長逾時時間。
預設情況下,看門狗的續期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定。
@Overridepublic boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); //leaseTime=-1}
實際上,當我們通過tryLock方法沒有傳遞逾時時間時,預設會設定一個30s的逾時時間,避免出現死鎖的問題。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //當leaseTime為-1時,leaseTime=internalLockLeaseTime,預設是30s,表示目前鎖的過期時間。 //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { //說明出現異常,直接傳回 return; } // lock acquired if (ttlRemaining == null) { //表示第一次設定鎖鍵 if (leaseTime != -1) { //表示設定過逾時時間,更新internalLockLeaseTime,并傳回 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,啟動Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;}
由于預設設定了一個30s的過期時間,為了防止過期之後目前線程還未執行完,是以通過定時任務對過期時間進行續約。
•首先,會先判斷在expirationRenewalMap中是否存在了entryName,這是個map結構,主要還是判斷在這個服務執行個體中的加鎖用戶端的鎖key是否存在,•如果已經存在了,就直接傳回;主要是考慮到RedissonLock是可重入鎖。
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else {// 第一次加鎖的時候會調用,内部會啟動WatchDog entry.addThreadId(threadId); renewExpiration(); }}
定義一個定時任務,該任務中調用
renewExpirationAsync
方法進行續約。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //用到了時間輪機制 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // renewExpirationAsync續約租期 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次間隔租期的1/3時間執行 ee.setTimeout(task);}
執行Lua腳本,對指定的key進行續約。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));}
Lua腳本
Lua是一個高效的輕量級腳本語言(和JavaScript類似),用标準C語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程式中,進而為應用程式提供靈活的擴充和定制功能。Lua在葡萄牙語中是“月亮”的意思,它的logo形式衛星,寓意是Lua是一個“衛星語言”,能夠友善地嵌入到其他語言中使用;其實在很多常見的架構中,都有嵌入Lua腳本的功能,比如OpenResty、Redis等。
使用Lua腳本的好處:
1.減少網絡開銷,在Lua腳本中可以把多個指令放在同一個腳本中運作2.原子操作,redis會将整個腳本作為一個整體執行,中間不會被其他指令插入。換句話說,編寫腳本的過程中無需擔心會出現競态條件3.複用性,用戶端發送的腳本會永遠存儲在redis中,這意味着其他用戶端可以複用這一腳本來完成同樣的邏輯
Lua的下載下傳和安裝
Lua是一個獨立的腳本語言,是以它有專門的編譯執行工具,下面簡單帶大家安裝一下。
•下載下傳Lua源碼包: https://www.lua.org/download.htmlhttps://www.lua.org/ftp/lua-5.4.3.tar.gz•安裝步驟如下
tar -zxvf lua-5.4.3.tar.gz cd lua-5.4.3 make linux make install
如果報錯,說找不到readline/readline.h, 可以通過yum指令安裝
yum -y install readline-devel ncurses-devel
最後,直接輸入
lua
指令即可進入lua的控制台。Lua腳本有自己的文法、變量、邏輯運算符、函數等,這塊我就不在這裡做過多的說明,用過JavaScript的同學,應該隻需要花幾個小時就可以全部學完,簡單示範兩個案例如下。
array = {"Lua", "mic"}for i= 0, 2 do print(array[i])end
array = {"mic", "redis"}for key,value in ipairs(array)do print(key, value)end
Redis與Lua
Redis中內建了Lua的編譯和執行器,是以我們可以在Redis中定義Lua腳本去執行。同時,在Lua腳本中,可以直接調用Redis的指令,來操作Redis中的資料。
redis.call(‘set’,'hello','world')local value=redis.call(‘get’,’hello’)
redis.call 函數的傳回值就是redis指令的執行結果,前面我們介紹過redis的5中類型的資料傳回的值的類型也都不一樣,redis.call函數會将這5種類型的傳回值轉化對應的Lua的資料類型
在很多情況下我們都需要腳本可以有傳回值,畢竟這個腳本也是一個我們所編寫的指令集,我們可以像調用其他redis内置指令一樣調用我們自己寫的腳本,是以同樣redis會自動将腳本傳回值的Lua資料類型轉化為Redis的傳回值類型。 在腳本中可以使用return 語句将值傳回給redis用戶端,通過return語句來執行,如果沒有執行return,預設傳回為nil。
Redis中執行Lua腳本相關的指令
編寫完腳本後最重要的就是在程式中執行腳本。Redis提供了EVAL指令可以使開發者像調用其他Redis内置指令一樣調用腳本。
EVAL指令-執行腳本
[EVAL] [腳本内容] [key參數的數量] [key …] [arg …]
可以通過key和arg這兩個參數向腳本中傳遞資料,他們的值可以在腳本中分别使用KEYS和ARGV 這兩個類型的全局變量通路。
比如我們通過腳本實作一個set指令,通過在redis用戶端中調用,那麼執行的語句是:
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
上述腳本相當于使用Lua腳本調用了Redis的
set
指令,存儲了一個key=lua,value=hello到Redis中。
EVALSHA指令
考慮到我們通過eval執行lua腳本,腳本比較長的情況下,每次調用腳本都需要把整個腳本傳給redis,比較占用帶寬。為了解決這個問題,redis提供了EVALSHA指令允許開發者通過腳本内容的SHA1摘要來執行腳本。該指令的用法和EVAL一樣,隻不過是将腳本内容替換成腳本内容的SHA1摘要
1.Redis在執行EVAL指令時會計算腳本的SHA1摘要并記錄在腳本緩存中2.執行EVALSHA指令時Redis會根據提供的摘要從腳本緩存中查找對應的腳本内容,如果找到了就執行腳本,否則傳回“NOSCRIPT No matching script,Please use EVAL”
# 将腳本加入緩存并生成sha1指令script load "return redis.call('get','lua')"# ["13bd040587b891aedc00a72458cbf8588a27df90"]# 傳遞sha1的值來執行該指令evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
Redisson執行Lua腳本
通過lua腳本來實作一個通路頻率限制功能。
思路,定義一個key,key中包含ip位址。 value為指定時間内的通路次數,比如說是10秒内隻能通路3次。
•定義Lua腳本。
local times=redis.call('incr',KEYS[1]) -- 如果是第一次進來,設定一個過期時間 if times == 1 then redis.call('expire',KEYS[1],ARGV[1]) end -- 如果在指定時間内通路次數大于指定次數,則傳回0,表示通路被限制 if times > tonumber(ARGV[2]) then return 0 end -- 傳回1,允許被通路 return 1
•定義controller,提供通路測試方法
@RestController public class RedissonController { @Autowired RedissonClient redissonClient; private final String LIMIT_LUA= "local times=redis.call('incr',KEYS[1])\n" + "if times == 1 then\n" + " redis.call('expire',KEYS[1],ARGV[1])\n" + "end\n" + "if times > tonumber(ARGV[2]) then\n" + " return 0\n" + "end\n" + "return 1"; @GetMapping("/lua/{id}") public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException { List<Object> keys= Arrays.asList("LIMIT:"+id); RFuture<Object> future=redissonClient.getScript(). evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3); return future.get().toString(); } }
需要注意,上述腳本執行的時候會有問題,因為redis預設的序列化方式導緻value的值在傳遞到腳本中時,轉成了對象類型,需要修改
redisson.yml
檔案,增加codec的序列化方式。
•application.yml
spring: redis: redisson: file: classpath:redisson.yml
•redisson.yml
singleServerConfig: address: redis://192.168.221.128:6379 codec: !<org.redisson.codec.JsonJacksonCodec> {}
Lua腳本的原子性
redis的腳本執行是原子的,即腳本執行期間Redis不會執行其他指令。所有的指令必須等待腳本執行完以後才能執行。為了防止某個腳本執行時間過程導緻Redis無法提供服務。Redis提供了lua-time-limit參數限制腳本的最長運作時間。預設是5秒鐘。
非事務性操作
當腳本運作時間超過這個限制後,Redis将開始接受其他指令但不會執行(以確定腳本的原子性),而是傳回BUSY的錯誤,下面示範一下這種情況。
打開兩個用戶端視窗,在第一個視窗中執行lua腳本的死循環
eval "while true do end" 0
在第二個視窗中運作
get lua
,會得到如下的異常。
(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
我們會發現執行結果是Busy, 接着我們通過script kill 的指令終止目前執行的腳本,第二個視窗的顯示又恢複正常了。
存在事務性操作
如果目前執行的Lua腳本對Redis的資料進行了修改(SET、DEL等),那麼通過SCRIPT KILL 指令是不能終止腳本運作的,因為要保證腳本運作的原子性,如果腳本執行了一部分終止,那就違背了腳本原子性的要求。最終要保證腳本要麼都執行,要麼都不執行
同樣打開兩個視窗,第一個視窗運作如下指令
eval "redis.call('set','name','mic') while true do end" 0
在第二個視窗運作
get lua
結果一樣,仍然是busy,但是這個時候通過script kill指令,會發現報錯,沒辦法kill。
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到這種情況,隻能通過shutdown nosave指令來強行終止redis。
shutdown nosave和shutdown的差別在于 shutdown nosave不會進行持久化操作,意味着發生在上一次快照後的資料庫修改都會丢失。
Redisson的Lua腳本
了解了lua之後,我們再回過頭來看看Redisson的Lua腳本,就不難了解了。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
Redis中的Pub/Sub機制
下面是Redisson中釋放鎖的代碼,在代碼中我們發現一個publish的指令
redis.call('publish', KEYS[2], ARGV[1])
,這個指令是幹啥的呢?
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
Redis提供了一組指令可以讓開發者實作“釋出/訂閱”模式(publish/subscribe) . 該模式同樣可以實作程序間的消息傳遞,它的實作原理是:
•釋出/訂閱模式包含兩種角色,分别是釋出者和訂閱者。訂閱者可以訂閱一個或多個頻道,而釋出者可以向指定的頻道發送消息,所有訂閱此頻道的訂閱者都會收到該消息•釋出者釋出消息的指令是PUBLISH, 用法是
PUBLISH channel message
比如向channel.1發一條消息:hello
PUBLISH channel.1 “hello”
這樣就實作了消息的發送,該指令的傳回值表示接收到這條消息的訂閱者數量。因為在執行這條指令的時候還沒有訂閱者訂閱該頻道,是以傳回為0. 另外值得注意的是消息發送出去不會持久化,如果發送之前沒有訂閱者,那麼後續再有訂閱者訂閱該頻道,之前的消息就收不到了
訂閱者訂閱消息的指令是:
SUBSCRIBE channel [channel …]
該指令同時可以訂閱多個頻道,比如訂閱channel.1的頻道:SUBSCRIBE channel.1,執行SUBSCRIBE指令後用戶端會進入訂閱狀态。
一般情況下,我們不會用pub/sub來做消息發送機制,畢竟有這麼多MQ技術在。
爆文推薦閱讀
(全網最詳細最有深度)超過1W字深度剖析JVM常量池