天天看點

利用Redis keyspace notification 實作定時執行Redis Keyspace Notifications(鍵空間回調通知):

需求場景: 

比如當使用者在商城下單之後, 對于半小時未支付的訂單進行自動取消, 再例如,商品定時上架,下架等需求。

解決方案: 

  1. 使用排程架構,或者背景線程對資料庫或者其他存儲DB進行輪詢,找出需要處理的資料,進行排程
  2. 使用MQ延遲隊列, 比如Rocket 隊列的延遲消息, 再者通過自己實作的僞延遲消息, 如部落格前面文章Rabbit延遲隊列實作
  3. 使用redis的key event 政策,對key失效事件訂閱,進行處理。

優缺點:

  1. 排程方式,優點是實作最簡單,簡單粗暴,弊端也是非常大,受資料量影響,對大資料量輪詢,給資料庫造成壓力,輪詢時間間隔太短,容易造成job堆積,且暫時不考慮業務處理阻塞情況。
  2. MQ隊列,優點,相對比較穩定,不會對資料庫造成壓力,時間控制比較精确,隻需要做好消費者的消費能力保證即可。推薦。當環境沒有MQ時可以使用第三種方案替代。
  3. Redis, 優點,不會對資料庫造成壓力,時間控制比較精确,缺點:redis作為緩存資料庫,那麼首先從程式運作角度來說,資料主要是存儲在檔案資料庫,如MYSQL,而對于redis作用,更多是把其放在一個不影響程式業務邏輯的位置,簡單的比喻:假如redis伺服器挂掉,那麼程式是否會執行不下去,或者程式業務無法正常執行? 是以筆者認為此時redis的穩定性并沒有保障(注意:沒有保障并不是說在redis本身運作穩定性,而是對于程式業務來說),另外,redis key event 實作方式是通過訂閱頻道接收推送消息,那麼分布式情況下,就必須要做好相應的防止重複處理措施。 

Redis Keyspace Notifications(鍵空間回調通知):

鍵空間通知使得用戶端可以通過訂閱頻道或模式, 來接收那些以某種方式改動了 Redis 資料集的事件。事件通過 Redis 的訂閱與釋出功能(pub/sub)來進行分發, 是以所有支援訂閱與釋出功能的用戶端都可以在無須做任何修改的情況下, 直接使用鍵空間通知功能。

因為開啟鍵空間通知功能需要消耗一些 CPU , 是以在預設配置下, 該功能處于關閉狀态。可以通過修改 

redis.conf

 檔案, 或者直接使用 

CONFIG SET

 指令來開啟或關閉鍵空間通知功能(notify-keyspace-events)。

notify-keyspace-events, 當參數值為空串表示關閉, 不為空串為開啟, 具體可配置參數值如下:

字元 發送的通知

K

鍵空間通知,所有通知以 

[email protected]<db>__

 為字首

E

鍵事件通知,所有通知以 

[email protected]<db>__

 為字首

g

DEL

 、 

EXPIRE

 、 

RENAME

 等類型無關的通用指令的通知

$

字元串指令的通知

l

清單指令的通知

s

集合指令的通知

h

哈希指令的通知

z

有序集合指令的通知

x

過期事件:每當有過期鍵被删除時發送

e

驅逐(evict)事件:每當有鍵因為 

maxmemory

 政策而被删除時發送

A

參數 

g$lshzxe

 的别名

相關參考文章   英文    中文翻譯

   一句話概括: Redis提供了對存儲的KEY 一些事件回調通知功能, 比如KEY 過期,删除等, 回調的方式通過訂閱指定頻道, 接受回調消息,  由于此功能會消耗一些CPU, 預設是關閉的, 可以通過修改配置檔案或者直接在在用戶端使用指令進行修改開啟。

實作思路: 1.  開啟redis的key事件回調功能,并配置回調類型為KEY 失效事件 2.  編寫程式訂閱對應的頻道, 接受回調。 3.  接受到回調,使用分布式鎖控制重複執行控制,分發到對應的業務。

接下來,主要是操作過程以及示例, 隻會提供部分主要代碼樣例, 具體請下載下傳例子自行檢視代碼, 如有不妥,敬請指出, 筆者野路子出身菜鳥一枚,多多指教:

首先,修改Redis 配置檔案, 配置參數并重新開機redis,使配置生效:

