天天看點

有redis釋出訂閱還需要MQ的嗎

本文項目位址: https://github.com/longxiaonan/java-sea/tree/master/javasea-nosql/javasea-nosql-redis-subpub-springdata

釋出者

com.javasea.redis.publish.TestSenderController定時釋出資訊到redis

/**
 * 定時器模拟消息釋出者
 */
@EnableScheduling
@Component
public class TestSenderController {
    @Autowired
        private StringRedisTemplate stringRedisTemplate;

    /** 向redis消息隊列index通道釋出消息*/
    @Scheduled(fixedRate = 2000)
    public void sendMessage(){
        stringRedisTemplate.convertAndSend("pmp",String.valueOf(Math.random()));
        stringRedisTemplate.convertAndSend("channel",String.valueOf(Math.random()));
    }
}           

訂閱者

com.javasea.redis.subscribe.RedisMsg接口的兩個實作類RedisChannelSub和RedisPmpSub會

将收到的資訊列印到控制台

public class RedisChannelSub implements RedisMsg {
    @Override
    public void receiveMessage(String message) {
        //注意通道調用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter參數2相同
        System.out.println("這是RedisChannelSub"+"-----"+message);
    }
}           
public class RedisPmpSub implements RedisMsg{

    /**
     * 接收消息的方法
     * @param message 訂閱消息
     */
    @Override
    public void receiveMessage(String message){
        //注意通道調用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter參數2相同

        System.out.println("這是RedisPmpSub"+"+++++++++++++++++"+message);
    }
}           
/**
 * @Description 普通的消息處理器接口
 * @Author [email protected]
 * @Date 23:50 2020/7/21 0021
 **/
@Component
public interface RedisMsg {

    public void receiveMessage(String message);
}           

路由配置

RedisConfig2要配置listner和topic的路由,topic中的channel和TestSenderController的channel要對應

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //訂閱了一個叫pmp和channel 的通道,多通道
    container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
    container.addMessageListener(listenerAdapter(new RedisChannelSub()),new PatternTopic("channel"));
    container.addMessageListener(listenerAdapter(new RedisChannelSub()),new PatternTopic("flowMsgChennel"));
    //這個container 可以添加多個 messageListener
    return container;
}           
/**
     * 配置消息接收處理類
     * @param redisMsg  自定義消息接收類
     * @return
     */
    @Bean()
    @Scope("prototype")
    MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
        //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“receiveMessage”
        //也有好幾個重載方法,這邊預設調用處理器的方法 叫handleMessage 可以自己到源碼裡面看
        return new MessageListenerAdapter(redisMsg, "receiveMessage");//注意2個通道調用的方法都要為receiveMessage
    }           

啟動程式

控制台輸出:

2020-07-29 11:42:10.716 ERROR 11776 --- [   container-13] o.s.d.r.l.RedisMessageListenerContainer  : Connection failure occurred. Restarting subscription task after 5000 ms
這是RedisPmpSub+++++++++++++++++0.6018044162751559
這是RedisChannelSub-----0.6492059008427755
這是RedisPmpSub+++++++++++++++++0.14009953778676876
這是RedisChannelSub-----0.5201275445287328
這是RedisPmpSub+++++++++++++++++0.2196083162392929
這是RedisChannelSub-----0.3903862134377962
這是RedisPmpSub+++++++++++++++++0.5297280660628917           

繼續閱讀