天天看点

基于springboot的redis发布订阅

springboot 版本2.2.1.RELEASE
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
</dependency>
           

redis的参数配置

spring.redis.port=6379
spring.redis.host=127.0.0.1
spring.redis.database=0
spring.redis.password=你的密码
           

关于redis的发布订阅,可查看这个文章,这里不再赘述

这里打算从demo展示以及源码解析两个步骤来分析

demo展示

@Service
public class RedisPubSubManager {


    private final String channel = "REDIS_CHANNEL";
    @Autowired
    private RedisMessageListenerContainer redisMessageListenerContainer;


    @Autowired
    private StringRedisTemplate redisTemplate;

    @PostConstruct
    public void init() {
        MessageListenerAdapter adapter = new MessageListenerAdapter(this);
        adapter.setSerializer(new StringRedisSerializer());
        adapter.afterPropertiesSet();
        redisMessageListenerContainer.addMessageListener(adapter, new PatternTopic(channel));
    }

	//这里是往指定的channel上发送消息
    public void sendMsg(String msg) {
        redisTemplate.convertAndSend(channel, msg);
    }
    
    //这个是redis 生产者发送消息之后的回调 默认情况下 方法名是handleMessage
    public void handleMessage(String msg) {
        System.out.println("广播消息收到订阅信息," + msg);
        //这里会受到生产者的消息,可在这个方法内进行业务处理
    }

}
           
@Configuration
public class Config {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){
    //redisConnectionFactory对象在autoconfigurate时已经被spring容器注入为bean,这里可以直接拿来用
        RedisMessageListenerContainer container=new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }

}
           

至此一个简单的Demo已经搭建完成。本地发送消息或redis-cli发送消息至指定的channel,这里的

handleMessage

均可以收到消息。

源码分析

MessageListener

开始来看

/**
 * Listener of messages published in Redis.
 * redis中发布消息的监听器
 */
public interface MessageListener {

	/**
	 * Callback for processing received objects through Redis.
	 * 
	 * @param message message
	 * @param pattern pattern matching the channel (if specified) - can be null
	 */
	void onMessage(Message message, byte[] pattern);
}
           

通过上面的描述大致可以看出 onMessage方法是处理接收消息的回调

它的一个实现是

MessageListenerAdapter

,先看下构造函数

public MessageListenerAdapter(Object delegate) {
		initDefaultStrategies();
		setDelegate(delegate);
	}
public void setDelegate(Object delegate) {
		Assert.notNull(delegate, "Delegate must not be null");
		this.delegate = delegate;
	}
           

结合下我们Demo中的实现

这里的delegate是我们的RedisPubSubManager类 这点切记

查看下onMessage的实现

public void onMessage(Message message, byte[] pattern) {
		try {
			
			//this指MessageListenerAdapter 而delegate是RedisPubSubManager
			if (delegate != this) {
			//并且RedisPubSubManager 不是继承或实现自MessageListener
				if (delegate instanceof MessageListener) {
					((MessageListener) delegate).onMessage(message, pattern);
					return;
				}
			}

			// 获取发送的消息
			Object convertedMessage = extractMessage(message);
			//获取发送的频道channel
			String convertedChannel = stringSerializer.deserialize(pattern);
			// Invoke the handler method with appropriate arguments.
			Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };

			invokeListenerMethod(invoker.getMethodName(), listenerArguments);
		} catch (Throwable th) {
			handleListenerException(th);
		}
	}
           

invoker对象的初始化是在afterPropertiesSet方法中,我们看下

public void afterPropertiesSet() {
		//这里的methodName是handleMessage,这也是为什么我们Demo中回调方法叫handleMessage
		String methodName = getDefaultListenerMethod();

		if (!StringUtils.hasText(methodName)) {
			throw new InvalidDataAccessApiUsageException("No default listener method specified: "
					+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
					+ "override the 'getListenerMethodName' method.");
		}
	
		invoker = new MethodInvoker(delegate, methodName);
	}
           

invokeListenerMethod方法

protected void invokeListenerMethod(String methodName, Object[] arguments) {
		try {
		//arguments有两个参数,分别是消息内容,channel名称
			invoker.invoke(arguments);
		} catch (InvocationTargetException ex) {
			Throwable targetEx = ex.getTargetException();
			if (targetEx instanceof DataAccessException) {
				throw (DataAccessException) targetEx;
			} else {
				throw new RedisListenerExecutionFailedException("Listener method '" + methodName + "' threw exception",
						targetEx);
			}
		} catch (Throwable ex) {
			throw new RedisListenerExecutionFailedException("Failed to invoke target method '" + methodName
					+ "' with arguments " + ObjectUtils.nullSafeToString(arguments), ex);
		}
	}
           
void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {
			//arguments有两个参数,分别是消息内容,channel名称
			//这里message 是消息内容
			Object[] message = new Object[] { arguments[0] };
			
			//methods即初始化时加载进来的handleMessage
			for (Method m : methods) {
				//获取handlemessage的参数类型,由于我们只有一个参数String message
				//所以这里type是String
				Class<?>[] types = m.getParameterTypes();
				//如果参数数目是2,而且第一个参数的类型可以转换为第一个参数的类,
				//第二个参数的类型可以转换为第二个参数的类,那么args=arguments,否则args=消息内容
				//如java.lang.String isInstance "test"
				Object[] args = //
				types.length == 2 //
						&& types[0].isInstance(arguments[0]) //
						&& types[1].isInstance(arguments[1]) ? arguments : message;
				//如果类型和值不匹配 则continue
				if (!types[0].isInstance(args[0])) {
					continue;
				}
				//这里依赖反射 执行我们声明的handleMessage方法
				m.invoke(delegate, args);

				return;
			}
		}
           

以上,即可实现一个简单的基于springboot的redis发布订阅功能

继续阅读