notify-keyspace-events Ex
           
利用Redis keyspace notification 實作定時執行Redis Keyspace Notifications(鍵空間回調通知):

開啟配置之後redis将會在key失效時,将消息發送到[email protected]<db>__:expired頻道,其中db代表key所在資料庫的index, 比如目前項目使用的dbIndex 是0 ,那麼發送的頻道topic為 [email protected]__:expired    如果你想訂閱所有的dbindex 那麼訂閱配置應該改為通配符方式, topic為 [email protected]*__:expired  那麼你将接受到所有的dbindex中的key過期回調。

到此時很多同學應該就可以完全自己實作, 當然也可以參考下筆者的實作:  

此處有個小細節需要注意: 接收到redis推送的key失效消息, 消息内容裡面隻會有key的值, 但是并沒有key所對應的value, 比如set name zhangsan  那麼收到的消息内容隻能擷取到name ,  并不能擷取到zhangsan, 開始我也很不了解為什麼不把value一起推送過來,仔細一想,這樣貌似也是很合理的,加入一個key存儲的一個巨大無比的value,那麼将value一起推送到訂閱者,後果可想而知。

由此看來redis擷取到key雖然能夠拿到對應的事件,但想要實作定時任務不同類型業務不同處理,還需要做一些文章将失效的key和業務資料關聯起來, 或者實作一個key中存儲自定義data的功能, 既然使用了redis ,那麼實作起來也是非常簡單 , 定義一個map, 将失效的key作為field,業務自定義data作為field的value, 通過key來擷取對應value即可。

redis配置類, 主要是連結配置,以及訂閱配置:

配置檔案: application.properties

#tomcat
server.port=8099
server.session-timeout=1800
server.context-path=/redis/timer

#redis
spring.redis.hostName=xxx
spring.redis.port=6379  
spring.redis.password=xxx
spring.redis.pool.maxActive=128  
spring.redis.pool.maxWait=-1  
spring.redis.pool.maxIdle=-1  
spring.redis.pool.minIdle=0  
spring.redis.timeout=30000
spring.redis.dbIndex=1
           

配置類:

@Configuration
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfiguration {

	private static Logger logger = Logger.getLogger(RedisConfiguration.class);
	
	private String hostName;

	private int port;

	private String password;

	private int timeout;
	
	private int dbIndex;
	
	@Bean
	public JedisPoolConfig getRedisConfig(){
		JedisPoolConfig config = new JedisPoolConfig();
		return config;
	}
	
	@Bean
	@ConfigurationProperties(prefix = "spring.redis")
	public JedisPool getJedisPool(){
		JedisPoolConfig config = getRedisConfig();
		JedisPool pool = new JedisPool(config,hostName,port,timeout,password,dbIndex);
		logger.info("init JredisPool ...");
		return pool;
	}
	
    @Bean  
    @ConfigurationProperties(prefix = "spring.redis")
    public JedisConnectionFactory getConnectionFactory(){  
        JedisConnectionFactory factory = new JedisConnectionFactory();  
        JedisPoolConfig config = getRedisConfig();  
        factory.setPoolConfig(config);  
        logger.info("JedisConnectionFactory bean init success.");  
        return factory;  
    }  
    
    @Autowired
    private KeyEventMessageListener keyEventMessageListener;//key event 訂閱監聽
    
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(getConnectionFactory());
        Topic topic = new ChannelTopic("[email protected]"+ getDbIndex() +"__:expired");
        System.out.println("Redis db index : " + getDbIndex());
        container.addMessageListener(keyEventMessageListener,topic);
        return container;
    }
    

	public String getHostName() {
		return hostName;
	}

	public void setHostName(String hostName) {
		this.hostName = hostName;
	}

	public int getPort() {
		return port;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public String getPassword() {
		return password;
	}

	public void setPassword(String password) {
		this.password = password;
	}

	public int getTimeout() {
		return timeout;
	}

	public void setTimeout(int timeout) {
		this.timeout = timeout;
	}

	public int getDbIndex() {
		return dbIndex;
	}

	public void setDbIndex(int dbIndex) {
		this.dbIndex = dbIndex;
	}
	
}
           

上面主要配置的連接配接池和工廠, 訂閱 topic 為 [email protected]<dbIndex>__:expired   頻道, 其中dbIndex 通過配置檔案中的db拼接, 具體訂閱類KeyEventMessageListener類, 再看KeyEventMessageListener類之前,先說明下幾個常量類:

