天天看點

賊好用,冰河開源了這款精準定時任務和延時隊列架構!!

項目簡述

Mykit體系中提供的簡單、穩定、可擴充的延遲消息隊列架構,提供精準的定時任務和延遲隊列處理功能。

項目子產品說明

  • mykit-delay-common: mykit-delay 延遲消息隊列架構通用工具子產品,提供全局通用的工具類
  • mykit-delay-config: mykit-delay 延遲消息隊列架構通用配置子產品,提供全局配置
  • mykit-delay-queue:  mykit-delay 延遲消息隊列架構核心實作子產品,目前所有主要的功能都在此子產品實作
  • mykit-delay-controller: mykit-delay 延遲消息隊列架構Restful接口實作子產品,對外提供Restful接口通路,相容各種語言調用
  • mykit-delay-core: mykit-delay 延遲消息隊列架構的入口,整個架構的啟動程式在此子產品實作
  • mykit-delay-test: mykit-delay 延遲消息隊列架構通用測試子產品,主要提供Junit單元測試用例

需求背景

  • 使用者下訂單後未支付,30分鐘後支付逾時
  • 在某個時間點通知使用者參加系統活動
  • 業務執行失敗之後隔10分鐘重試一次

類似的場景比較多 簡單的處理方式就是使用定時任務 假如資料比較多的時候 有的資料可能延遲比較嚴重,而且越來越多的定時業務導緻任務排程很繁瑣不好管理。

隊列設計

整體架構設計如下圖所示。

賊好用,冰河開源了這款精準定時任務和延時隊列架構!!

開發前需要考慮的問題

  • 及時性 消費端能按時收到
  • 同一時間消息的消費權重
  • 可靠性 消息不能出現沒有被消費掉的情況
  • 可恢複 假如有其他情況 導緻消息系統不可用了 至少能保證資料可以恢複
  • 可撤回 因為是延遲消息 沒有到執行時間的消息支援可以取消消費
  • 高可用 多執行個體 這裡指HA/主備模式并不是多執行個體同時一起工作
  • 消費端如何消費

