Redis 发布订阅
发布订阅:消息发布者发布消息 和 消息订阅者接收消息,两者之间通过某种媒介联系起来
Redis 发布订阅(pub/sub)是一种 消息通信模式 :发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
订阅 / 发布消息图:
图中可以看出,所需:
- 消息发送者 、 2. 频道 、 3. 消息订阅者
发布订阅机制
- 当一个客户端通过
命令向订阅者发布消息的时候,称这个客户端为发布者PUBLISH
publisher
- 当一个客户端通过
或者 subscribe
接收消息时,称这个客户端为 订阅者 PSUBSCRIBE
subscriber
- 为了解耦发布者和订阅者之间的关系,Redis 使用了频道
作为两者之间的中介,发布者直接把消息发送给 channel,而 channel 负责把消息发送给订阅者,发布者和订阅者之间没有直接的联系,都不知道对方的存在channel(频道)
订阅者 1,2,3 订阅了频道 channel,当有消息发布给频道时,这个消息就会被发送到三个订阅者客户端
demo 实现
学习参考链接
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
添加配置文件
spring:
redis:
host: 127.0.0.1
database: 5
password:
port: 6379
新建一个监听类,来监听消息
package com.maoxs.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* 监听发送的消息
*/
@Component
public class CatListener extends MessageListenerAdapter implements MessageListener {
@Autowired
RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是Cat监听" + message.toString());
}
}
package com.maoxs.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
/**
* 监听发送的消息
*/
public class FishListener extends MessageListenerAdapter implements MessageListener {
@Autowired
RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是Fish监听" + message.toString());
}
}
创建一个监听容器
package com.maoxs.redis;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.maoxs.pojo.MessageReceiver;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import java.util.Arrays;
@Configuration
public class RedisMessageConfig {
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
CatListener catAdapter, FishListener fishAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫chat 的通道
container.addMessageListener(catAdapter, new PatternTopic("cat"));
container.addMessageListener(fishAdapter, new PatternTopic("fish"));
//这个container 可以添加多个 messageListener
return container;
}
/**
* 消息监听器适配器,绑定消息处理器
*
* @param receiver
* @return
*/
// @Bean
// MessageListenerAdapter CatAdapter() {
// return new MessageListenerAdapter(new CatListener());
// }
/**
* 消息监听器适配器,绑定消息处理器
*
* @param receiver
* @return
*/
// @Bean
// MessageListenerAdapter FishAdapter() {
// return new MessageListenerAdapter(new FishListener());
// }
/**
* redis 读取内容的template
*/
@Bean
StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
@RestController
public class TestController {
@Resource
StringRedisTemplate stringRedisTemplate;
@GetMapping("cat")
public void sendCatMessage(){
stringRedisTemplate.convertAndSend("cat","猫");
}
@GetMapping("fish")
public void sendFishMessage(){
stringRedisTemplate.convertAndSend("fish","鱼");
}