EventConstants類, 主要是用來定義事件相關常量

/**
 * 事件相關常量
 * @author victor
 *
 */
public class EventConstants {
	
	public static final String KEY_EVENT_AUTO_ID = "KEY_EVENT_AUTO_ID"; // event 計數器, 用于生成EVENT_ID

	public static final String KEY_EVENT_DATA_MAP = "KEY_EVENT_DATA_MAP"; //用來存儲使用者自定義data的 Hash
	
	public static final String EVENT_KEY = "KEY_EVENT"; // 用來做event redis唯一辨別字首
	
	public static final String EVENT_DATA_KEY = "DATA";// hash  中存儲data字首
	
	public static final String DEFAULT_EMPTY_DATA = "KEY_EVENT_DATA_EMPTY"; // 空data時常量替換串
	
	public static final String EVENT_META_KEY = "META";//用來存儲 事件類型字首
}
           

Event 枚舉類, 用來區分事件類型:

/**
 * 事件類型枚舉
 * @author victor
 *
 */
public enum EventEnum implements IndexedEnum<EventEnum>{
	
	EXAMPLE(1,"example","示例事件");
	
	EventEnum(int index,String code,String name){
		this.index = index;
		this.name = name;
		this.code = code;
	}

	private int index;
	
	private String name;
	
	private String code;
	
	
	private static final ImmutableMap<Integer, EventEnum> INDEXS = IndexedEnumUtil.toIndexes(values());
	
	public static EventEnum indexOf(int index){
		return INDEXS.get(index);
	}


	public int getIndex() {
		return index;
	}

	public String getName() {
		return name;
	}

	public String getCode() {
		return code;
	}
	
}
           

消息注冊Service以及接口:負責注冊事件,移出事件等實作:

/**
 * 事件服務接口
 * @author victor
 *
 */
public interface IEventService {

	/**
	 * 注冊事件,相同僚件不會覆寫,注冊成功将傳回全局唯一事件ID
	 * @param event  事件
	 * @param data	業務資料
	 * @return 事件ID,全局唯一
	 */
	public String register(EventEnum event,String data);
	
	/**
	 * 注冊事件,相同僚件不會覆寫,注冊成功将傳回全局唯一事件ID
	 * @param event  事件
	 * @param data	業務資料
	 * @param seconds 事件執行時間, 機關秒
	 * @return 事件ID,全局唯一
	 */
	public String register(EventEnum event,String data,int seconds);
	
	/**
	 * 注冊事件,相同僚件不會覆寫,注冊成功将傳回全局唯一事件ID
	 * @param event  事件
	 * @param data	業務資料
	 * @param seconds 事件執行時間
	 * @return 事件ID,全局唯一
	 */
	public String register(EventEnum event,String data,Date date);
	
	
	/**
	 * 移除事件,如果事件已經被執行,移除無意義,如果事件未執行,則傳回移除成功或者失敗
	 * @param eventId  事件ID
	 * @return 是否移除成功,true 表示成功,false表示失敗(如事件正在被執行)
	 */
	public boolean remove(String eventId);
	
}
           

實作類:

@Service
public class EventServiceImpl implements IEventService{
	
	private static Logger logger = Logger.getLogger(EventServiceImpl.class);
	
	@Autowired
	private IRedisService redisService;
	
	@Autowired
	private IDistributedLock distributedLock;

	@Override
	public String register(EventEnum event, String data) {
		return register(event, data,1);
	}

	@Override
	public String register(EventEnum event, String data, int seconds) {
		seconds = seconds <= 0 ? 1 : seconds;
		data = StringUtils.isNullOrEmpty(data) ? EventConstants.DEFAULT_EMPTY_DATA : data;
		String eventId = String.valueOf(redisService.incyby(EventConstants.KEY_EVENT_AUTO_ID));
		Map<String,String> hash = new HashMap<>();
		hash.put(EventUtils.getDataKey(eventId), data);
		hash.put(EventUtils.getMetaKey(eventId), event.getIndex() + "");
		String res = redisService.hmset(EventConstants.KEY_EVENT_DATA_MAP,hash);
		if(StringUtils.isNullOrEmpty(data) || StringUtils.isNullOrEmpty(hash) || StringUtils.isNullOrEmpty(res)){
			logger.error("Set " + EventConstants.KEY_EVENT_DATA_MAP + " error . eventId:" + eventId + ",data: " + data + ",meta:" + event.getIndex());
		}
		redisService.setex(EventUtils.getEventKey(eventId), event.getIndex() + "", seconds);
		logger.info("register event :" + eventId);
		/*ThreadPoolUtil.executor(ThreadPoolEnum.KEY_EVENT_REGISTER).execute(new KeyEventRegisterRunable(redisService,eventId,event,data,seconds));*/
		return eventId;
	}

