jstorm開發指南-寫個簡單的jstorm應用
發表于 2015-07-18 | 分類于 大資料 | 暫無評論
jstorm 是阿裡巴巴開源的基于storm采用Java重寫的一套分布式實時流計算架構,使用簡單,特點如下:
- 開發非常迅速: 接口簡單,容易上手,隻要遵守Topology,Spout, Bolt的程式設計規範即可開發出一個擴充性極好的應用,底層rpc,worker之間備援,資料分流之類的動作完全不用考慮。
- 擴充性極好:當一級處理單元速度,直接配置一下并發數,即可線性擴充性能
- 健壯:當worker失效或機器出現故障時, 自動配置設定新的worker替換失效worker
- 資料準确性: 可以采用Acker機制,保證資料不丢失。 如果對精度有更多一步要求,采用事務機制,保證資料準确。
為什麼要選擇jstorm,而不采用twitter的storm呢?jstorm對比storm有如下優點:
- Nimbus 實作HA
- 徹底解決Storm雪崩問題:底層RPC采用netty + disruptor保證發送速度和接受速度是比對的
- 新增supervisor、Supervisor shutdown時、送出新任務,worker數不夠時,均不自動觸發任務rebalance
- 新topology不影響現有任務,新任務無需去搶占老任務的cpu,memory,disk和net
- 減少對ZK的通路量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描
- Worker 内部全流水線模式:Spout nextTuple和ack/fail運作在不同線程
- 性能:采用ZeroMq, 比storm快30%;采用netty時, 和storm快10%,并且穩定非常多
總之,Jstorm 比Storm 更穩定,功能更強大,更快。而且Storm上跑的程式可以一行代碼不變運作在Jstorm上,零成本,推薦所有使用storm的兄弟們搭建個jstorm叢集緩過來。
jstorm 叢集的搭建過程,可以參考另一篇文章:分布式實時日志系統(一)環境搭建之 Jstorm 叢集搭建過程/Jstorm叢集一鍵安裝部署
jstorm 開發執行個體
上面也說過了,jstorm使用起來很簡單,遵循Topology,Spout, Bolt的程式設計規範就可以,在下面的例子中将一步步完成這些。例子也很簡單,在spout中不斷産生自增的int數組,bolt接受到數值後列印出日志,并插入到hbase中。(如果沒有hbase環境的,這一步可以繼續注釋掉,不用打開,隻看到跑到日志列印的地方就好了)
spout 的開發隻需要繼承
BaseRichSpout
,實作繼承的方法即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| public class TestSpout extends BaseRichSpout {
private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);
static AtomicInteger sAtomicInteger = new AtomicInteger(0);
static AtomicInteger pendNum = new AtomicInteger(0);
private int sqnum;
SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
sqnum = sAtomicInteger.incrementAndGet();
this.collector = collector;
}
@Override
public void nextTuple() {
while (true) {
int a = pendNum.incrementAndGet();
LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));
this.collector.emit(new Values("xxxxx:"+a));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("log"));
}
/**
* 啟用 ack 機制,詳情參考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6
* @param msgId
*/
@Override
public void ack(Object msgId) {
super.ack(msgId);
}
/**
* 消息處理失敗後需要自己處理
* @param msgId
*/
@Override
public void fail(Object msgId) {
super.fail(msgId);
LOGGER.info("ack fail,msgId"+msgId);
}
}
|
bolt 同理,繼承
BaseRichBolt
實作其相應的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| public class TestBolt extends BaseRichBolt {
private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);
OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String xx = input.getString(0);
LOGGER.info(String.format("receive from spout ,num is : %d", xx));
// 發送ack資訊告知spout 完成處理的消息 ,如果下面的hbase的注釋代碼打開了,則必須等到插入hbase完畢後才能發送ack資訊,這段代碼需要删除
this.collector.ack(input);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
|
topology 的開發同理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public class TestTopology implements ILogTopology {
@Override
public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("testspout", new TestSpout(), 1);
builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");
Config conf = ConfigUtils.getStormConfig(properties);
conf.setNumAckers(1);
StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());
System.out.println("storm cluster will start");
}
}
|
經過上面的三個步驟,一個最簡單的jstorm應用就開發完成了,接下來通過編譯、打包完後,生成jar檔案
jstorm-hbase-demo-0.1.jar
,将此jar檔案在jstorm叢集的nimbus機器上送出即可:
jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties
。
demo運作效果
從jstorm叢集的監控圖賞可以看到,對應topology的運作情況:
jstorm開發指南-寫個簡單的jstorm應用jstorm開發指南-寫個簡單的jstorm應用jstorm 開發執行個體 bolt 的執行效率,及ack數量,占用機器記憶體等:
jstorm開發指南-寫個簡單的jstorm應用jstorm開發指南-寫個簡單的jstorm應用jstorm 開發執行個體 源碼已經上傳到github上面,喜歡研究的同學,可以fork後自己修改練習。位址為:https://github.com/xirong/jstorm-hbase-demo
源碼中使用到的Phoenix元件,hbase上層的中間件,使得開發人員可以使用sql的方式來對hbase進行相應的操作,感興趣的可以閱讀:使用Phoenix通過sql語句更新操作hbase資料 ,此文中介紹了如何安裝及使用。
另外想對hbase的有所了解的可以檢視:列式存儲hbase系統架構學習
原文http://www.ixirong.com/2015/07/18/develop-the-first-jstorm-demo/