目錄
-
- 目錄
- 前言
- 正文
- 題目分析
- 拓撲設計
- 代碼展示
- 關鍵優化
- 後記
前言
跳過廢話,直接看正文
通過上一篇的配置,我們就搭建好了初賽所需要的開發及測試的環境。接下來就是對賽題進行分析、拓撲設計以及碼代碼了。
正文
題目分析
賽題官方介紹
初賽的任務就是寫一個topology來完成對大量實時交易資訊的簡單計算,整個任務流程就是利用JStorm從RocketMQ拉取資料,按照要求實時計算出結果,并将結果寫入Tair。
拓撲設計
在這個過程中嘗試了許多種拓撲結構,最終的結構如下圖所示:

從最後的結果來看,這種拓撲并不算很好。在初賽結束後與其他選手進行交流發現前幾名的拓撲結構都設計的簡單巧妙,是以了解到拓撲的流水線不能太長,否則消息線上程間的傳遞會占用大量時間,進而使得最後的平均處理時間變長。
代碼展示
拓撲的結構代碼如下:
package com.alibaba.middleware.race.jstorm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.middleware.race.Constants;
import com.alibaba.middleware.race.RaceConfig;
import com.alibaba.middleware.race.util.FileUtil;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 這是一個很簡單的例子
* 選手的拓撲送出到叢集,我們是有逾時設定的。每個選手的拓撲最多跑20分鐘,一旦超過這個時間
* 我們會将選手拓撲殺掉。
*/
/**
* 選手拓撲入口類,我們定義必須是com.alibaba.middleware.race.jstorm.RaceTopology
* 因為我們背景對選手的git進行下載下傳打包
* ,拓撲運作的入口類預設是com.alibaba.middleware.race.jstorm.RaceTopology; 是以這個主類路徑一定要正确
*/
public class RaceTopology {
private static Logger LOG = LoggerFactory.getLogger(RaceTopology.class);
/** Spout **/
private static final int AllSpoutParallelism = ;
public static final String ALLSPOUT = "AllSpout";
/** Platform Distinguish **/
private static final int PlatformParallelism = ;
public static final String PLATFORMBOLT = "PlatformBolt";
public static final String TMPAYSTREAM = "TMPayStream";
public static final String TBPAYSTREAM = "TBPayStream";
public static final String ALLPAYSTREAM = "AllPayStream";
/** PayMsgPartSum **/
private static final int PayMsgPartSumParallelism = ;
public static final String PAY_MSG_PART_SUM_BOLT = "PayMsgPartSum";
/** Writer Bolt **/
private static final int TMCounterWriterParallelism = ;
public static final String TMCOUNTERWRITERBOLT = "TMCounterWriter";
private static final int TBCounterWriterParallelism = ;
public static final String TBCOUNTERWRITERBOLT = "TBCounterWriter";
private static final int RatioWriterParallelism = ;
public static final String RATIOWRITERBOLT = "RatioWriter";
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
/** Spout **/
builder.setSpout(ALLSPOUT, new AllSpout(), AllSpoutParallelism);
/** Bolts receive tuples form spout **/
builder.setBolt(PLATFORMBOLT, new PlatformDistinguish(),
PlatformParallelism).fieldsGrouping(ALLSPOUT,
new Fields("orderID"));
/** tm/tb Writer Bolt **/
builder.setBolt(TMCOUNTERWRITERBOLT, new TMCounterWriter(),
TMCounterWriterParallelism).fieldsGrouping(PLATFORMBOLT,
TMPAYSTREAM, new Fields("time"));
builder.setBolt(TBCOUNTERWRITERBOLT, new TBCounterWriter(),
TBCounterWriterParallelism).fieldsGrouping(PLATFORMBOLT,
TBPAYSTREAM, new Fields("time"));
/** ratio related **/
builder.setBolt(PAY_MSG_PART_SUM_BOLT, new PayMsgPartSum(),
PayMsgPartSumParallelism).fieldsGrouping(PLATFORMBOLT,
ALLPAYSTREAM, new Fields("time"));
builder.setBolt(RATIOWRITERBOLT, new NewRatioWriter(),
RatioWriterParallelism).globalGrouping(PAY_MSG_PART_SUM_BOLT);
String topologyName = RaceConfig.JstormTopologyName;
Config conf = new Config();
conf.setNumWorkers();
conf.setNumAckers();
// conf.setMessageTimeoutSecs(90);
// conf.setMaxSpoutPending(RaceConfig.SpoutMaxPending);
try {
StormSubmitter.submitTopology(topologyName, conf,
builder.createTopology());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
檢視完整代碼
關鍵優化
- 根據日志分析拓撲的瓶頸,進而多次改變拓撲結構
- 減輕Spout工作量,并改為支援多線程操作, 這個優化十分重要,将性能提升了2-3倍
- message去重(由于RocketMQ的消息在被多線程消費時是不安全的,是以需要在代碼中進行去重),這個優化将最後的準确率提升到了100%
- 其他還有一些優化手段,都是根據日志來進行調整的,并不是太關鍵
後記
通過初賽學到了實時流資料計算架構的一些基本知識,了解到了淘寶雙十一後端的一些技術。
從學習新技術的角度出發,我認為在初賽學到的東西比複賽還要多一些。年輕人嘛,不能總是用自己熟悉的東西,還是要多多去學習和擁抱新技術的,不然就真的是名副其實的碼農了。