	@Override
	public String register(EventEnum event, String data, Date date) {
		int seconds = DateUtils.calculateDifferenceSeconds(date, new Date());
		return register(event, data,seconds);
	}

	@Override
	public boolean remove(String eventId) {
		if(distributedLock.lock(eventId, 5)){
			redisService.del(EventUtils.getEventKey(eventId));
			redisService.hdel(EventConstants.KEY_EVENT_DATA_MAP, EventUtils.getDataKey(eventId),EventUtils.getMetaKey(eventId));
			distributedLock.unlock(eventId);
		}
		logger.info("Remove key event error : event is executed!");
		return false;
	}

}
           

實作比較簡單:

注冊事件: 根據事件類型,向redis中新增key, 并設定相關過期時間,然向hash中插入使用者自定義data.

移出事件: 直接删除事件, 防止删除事件同時事件正在執行,加鎖

訂閱監聽類:

/**
 * Key event 頻道訂閱<br/>
 * 需要redis版本2.8以上,并且修改conf配置檔案參數: <cite>notify-keyspace-events Ex</cite><br/>
 * 開啟配置之後redis将會在key失效時,将消息發送到<cite>[email protected]<db>__:expired</cite>頻道,其中db代表key所在資料庫的index
 * @author victor
 *
 */
@Component
@Configuration
public class KeyEventMessageListener implements MessageListener{
	
	private static Logger logger = Logger.getLogger(KeyEventMessageListener.class);
	
	private final RedisSerializer<String> serializer = new StringRedisSerializer();
	
	@Autowired
	private EventHandlerServiceFacotry eventHandlerServiceFacotry;
	
	@Autowired
	private IRedisService redisService;
	
	@Autowired
	private IRedisLock redisLock;
	
	@Override
	public void onMessage(Message message, byte[] pattern) {
		String key = serializer.deserialize(message.getBody());
		String eventId = EventUtils.getEventId(key);
		if(!StringUtils.isNullOrEmpty(eventId)){
			if(redisLock.lock(eventId, 300)){//must executed in 300S
				logger.info("Handle event :" + eventId + " start !");
				try{
					handler(eventId);
				}catch (Exception ex) {
					logger.error("Key event hadppend bug : eventId:" + eventId + ", message :" + ex.getMessage() , ex);
					ex.printStackTrace();
				}finally{
					//delete data and meta
					redisService.del(EventUtils.getEventKey(eventId));
					redisService.hdel(EventConstants.KEY_EVENT_DATA_MAP, EventUtils.getDataKey(eventId),EventUtils.getMetaKey(eventId));
					redisLock.unlock(eventId);
				}
				logger.info("Handle event :" + eventId + " end!");
			}else{// event was handled or is handling
				logger.debug("Event is handled");
			}
		} 
	}
	
	private void handler(String eventId){
		String data = redisService.hget(EventConstants.KEY_EVENT_DATA_MAP,EventUtils.getDataKey(eventId));
		String meta = redisService.hget(EventConstants.KEY_EVENT_DATA_MAP,EventUtils.getMetaKey(eventId));
		if(StringUtils.isNullOrEmpty(data) || StringUtils.isNullOrEmpty(meta)){// event was handled
			logger.info("Event was handled, eventId:" + eventId + ".");
			return;
		}
		int event = Integer.parseInt(meta);
		data = data.equals(EventConstants.DEFAULT_EMPTY_DATA) ? null : data;
		IEventHandlerService service = eventHandlerServiceFacotry.instance(event);
		if(service == null){
			logger.error("Event handler not found, eventId:" + eventId + ",data:" + data + ",meta:" + meta + "!");
			throw new EventNotMappingHandlerException("Event handler service not mapping!",eventId,event,data);
		}
		try{
			service.handle(EventEnum.indexOf(event), eventId, data);
		}catch (Exception e) {
			logger.error("Handler event error, eventId : " + eventId, e);
			throw new EventHandleException("Handler event error.",eventId,event,data);
		}
	}
}
           

