前面已經實作了第一版實時消息推送核心架構搭建,下面就在第一版基礎上加以更新完善。
使用技術架構:分布式排程 http://git.oschina.net/hugui/light-task-scheduler 請自行玩玩 這裡簡單介紹下。
light-task-scheduler 排程架構基本原理和第一版實作一樣不過這裡更加完善的是該架構實作了消息重發,當消息失敗消息會被重新儲存起來定期重發這裡可以具體了解下架構自身原理,該架構本身已是生産者-消費者模式 在更新版中主要使用到了 jobclient:産生消息 jobtracker:處理連接配接client和task tasktracer:負責處理任務 queue:消息隊列支援資料庫隊列 。在使用該架構的是時候需要打包jar在項目中引入。具體架構整合可以了解下架構本身。下面主要貼上消息生産 處理 以及自身使用redis作為消息存取配合任務排程使用
消息生産:
@Override
public void sendMessage(Message msMessage) {
//寫入redis緩存系統隊列
if(redisUtils.redisCheckStatus()){
String id=UUID.randomUUID().toString();
ArrayList<Message> messages=new ArrayList<Message>();
msMessage.setRedisstatus(1);//進入redis隊列
msMessage.setId(id);
//消息加入redis
redisUtils.addMessageQueue(id, msMessage);
Job job = new Job();
job.setTaskId(id);
job.setParam("messageid",id);
job.setTaskTrackerNodeGroup("message_trade_TaskTracker"); //
job.setNeedFeedback(true);
job.setReplaceOnExist(true); // 當任務隊列中存在這個任務的時候,是否替換更新
job.setCronExpression(null);//立即執行
Response response = jobClient.submitJob(job);
logger.info("執行結果:"+response.getMsg());
}else{
msMessage.setRedisstatus(0);//未進入redis隊列
}
//寫入資料庫 防止消息丢失
try {
Session session=getHibernateTemplate().getSessionFactory().openSession();
Transaction transaction=session.beginTransaction();
session.save(msMessage);
transaction.commit();
session.close();
} catch (HibernateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
消息處理:
@Override
public Result run(Job job) throws Throwable {
Result result=null;
try {
// TODO 業務邏輯
logger.error("我要執行:" + job);
String messageid=job.getParam("messageid");
boolean backresult=true;
if(!StringUtils.isEmpty(messageid)){
Message message=redisUtils.getMessageQueue(messageid);
if(message!=null){
backresult=sendMsg(message);//發送消息
if(backresult){
result=new Result(Action.EXECUTE_SUCCESS, "推送成功");
}else{
result=new Result(Action.EXECUTE_LATER, "推送失敗,稍後重新推送");
}
}
}
} catch (Exception e) {
logger.info("Run job failed!", e);
result=new Result(Action.EXECUTE_LATER, e.getMessage());
}
return result;
}
注意:
<1.消息發送失敗需要傳回 new Result(Action.EXECUTE_LATER, "推送失敗,稍後重新推送"); 通知任務排程該任務需要重發
<2.redis消息取出使用pop該消息就會重redis中删除 注意。。。
更新主要解決自行定時任務,以及消息發送失敗重發機制實作。下面把相關代碼貼上 沒有jar包
dubbo-demo-provider:消息處理系統 dubbo-demo-consumer:測試端