一.簡單介紹
1.redis的釋出訂閱功能,很簡單。
消息釋出者和消息訂閱者互相不認得,也不關心對方有誰。
消息釋出者,将消息發送給頻道(channel)。
然後是由 頻道(channel)将消息發送給對自己感興趣的 消息訂閱者們,進行消費。
2.redis的釋出訂閱和專業的MQ相比較
1>redis的釋出訂閱隻是最基本的功能,不支援持久化,消息釋出者将消息發送給頻道。如果沒有訂閱者消費,消息就丢失了。
2>在消息釋出過程中,如果用戶端和伺服器連接配接逾時,MQ會有重試機制,事務復原等。但是Redis沒有提供消息傳輸保障。
3>簡單的釋出訂閱可以使用redis,根據業務需求選擇。
二.spring boot 內建[spring boot 2.x]
1.pom.xml檔案
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0內建redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- 使用redis的LUA腳本 需要序列化操作的jar-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2.redis的config
為redis添加消息擴充卡,綁定消息處理器
消息擴充卡 可以添加多個
package com.sxd.swapping.config;
import com.sxd.swapping.redisReceiver.RedisReceiver;
import com.sxd.swapping.redisReceiver.RedisReceiver2;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @author sxd
* @date 2019/5/27 16:13
*/
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig {
/**
* redis消息監聽器容器
* 可以添加多個監聽不同話題的redis監聽器,隻需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
* 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageListenerAdapter listenerAdapter2)
{
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//可以添加多個 messageListener
//可以對 messageListener對應的擴充卡listenerAdapter 指定本擴充卡 适配的消息類型 是什麼
//在釋出的地方 對應釋出的redisTemplate.convertAndSend("user",msg); 那這邊的就對應的可以消費到指定類型的 訂閱消息
container.addMessageListener(listenerAdapter, new PatternTopic("user"));
container.addMessageListener(listenerAdapter2, new PatternTopic("goods"));
return container;
}
/**
* 消息監聽器擴充卡,綁定消息處理器,利用反射技術調用消息處理器的業務方法
*
* receiveMessage 是預設監聽方法 一般不變
* @param redisReceiver redis消息處理器,自定義的
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
System.out.println("消息擴充卡1進來了");
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
}
/**
* 消息監聽器擴充卡,綁定消息處理器,利用反射技術調用消息處理器的業務方法
*
* receiveMessage 是預設監聽方法 一般不變
* @param redisReceiver2 redis消息處理器,自定義的
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter2(RedisReceiver2 redisReceiver2) {
System.out.println("消息擴充卡2進來了");
return new MessageListenerAdapter(redisReceiver2, "receiveMessage");
}
//使用預設的工廠初始化redis操作模闆
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
RedisSerializer keySerializer = new StringRedisSerializer();
// RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
//key采用字元串反序列化對象
redisTemplate.setKeySerializer(keySerializer);
//value也采用字元串反序列化對象
//原因:管道操作,是對redis指令的批量操作,各個指令傳回結果可能類型不同
//可能是 Boolean類型 可能是String類型 可能是byte[]類型 是以統一将結果按照String處理
redisTemplate.setValueSerializer(keySerializer);
return redisTemplate;
}
}
View Code
3.建立幾個消息處理器[處理消息訂閱者 接收到消息後的業務]
package com.sxd.swapping.redisReceiver;
import org.springframework.stereotype.Service;
/**
*
* redis 訂閱釋出 消息接收器/處理器
* @author sxd
* @date 2019/5/30 17:12
*/
@Service
public class RedisReceiver {
public void receiveMessage(String message) {
System.out.println("消息處理器1>我處理使用者資訊:"+message);
//這裡是收到通道的消息之後執行的方法
//此處執行接收到消息後的 業務邏輯
}
}
package com.sxd.swapping.redisReceiver;
import org.springframework.stereotype.Service;
/**
* redis 訂閱釋出 消息接收器/處理器2
* @author sxd
* @date 2019/5/30 17:15
*/
@Service
public class RedisReceiver2 {
public void receiveMessage(String message) {
System.out.println("消息處理器2>我處理商品資訊:"+message);
//這裡是收到通道的消息之後執行的方法
//此處執行接收到消息後的 業務邏輯
}
}
4.消息釋出controller
@Autowired
RedisTemplate redisTemplate;
/**
* redis 釋出訂閱pubsub
*/
@RequestMapping(value = "/redisPubSub")
public void redisPubSub(String msg){
if (msg.contains("使用者")){
redisTemplate.convertAndSend("user",msg);
}else {
redisTemplate.convertAndSend("goods",msg);
}
}
測試:
發送請求:http://localhost:9666/redistest/redisPubSub?msg=使用者---德瑪西亞的使用者
結果:
消息處理器1>我處理使用者資訊:使用者---德瑪西亞的使用者
發送請求:http://localhost:9666/redistest/redisPubSub?msg=goods---德瑪西亞的用商品
消息處理器2>我處理商品資訊:goods---德瑪西亞的用商品