天天看点

Redis Pub/Sub 发布订阅模式的深度解析与实现消息队列1 Pub/Sub的概述2 订阅3 取消订阅4 模式匹配5 发布6 Pub/Sub原理7 Pub/Sub缺点

详细介绍了Redis 的Pub/Sub的相关命令和优缺点,以及如何实现简单的消息队列。

文章目录

  • 1 Pub/Sub的概述
  • 2 订阅
  • 3 取消订阅
  • 4 模式匹配
  • 5 发布
  • 6 Pub/Sub原理
    • 6.1 pubsub_channels
    • 6.2 pubsub_patterns
  • 7 Pub/Sub缺点

1 Pub/Sub的概述

我们可以利用Redis的List数据结构实现一

个简单的消息队列,通过lpush命令写入消息,通过rpop 命令拉取消息,也可以使用BRPOP实现阻塞式的拉取消息。

上面的消息队列有一个缺点,那就是不支持消息多播机制,消息多播机制就是生产者生产的一个消息可以被多个消费者消费到,这个功能在分布式系统中非常重要。

Redis单独使用Pub/Sub模块来支持消息多播,即发布/订阅模式(publish/subscribe),它是一种消息通信模式:发布者(pub)发送消息,订阅者(sub)接收消息。

发布者会将的消息发布到一个chanel(通道)中而不是发送给指定的订阅者,发布者也不知道可能有哪些订阅者。

订阅者可以订阅一个或多个channel,只接收来自订阅的channel的消息,并且不知道有哪些(如果有)发布者,这种模式实现了消息发布者和订阅者的解耦。

Pub/Sub 与键空间无关,消息不会被持久化,与数据库也无关,在db10上发布,将可以被 db1 上的订阅者听到。如果我们需要某种范围的范围,那么只能在设置的channel名字上做区分。

2 订阅

客户端使用

SUBSCRIBE channel [channel ...]

命令订阅通道,可以多次执行该命令,也可以一次订阅多个通道,多个客户端可以订阅相同的通道。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“subscribe”),订阅的通道名称,目前总共订阅的通道数(包含glob通道)。这三个部分对每一个订阅的通道是连续的。

客户端执行订阅以后,除了可以继续订阅(

SUBSCRIBE

或者

PSUBSCRIBE

),取消订阅(

UNSUBSCRIBE

或者

PUNSUBSCRIBE

),

PING

命令和结束连接(QUIT)外, 不能执行其他操作,客户端将阻塞直到订阅通道上发布消息的到来。

如下,表示客户端一次性订阅四个通道:aaa、bba、ccc、ddd:

127.0.0.1:6379> SUBSCRIBE aaa bba ccc ddd
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "aaa"
3) (integer) 1
1) "subscribe"
2) "bba"
3) (integer) 2
1) "subscribe"
2) "ccc"
3) (integer) 3
1) "subscribe"
2) "ddd"
3) (integer) 4
           

请注意,如果使用

redis-cli

一旦进入订阅模式就不会接受任何命令,只能使用 Ctrl-C 退出该模式。

3 取消订阅

客户端使用

UNSUBSCRIBE [channel [channel ...]]

命令取消订阅指定的通道,可以指定一个或者多个取消的订阅通道名称,也可以不带任何参数,此时将取消所有的订阅的通道(不包括glob通道)。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“unsubscribe”),订阅的通道名称,目前总共订阅的通道数(包含glob通道)。这三个部分对每一个取消订阅的通道是连续的。当最后一个参数为零时,我们不再订阅任何频道,客户端可以发出任何类型的 Redis 命令,因为我们处于 Pub/Sub 状态之外。

如下,表示客户端退出ccccc通道的订阅:

127.0.0.1:6379> UNSUBSCRIBE ccccc
1) "unsubscribe"
2) "cccc"
3) (integer) 0
           

4 模式匹配

Redis Pub/Sub 实现支持模式匹配。客户端可以订阅 glob 通道,这样就能接收发送到通道名称与给定模式匹配的通道的所有消息。

客户端使用

PSUBSCRIBE pattern [pattern ...]

订阅一个或多个glob 通道。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“psubscribe”),订阅的glob通道名称,目前总共订阅的通道数(包含非glob通道)。这三个部分对每一个订阅的通道是连续的。

例如,订阅a*和*c模式:

127.0.0.1:6379> PSUBSCRIBE a* *c
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "a*"
3) (integer) 1
1) "psubscribe"
2) "*c"
3) (integer) 2
           

客户端使用

PUNSUBSCRIBE [pattern [pattern ...]]

退订一个或多个glob 通道。也可以不带任何参数,此时将取消所有的订阅的通道(不包括非glob通道)。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“punsubscribe”),取消订阅的glob通道名称,目前总共订阅的通道数(包含非glob通道)。这三个部分对每一个取消订阅的通道是连续的。当最后一个参数为零时,我们不再订阅任何频道,客户端可以发出任何类型的 Redis 命令,因为我们处于 Pub/Sub 状态之外。

