springboot 版本2.2.1.RELEASE
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
redis的參數配置
spring.redis.port=6379
spring.redis.host=127.0.0.1
spring.redis.database=0
spring.redis.password=你的密碼
關于redis的釋出訂閱,可檢視這個文章,這裡不再贅述
這裡打算從demo展示以及源碼解析兩個步驟來分析
demo展示
@Service
public class RedisPubSubManager {
private final String channel = "REDIS_CHANNEL";
@Autowired
private RedisMessageListenerContainer redisMessageListenerContainer;
@Autowired
private StringRedisTemplate redisTemplate;
@PostConstruct
public void init() {
MessageListenerAdapter adapter = new MessageListenerAdapter(this);
adapter.setSerializer(new StringRedisSerializer());
adapter.afterPropertiesSet();
redisMessageListenerContainer.addMessageListener(adapter, new PatternTopic(channel));
}
//這裡是往指定的channel上發送消息
public void sendMsg(String msg) {
redisTemplate.convertAndSend(channel, msg);
}
//這個是redis 生産者發送消息之後的回調 預設情況下 方法名是handleMessage
public void handleMessage(String msg) {
System.out.println("廣播消息收到訂閱資訊," + msg);
//這裡會受到生産者的消息,可在這個方法内進行業務處理
}
}
@Configuration
public class Config {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){
//redisConnectionFactory對象在autoconfigurate時已經被spring容器注入為bean,這裡可以直接拿來用
RedisMessageListenerContainer container=new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
return container;
}
}
至此一個簡單的Demo已經搭建完成。本地發送消息或redis-cli發送消息至指定的channel,這裡的
handleMessage
均可以收到消息。
源碼分析
從
MessageListener
開始來看
/**
* Listener of messages published in Redis.
* redis中釋出消息的監聽器
*/
public interface MessageListener {
/**
* Callback for processing received objects through Redis.
*
* @param message message
* @param pattern pattern matching the channel (if specified) - can be null
*/
void onMessage(Message message, byte[] pattern);
}
通過上面的描述大緻可以看出 onMessage方法是處理接收消息的回調
它的一個實作是
MessageListenerAdapter
,先看下構造函數
public MessageListenerAdapter(Object delegate) {
initDefaultStrategies();
setDelegate(delegate);
}
public void setDelegate(Object delegate) {
Assert.notNull(delegate, "Delegate must not be null");
this.delegate = delegate;
}
結合下我們Demo中的實作
這裡的delegate是我們的RedisPubSubManager類 這點切記
檢視下onMessage的實作
public void onMessage(Message message, byte[] pattern) {
try {
//this指MessageListenerAdapter 而delegate是RedisPubSubManager
if (delegate != this) {
//并且RedisPubSubManager 不是繼承或實作自MessageListener
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message, pattern);
return;
}
}
// 擷取發送的消息
Object convertedMessage = extractMessage(message);
//擷取發送的頻道channel
String convertedChannel = stringSerializer.deserialize(pattern);
// Invoke the handler method with appropriate arguments.
Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };
invokeListenerMethod(invoker.getMethodName(), listenerArguments);
} catch (Throwable th) {
handleListenerException(th);
}
}
invoker對象的初始化是在afterPropertiesSet方法中,我們看下
public void afterPropertiesSet() {
//這裡的methodName是handleMessage,這也是為什麼我們Demo中回調方法叫handleMessage
String methodName = getDefaultListenerMethod();
if (!StringUtils.hasText(methodName)) {
throw new InvalidDataAccessApiUsageException("No default listener method specified: "
+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
+ "override the 'getListenerMethodName' method.");
}
invoker = new MethodInvoker(delegate, methodName);
}
invokeListenerMethod方法
protected void invokeListenerMethod(String methodName, Object[] arguments) {
try {
//arguments有兩個參數,分别是消息内容,channel名稱
invoker.invoke(arguments);
} catch (InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
if (targetEx instanceof DataAccessException) {
throw (DataAccessException) targetEx;
} else {
throw new RedisListenerExecutionFailedException("Listener method '" + methodName + "' threw exception",
targetEx);
}
} catch (Throwable ex) {
throw new RedisListenerExecutionFailedException("Failed to invoke target method '" + methodName
+ "' with arguments " + ObjectUtils.nullSafeToString(arguments), ex);
}
}
void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {
//arguments有兩個參數,分别是消息内容,channel名稱
//這裡message 是消息内容
Object[] message = new Object[] { arguments[0] };
//methods即初始化時加載進來的handleMessage
for (Method m : methods) {
//擷取handlemessage的參數類型,由于我們隻有一個參數String message
//是以這裡type是String
Class<?>[] types = m.getParameterTypes();
//如果參數數目是2,而且第一個參數的類型可以轉換為第一個參數的類,
//第二個參數的類型可以轉換為第二個參數的類,那麼args=arguments,否則args=消息内容
//如java.lang.String isInstance "test"
Object[] args = //
types.length == 2 //
&& types[0].isInstance(arguments[0]) //
&& types[1].isInstance(arguments[1]) ? arguments : message;
//如果類型和值不比對 則continue
if (!types[0].isInstance(args[0])) {
continue;
}
//這裡依賴反射 執行我們聲明的handleMessage方法
m.invoke(delegate, args);
return;
}
}
以上,即可實作一個簡單的基于springboot的redis釋出訂閱功能