天天看點

【spring boot】【redis】spring boot 內建redis的釋出訂閱機制

一.簡單介紹

1.redis的釋出訂閱功能,很簡單。

  消息釋出者和消息訂閱者互相不認得,也不關心對方有誰。

  消息釋出者,将消息發送給頻道(channel)。

  然後是由 頻道(channel)将消息發送給對自己感興趣的 消息訂閱者們,進行消費。

2.redis的釋出訂閱和專業的MQ相比較

  1>redis的釋出訂閱隻是最基本的功能,不支援持久化,消息釋出者将消息發送給頻道。如果沒有訂閱者消費,消息就丢失了。

  2>在消息釋出過程中,如果用戶端和伺服器連接配接逾時,MQ會有重試機制,事務復原等。但是Redis沒有提供消息傳輸保障。

  3>簡單的釋出訂閱可以使用redis,根據業務需求選擇。

二.spring boot 內建[spring boot 2.x]

1.pom.xml檔案

<!-- redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--spring2.0內建redis所需common-pool2-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
        <!-- 使用redis的LUA腳本 需要序列化操作的jar-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>      

2.redis的config 

為redis添加消息擴充卡,綁定消息處理器

消息擴充卡 可以添加多個 

【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
package com.sxd.swapping.config;

import com.sxd.swapping.redisReceiver.RedisReceiver;
import com.sxd.swapping.redisReceiver.RedisReceiver2;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;



/**
 * @author sxd
 * @date 2019/5/27 16:13
 */
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig {



    /**
     * redis消息監聽器容器
     * 可以添加多個監聽不同話題的redis監聽器,隻需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
     * 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter,
                                            MessageListenerAdapter listenerAdapter2)
    {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //可以添加多個 messageListener
        //可以對 messageListener對應的擴充卡listenerAdapter  指定本擴充卡 适配的消息類型  是什麼
        //在釋出的地方 對應釋出的redisTemplate.convertAndSend("user",msg);  那這邊的就對應的可以消費到指定類型的 訂閱消息
        container.addMessageListener(listenerAdapter, new PatternTopic("user"));
        container.addMessageListener(listenerAdapter2, new PatternTopic("goods"));

        return container;
    }


    /**
     * 消息監聽器擴充卡,綁定消息處理器,利用反射技術調用消息處理器的業務方法
     *
     * receiveMessage 是預設監聽方法 一般不變
     * @param redisReceiver redis消息處理器,自定義的
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
        System.out.println("消息擴充卡1進來了");
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }


    /**
     * 消息監聽器擴充卡,綁定消息處理器,利用反射技術調用消息處理器的業務方法
     *
     * receiveMessage 是預設監聽方法 一般不變
     * @param redisReceiver2 redis消息處理器,自定義的
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter2(RedisReceiver2 redisReceiver2) {
        System.out.println("消息擴充卡2進來了");
        return new MessageListenerAdapter(redisReceiver2, "receiveMessage");
    }



    //使用預設的工廠初始化redis操作模闆
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }


    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        RedisSerializer keySerializer = new StringRedisSerializer();
//        RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
        //key采用字元串反序列化對象
        redisTemplate.setKeySerializer(keySerializer);
        //value也采用字元串反序列化對象
        //原因:管道操作,是對redis指令的批量操作,各個指令傳回結果可能類型不同
        //可能是 Boolean類型 可能是String類型 可能是byte[]類型 是以統一将結果按照String處理
        redisTemplate.setValueSerializer(keySerializer);
        return redisTemplate;
    }









}      

View Code

3.建立幾個消息處理器[處理消息訂閱者  接收到消息後的業務]

【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
package com.sxd.swapping.redisReceiver;

import org.springframework.stereotype.Service;

/**
 *
 * redis 訂閱釋出  消息接收器/處理器
 * @author sxd
 * @date 2019/5/30 17:12
 */
@Service
public class RedisReceiver {

    public void receiveMessage(String message) {
        System.out.println("消息處理器1>我處理使用者資訊:"+message);
        //這裡是收到通道的消息之後執行的方法
        //此處執行接收到消息後的 業務邏輯
    }
}      
【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
package com.sxd.swapping.redisReceiver;

import org.springframework.stereotype.Service;

/**
 * redis 訂閱釋出  消息接收器/處理器2
 * @author sxd
 * @date 2019/5/30 17:15
 */
@Service
public class RedisReceiver2 {

    public void receiveMessage(String message) {
        System.out.println("消息處理器2>我處理商品資訊:"+message);
        //這裡是收到通道的消息之後執行的方法
        //此處執行接收到消息後的 業務邏輯
    }
}      

4.消息釋出controller

【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
【spring boot】【redis】spring boot 內建redis的釋出訂閱機制
@Autowired
    RedisTemplate redisTemplate;


    /**
     * redis 釋出訂閱pubsub
     */
    @RequestMapping(value = "/redisPubSub")
    public void redisPubSub(String msg){
        if (msg.contains("使用者")){
            redisTemplate.convertAndSend("user",msg);
        }else {
            redisTemplate.convertAndSend("goods",msg);
        }
    }      

測試:

發送請求:http://localhost:9666/redistest/redisPubSub?msg=使用者---德瑪西亞的使用者

結果:

消息處理器1>我處理使用者資訊:使用者---德瑪西亞的使用者

發送請求:http://localhost:9666/redistest/redisPubSub?msg=goods---德瑪西亞的用商品

消息處理器2>我處理商品資訊:goods---德瑪西亞的用商品

繼續閱讀