釋出者
com.javasea.redis.publish.TestSenderController定時釋出資訊到redis
/**
* 定時器模拟消息釋出者
*/
@EnableScheduling
@Component
public class TestSenderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/** 向redis消息隊列index通道釋出消息*/
@Scheduled(fixedRate = 2000)
public void sendMessage(){
stringRedisTemplate.convertAndSend("pmp",String.valueOf(Math.random()));
stringRedisTemplate.convertAndSend("channel",String.valueOf(Math.random()));
}
}
訂閱者
com.javasea.redis.subscribe.RedisMsg接口的兩個實作類RedisChannelSub和RedisPmpSub會
将收到的資訊列印到控制台
public class RedisChannelSub implements RedisMsg {
@Override
public void receiveMessage(String message) {
//注意通道調用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter參數2相同
System.out.println("這是RedisChannelSub"+"-----"+message);
}
}
public class RedisPmpSub implements RedisMsg{
/**
* 接收消息的方法
* @param message 訂閱消息
*/
@Override
public void receiveMessage(String message){
//注意通道調用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter參數2相同
System.out.println("這是RedisPmpSub"+"+++++++++++++++++"+message);
}
}
/**
* @Description 普通的消息處理器接口
* @Author [email protected]
* @Date 23:50 2020/7/21 0021
**/
@Component
public interface RedisMsg {
public void receiveMessage(String message);
}
路由配置
RedisConfig2要配置listner和topic的路由,topic中的channel和TestSenderController的channel要對應
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//訂閱了一個叫pmp和channel 的通道,多通道
container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
container.addMessageListener(listenerAdapter(new RedisChannelSub()),new PatternTopic("channel"));
container.addMessageListener(listenerAdapter(new RedisChannelSub()),new PatternTopic("flowMsgChennel"));
//這個container 可以添加多個 messageListener
return container;
}
/**
* 配置消息接收處理類
* @param redisMsg 自定義消息接收類
* @return
*/
@Bean()
@Scope("prototype")
MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
//這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“receiveMessage”
//也有好幾個重載方法,這邊預設調用處理器的方法 叫handleMessage 可以自己到源碼裡面看
return new MessageListenerAdapter(redisMsg, "receiveMessage");//注意2個通道調用的方法都要為receiveMessage
}
啟動程式
控制台輸出:
2020-07-29 11:42:10.716 ERROR 11776 --- [ container-13] o.s.d.r.l.RedisMessageListenerContainer : Connection failure occurred. Restarting subscription task after 5000 ms
這是RedisPmpSub+++++++++++++++++0.6018044162751559
這是RedisChannelSub-----0.6492059008427755
這是RedisPmpSub+++++++++++++++++0.14009953778676876
這是RedisChannelSub-----0.5201275445287328
這是RedisPmpSub+++++++++++++++++0.2196083162392929
這是RedisChannelSub-----0.3903862134377962
這是RedisPmpSub+++++++++++++++++0.5297280660628917