當然初步選用redis作為資料緩存的主要原因是因為redis自身支援zset的資料結構(score 延遲時間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時間次元去判定執行的順序 同時也支援map list資料結構。

簡單定義一個消息資料結構

private String topic;/***topic**/
private String id;/***自動生成 全局唯一 snowflake**/
private String bizKey;
private long delay;/***延時毫秒數**/
private int priority;//優先級
private long ttl;/**消費端消費的ttl**/
private String body;/***消息體**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();      

運作原理:

  • 用Map來存儲中繼資料。id作為key,整個消息結構序列化(json/…)之後作為value,放入元消息池中。
  • 将id放入其中(有N個)一個zset有序清單中,以createTime+delay+priority作為score。修改狀态為正在延遲中
  • 使用timer實時監控zset有序清單中top 10的資料 。如果資料score<=目前時間毫秒就取出來,根據topic重新放入一個新的可消費清單(list)中,在zset中删除已經取出來的資料,并修改狀态為待消費
  • 用戶端擷取資料隻需要從可消費隊列中擷取就可以了。并且狀态必須為待消費 運作時間需要<=目前時間的 如果不滿足 重新放入zset清單中,修改狀态為正在延遲。如果滿足修改狀态為已消費。或者直接删除中繼資料。

用戶端

因為涉及到不同程式語言的問題,是以目前預設支援http通路方式。

  • 添加延時消息添加成功之後傳回消費唯一ID POST /push {…..消息體}
  • 删除延時消息 需要傳遞消息ID GET /delete?id=
  • 恢複延時消息 GET /reStore?expire=true|false expire是否恢複已過期未執行的消息。
  • 恢複單個延時消息 需要傳遞消息ID GET /reStore/id
  • 擷取消息 需要長連接配接 GET /get/topic

用nginx暴露服務,配置為輪詢 在添加延遲消息的時候就可以流量平均配置設定。

目前系統中用戶端并沒有采用HTTP長連接配接的方式來消費消息,而是采用MQ的方式來消費資料這樣用戶端就可以不用關心延遲消息隊列。隻需要在發送MQ的時候攔截一下 如果是延遲消息就用延遲消息系統處理。

消息可恢複

實作恢複的原理 正常情況下一般都是記錄日志,比如mysql的binlog等。

這裡我們直接采用mysql資料庫作為記錄日志。

目前建立以下2張表:

  • 消息表 字段包括整個消息體
  • 消息流轉表 字段包括消息ID、變更狀态、變更時間、zset掃描線程Name、host/ip

定義zset掃描線程Name是為了更清楚的看到消息被分發到具體哪個zset中。前提是zset的key和監控zset的線程名稱要有點關系 這裡也可以是zset key。

支援消息恢複

假如redis伺服器當機了,重新開機之後發現資料也沒有了。是以這個恢複是很有必要的,隻需要從表1也就是消息表中把消息狀态不等于已消費的資料全部重新分發到延遲隊列中去,然後同步一下狀态就可以了。

當然恢複單個任務也可以這麼幹。

資料表設計

這裡,我就直接給出建立資料表的SQL語句。

DROP TABLE IF EXISTS `mykit_delay_queue_job`;
CREATE TABLE `mykit_delay_queue_job` (
  `id` varchar(128) NOT NULL,
  `bizkey` varchar(128) DEFAULT NULL,
  `topic` varchar(128) DEFAULT NULL,
  `subtopic` varchar(250) DEFAULT NULL,
  `delay` bigint(20) DEFAULT NULL,
  `create_time` bigint(20) DEFAULT NULL,
  `body` text,
  `status` int(11) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),
  KEY `mykit_delay_queue_job_STATUS` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for mykit_delay_queue_job_log
-- ----------------------------
DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;
CREATE TABLE `mykit_delay_queue_job_log` (
  `id` varchar(128) NOT NULL,
  `status` int(11) DEFAULT NULL,
  `thread` varchar(60) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  `host` varchar(128) DEFAULT NULL,
  KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;      

關于高可用

分布式協調還是選用zookeeper。

如果有多個執行個體最多同時隻能有1個執行個體工作 這樣就避免了分布式競争鎖帶來的壞處,當然如果業務需要多個執行個體同時工作也是支援的,也就是一個消息最多隻能有1個執行個體處理,可以選用zookeeper或者redis就能實作分布式鎖了。

最終做了一下測試多執行個體同時運作,可能因為會涉及到鎖的問題性能有所下降,反而單機效果很好。是以比較推薦基于docker的主備部署模式。

運作模式

  • 支援 master,slave (HA)需要配置

    mykit.delay.registry.serverList

    zk叢集位址清單
  • 支援 cluster 會涉及到分布式鎖競争 效果不是很明顯  分布式鎖采用

    redis

    setNx

    實作
  • StandAlone

目前,經過測試,推薦使用master slave的模式,後期會優化Cluster模式

如何接入

為了提供一個統一的精準定時任務和延時隊列架構,mykit-delay提供了HTTP Rest接口供其他業務系統調用,接口使用簡單友善,隻需要簡單的調用接口,傳遞相應的參數即可。

消息體

以JSON資料格式參數 目前隻提供了

http

協定

  • body                    業務消息體
  • delay                   延時毫秒 距

    createTime

    的間隔毫秒數
  • id                         任務ID 系統自動生成 任務建立成功傳回
  • status                  狀态 預設不填寫
  • topic                     标題
  • subtopic               保留字段
  • ttl                          保留字段
  • createTime           建立任務時間 非必填 系統預設

添加任務

/push  
    POST application/json
{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}      

删除任務

删除任務 需要記錄一個JobId

/delete?jobId=xxx
   GET      

恢複單個任務

用于任務錯亂 腦裂情況 根據日志恢複任務

/reStoreJob?JobId=xxx
   GET      

恢複所有未完成的任務

根據日志恢複任務

/reStore?expire=true
   GET      

參數

expire

表示是否需要恢複已過期還未執行的資料

清空隊列資料

根據日志中未完成的資料清空隊列中全部資料。清空之後 會删除緩存中的所有任務

/clearAll
 GET      

用戶端擷取隊列方式

目前預設實作了

RocketMQ

ActiveMQ

的推送方式。依賴MQ的方式來實作延時架構與具體業務系統的耦合。

消息體中消息與

RocketMQ

ActiveMQ

消息字段對應關系

mykit-delay RocketMQ ActiveMQ 備注
topic 點對點發送隊列名稱或者主題名稱
subtopic 點對點發送隊列子名稱或者主題子名稱
body 消息内容

關于系統配置

延遲架構與具體執行業務系統的互動方式通過延遲架構配置實作,具體配置檔案位置為mykit-delay-config項目下的

resources/properties/starter.properties

檔案中。

測試

需要配置好資料庫位址和Redis的位址 如果不是單機模式 也需要配置好Zookeeper

運作mykit-delay-test子產品下的測試類

io.mykit.delay.test.PushTest

添加任務到隊列中

啟動mykit-delay-test子產品下的

io.mykit.delay.TestDelayQueue

消費前面添加資料 為了友善查詢效果 預設的消費方式是

consoleCQ

控制台輸出

擴充

支援zset隊列個數可配置 避免大資料帶來高延遲的問題。

近期規劃

  • 分區(buck)支援動态設定
  • redis與資料庫資料一緻性的問題 (

    重要

  • 實作自己的推拉機制
  • 支援可切換實作方式 目前隻是依賴Redis實作,後續待優化
  • 支援Web控制台管理隊列
  • 實作消息消費

    TTL

    機制

如果這款開源架構對你有幫助,請小夥伴們打開github連結:

https://github.com/sunshinelyz/mykit-delay

 ,給個Star,讓更多的小夥伴看到,減輕工作中繁瑣的掃描資料表的定時任務開發。也希望能夠有越來越多的小夥伴參與這個開源項目,我們一起養肥它!!

好了,不早了,今天就到這兒吧,我是冰河,我們下期見!!