此類主要完成功能, 接受redis推送的訂閱消息, 并且根據key來找到對應的 事件類型, 事件data,  加鎖防止重複消費, 通過事件執行工廠根據事件類型擷取不同的事件最終業務service.

正常情況, 不同的事件可能各自都有自己不同的事件, 比如訂單取消事件 和 商品自動上架兩種事件執行的業務邏輯就不同, 是以此處使用多态動态擷取不同的事件業務實作執行個體。

事件業務工廠 EventHandlerServiceFactory類:

/**
 * 事件處理工廠
 * @author victor
 *
 */
@Service
public class EventHandlerServiceFacotry {
	
	private List<IEventHandlerService> services;
	
	@Autowired
	public EventHandlerServiceFacotry(List<IEventHandlerService> services){
		this.services = services;
	}
	
	public IEventHandlerService instance(int event){
		for(IEventHandlerService service : services){
			if(service.getEvent().getIndex() == event){
				return service;
			}
		}
		return null;
	}
	
	public IEventHandlerService instance(EventEnum event){
		return instance(event.getIndex());
	}
}
           

IEventHandlerService接口, 定義事件執行接口方法以及擷取時間類型接口:

/**
 * 事件觸發服務 自定義
 * @author victor
 *
 */
public interface IEventHandlerService {

	/**
	 * 執行
	 * @param eventId
	 * @param event
	 * @param data
	 */
	public void handle(EventEnum event,String eventId,String data);
	
	/**
	 * 事件類型
	 * @return
	 */
	public EventEnum getEvent();
}
           

EXAMPLE_EVENT 示例事件實作示例:

@Service
public class ExampleEventHandlerServiceImpl implements IEventHandlerService{
	
	@Override
	public void handle(EventEnum event, String eventId, String data) {
		System.out.println("執行事件:" + event.getName() + ",使用者自定義data:" + data);
	}

	@Override
	public EventEnum getEvent() {
		return EventEnum.EXAMPLE;
	}

}
           

輔助類EventUtils:

public final class EventUtils {

	private EventUtils(){
		
	}
	
	public static String getEventKey(String eventId){
		return EventConstants.EVENT_KEY + "_" + eventId;
	}
	
	public static String getDataKey(String eventId){
		return eventId + "_" + EventConstants.EVENT_DATA_KEY;
	}
	
	public static String getMetaKey(String eventId){
		return eventId + "_" + EventConstants.EVENT_META_KEY;
	}
	
	public static boolean isEvent(String key){
		if(!StringUtils.isNullOrEmpty(key) && key.toUpperCase().indexOf(EventConstants.EVENT_KEY + "_") == 0){
			return true;
		}
		return false;
	}
	
	
	public static String getEventId(String key){
		if(isEvent(key)){
			String eventId = key.replace(EventConstants.EVENT_KEY + "_", "");
			return eventId;
		}
		return null;
	}
	
}
           

上述代碼中, 部分類可能沒有, 比如redisService : 為redis操作指令封裝   distributionLock :分布式鎖,筆者部落格前面有自己實作的redis鎖。

調用注冊事件:注冊一個類型為EXAMPLE類型事件,自定義data為 hello,example字元串, 并且在60秒之後執行:

@Autowired
private  IEventService eventService;
           
eventService.register(EventEnum.EXAMPLE,"hello , example !" ,60);
           
利用Redis keyspace notification 實作定時執行Redis Keyspace Notifications(鍵空間回調通知):

如果增加一種事件, 即可添加枚舉,  并且針對此種事件,編寫對應的service,那麼接受到推送key之後會根據相應的事件類型來調用對應的實作service. 此處不再貼示例代碼, 筆者對目前代碼做過相應測試, 同時像redis注冊6000條事件, 并且指定執行時間為同一時間,同時運作2個項目模仿分布式, 運作結果:  無重複重複執行事件現象, 無丢事件情況, 上述筆者配置大概在1分鐘左右能夠執行完成。 執行能力跟redis配置相關, 上述針對redis操作效率, 線程數等代碼部分沒有做過多的優化。   點選下載下傳示例

武漢指玄網絡科技

繼續閱讀