天天看點

基于springboot的redis釋出訂閱

springboot 版本2.2.1.RELEASE
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
</dependency>
           

redis的參數配置

spring.redis.port=6379
spring.redis.host=127.0.0.1
spring.redis.database=0
spring.redis.password=你的密碼
           

關于redis的釋出訂閱,可檢視這個文章,這裡不再贅述

這裡打算從demo展示以及源碼解析兩個步驟來分析

demo展示

@Service
public class RedisPubSubManager {


    private final String channel = "REDIS_CHANNEL";
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;


    @Autowired
    private StringRedisTemplate redisTemplate;

    @PostConstruct
    public void init() {
        MessageListenerAdapter adapter = new MessageListenerAdapter(this);
        adapter.setSerializer(new StringRedisSerializer());
        adapter.afterPropertiesSet();
        redisMessageListenerContainer.addMessageListener(adapter, new PatternTopic(channel));
    }

	//這裡是往指定的channel上發送消息
    public void sendMsg(String msg) {
        redisTemplate.convertAndSend(channel, msg);
    }
    
    //這個是redis 生産者發送消息之後的回調 預設情況下 方法名是handleMessage
    public void handleMessage(String msg) {
        System.out.println("廣播消息收到訂閱資訊," + msg);
        //這裡會受到生産者的消息,可在這個方法内進行業務處理
    }

}
           
@Configuration
public class Config {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){
    //redisConnectionFactory對象在autoconfigurate時已經被spring容器注入為bean,這裡可以直接拿來用
        RedisMessageListenerContainer container=new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }

}
           

至此一個簡單的Demo已經搭建完成。本地發送消息或redis-cli發送消息至指定的channel,這裡的

handleMessage

均可以收到消息。

源碼分析

MessageListener

開始來看

/**
 * Listener of messages published in Redis.
 * redis中釋出消息的監聽器
 */
public interface MessageListener {

	/**
	 * Callback for processing received objects through Redis.
	 * 
	 * @param message message
	 * @param pattern pattern matching the channel (if specified) - can be null
	 */
	void onMessage(Message message, byte[] pattern);
}
           

通過上面的描述大緻可以看出 onMessage方法是處理接收消息的回調

它的一個實作是

MessageListenerAdapter

,先看下構造函數

public MessageListenerAdapter(Object delegate) {
		initDefaultStrategies();
		setDelegate(delegate);
	}
public void setDelegate(Object delegate) {
		Assert.notNull(delegate, "Delegate must not be null");
		this.delegate = delegate;
	}
           

結合下我們Demo中的實作

這裡的delegate是我們的RedisPubSubManager類 這點切記

檢視下onMessage的實作

public void onMessage(Message message, byte[] pattern) {
		try {
			
			//this指MessageListenerAdapter 而delegate是RedisPubSubManager
			if (delegate != this) {
			//并且RedisPubSubManager 不是繼承或實作自MessageListener
				if (delegate instanceof MessageListener) {
					((MessageListener) delegate).onMessage(message, pattern);
					return;
				}
			}

			// 擷取發送的消息
			Object convertedMessage = extractMessage(message);
			//擷取發送的頻道channel
			String convertedChannel = stringSerializer.deserialize(pattern);
			// Invoke the handler method with appropriate arguments.
			Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };

			invokeListenerMethod(invoker.getMethodName(), listenerArguments);
		} catch (Throwable th) {
			handleListenerException(th);
		}
	}
           

invoker對象的初始化是在afterPropertiesSet方法中,我們看下

public void afterPropertiesSet() {
		//這裡的methodName是handleMessage,這也是為什麼我們Demo中回調方法叫handleMessage
		String methodName = getDefaultListenerMethod();

		if (!StringUtils.hasText(methodName)) {
			throw new InvalidDataAccessApiUsageException("No default listener method specified: "
					+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
					+ "override the 'getListenerMethodName' method.");
		}
	
		invoker = new MethodInvoker(delegate, methodName);
	}
           

invokeListenerMethod方法

protected void invokeListenerMethod(String methodName, Object[] arguments) {
		try {
		//arguments有兩個參數,分别是消息内容,channel名稱
			invoker.invoke(arguments);
		} catch (InvocationTargetException ex) {
			Throwable targetEx = ex.getTargetException();
			if (targetEx instanceof DataAccessException) {
				throw (DataAccessException) targetEx;
			} else {
				throw new RedisListenerExecutionFailedException("Listener method '" + methodName + "' threw exception",
						targetEx);
			}
		} catch (Throwable ex) {
			throw new RedisListenerExecutionFailedException("Failed to invoke target method '" + methodName
					+ "' with arguments " + ObjectUtils.nullSafeToString(arguments), ex);
		}
	}
           
void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {
			//arguments有兩個參數,分别是消息内容,channel名稱
			//這裡message 是消息内容
			Object[] message = new Object[] { arguments[0] };
			
			//methods即初始化時加載進來的handleMessage
			for (Method m : methods) {
				//擷取handlemessage的參數類型,由于我們隻有一個參數String message
				//是以這裡type是String
				Class<?>[] types = m.getParameterTypes();
				//如果參數數目是2,而且第一個參數的類型可以轉換為第一個參數的類,
				//第二個參數的類型可以轉換為第二個參數的類,那麼args=arguments,否則args=消息内容
				//如java.lang.String isInstance "test"
				Object[] args = //
				types.length == 2 //
						&& types[0].isInstance(arguments[0]) //
						&& types[1].isInstance(arguments[1]) ? arguments : message;
				//如果類型和值不比對 則continue
				if (!types[0].isInstance(args[0])) {
					continue;
				}
				//這裡依賴反射 執行我們聲明的handleMessage方法
				m.invoke(delegate, args);

				return;
			}
		}
           

以上,即可實作一個簡單的基于springboot的redis釋出訂閱功能

繼續閱讀