如下,取消对a*的glob通道的订阅:

127.0.0.1:6379> PUNSUBSCRIBE a*
1) "punsubscribe"
2) "a*"
3) (integer) 0
           

subscribe, unsubscribe, psubscribe 和punsubscribe

命令的最后都返回当前客户端订阅的glob通道和通道的总数,如果为0,则客户端自动退出Pub/Sub模式。

5 发布

PUBLISH channel message

命令在指定的通道上发布消息。只能在一个通道上发布消息,不能在多个通道上同时发布消息。

将返回通知的接收者数量。这里的接收者数目大于等于订阅该通道的客户端数目,因为一个客户端的glob通道和非glob通道同时匹配发布通道的话,则视为两个接收者。换句话说,如果客户端订阅了多个与已发布消息匹配的模式,或者订阅了与该消息匹配的模式和通道,则该客户端可能会多次收到同一条消息。

在接收端,收到的响应包括三部分,依次是:“message”字符串,匹配的通道名称,发布的消息内容。如果是因为glob模式匹配而接收,那么返回四部分:“pmessage”字符串,匹配的glob通道名称,发送的原始通道名称,发布的消息内容。

如果某个客户端的订阅a*和*c两个模式通道:

127.0.0.1:6379> PSUBSCRIBE a* *c
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "a*"
3) (integer) 1
1) "psubscribe"
2) "*c"
3) (integer) 2
           

如果发送消息的通道为ac,那么将会返回2:

127.0.0.1:6379> PUBLISH ac xxxxx
(integer) 2
           

在客户端,将收到两次消息:

1) "pmessage"
2) "a*"
3) "ac"
4) "xxxxx"
1) "pmessage"
2) "*c"
3) "ac"
4) "xxxxx"
           

6 Pub/Sub原理

每个Redis服务器进程维持着一个标识服务器状态的

redis.h/redisServer

结构,其中就保存着有订阅的频道 以及 订阅模式 的信息:

struct redisServer {
    // ...
    dict *pubsub_channels;  // 订阅频道
    list *pubsub_patterns;  // 订阅模式
    // ...
};
           

6.1 pubsub_channels

pubsub_channels

是一个dict字典结构,key(数组元素)为channel,value就是某个client。当客户端订阅某一个频道之后,Redis 就会往 pubsub_channels 这个字典中新添加一条channel和client数据,不同的client可以订阅相同的channel,client以链表的方式串联起来,这样就能保存多个client对同一个channel的关系,非常的巧妙。

Redis Pub/Sub 发布订阅模式的深度解析与实现消息队列1 Pub/Sub的概述2 订阅3 取消订阅4 模式匹配5 发布6 Pub/Sub原理7 Pub/Sub缺点

了解了这个结构,SUBSCRIBE 、PUBLISH 、UNSUBSCRIBE命令的实现也变得十分简单了。

SUBSCRIBE

就是将channel和client加入到dict中,如果此前没有该channel,那就新增一个channel元素,然后在再增一个client链表节点,如果此前存在,则直接在链表末尾添加一个client节点。

PUBLISH

只需要通过上述字典定位到具体的channel,就能找到所有订阅该channel的客户端,再把消息发送给它们就好了。

UNSUBSCRIBE

也很简单,将对应channel下面的链表中的client删除即可。

6.2 pubsub_patterns

pubsub_patterns

用于存储所有的glob channel,它是一个list结构,节点类型为

redis.h/pubsubPattern

typedefstruct pubsubPattern {
    redisClient *client;  // 订阅模式的客户端
    robj *pattern;        // 订阅的模式
} pubsubPattern;
           

当使用

PSUBSCRIBE

命令订阅一个模式时,程序就创建一个

pubsubPattern

添加到

pubsub_patterns

链表中。如果另一个客户端也订阅一个模式,则向链表的后面新增一个pubsubPattern节点即可。

因此,实际上PUBLISH除了会在pubsub_channels中定位具体的channel之外,还会将指定的channel与pubsub_patterns 中的模式进行对比,如果 指定的channel 和某个模式匹配的话,那么也将 message 发送到订阅那个模式的全部客户端。

PUNSUBSCRIBE

的实现也很简单,就是删除pubsub_patterns中,client和pattern信息对比一致的节点。

7 Pub/Sub缺点

发布的消息在Redis系统中不能持久化,因此,必须先执行订阅,再等待消息发布。如果先发布了消息,那么该消息由于没有订阅者,消息将被直接丢弃。

消息只管发送,不管接收,也没有ACK机制,无法保证消息的消费成功。如果某个消费者中途加入进来,或者挂掉重启,那么这之前丢失的消息也不能再次消费。

以上的缺点导致Redis的Pub/Sub模式就像个小玩具,在生产环境中几乎无用武之地,非常的尴尬!为此,Redis5.0版本新增了Stream数据结构,不但支持多播,还支持数据持久化,相比Pub/Sub更加的强大!

相关文章:

  1. https://redis.io
如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

继续阅读