天天看點

jstorm開發指南-寫個簡單的jstorm應用jstorm開發指南-寫個簡單的jstorm應用jstorm 開發執行個體

jstorm開發指南-寫個簡單的jstorm應用

發表于 2015-07-18   |   分類于 大資料   |   暫無評論

jstorm 是阿裡巴巴開源的基于storm采用Java重寫的一套分布式實時流計算架構,使用簡單,特點如下:

  • 開發非常迅速: 接口簡單,容易上手,隻要遵守Topology,Spout, Bolt的程式設計規範即可開發出一個擴充性極好的應用,底層rpc,worker之間備援,資料分流之類的動作完全不用考慮。
  • 擴充性極好:當一級處理單元速度,直接配置一下并發數,即可線性擴充性能
  • 健壯:當worker失效或機器出現故障時, 自動配置設定新的worker替換失效worker
  • 資料準确性: 可以采用Acker機制,保證資料不丢失。 如果對精度有更多一步要求,采用事務機制,保證資料準确。

為什麼要選擇jstorm,而不采用twitter的storm呢?jstorm對比storm有如下優點:

  • Nimbus 實作HA
  • 徹底解決Storm雪崩問題:底層RPC采用netty + disruptor保證發送速度和接受速度是比對的
  • 新增supervisor、Supervisor shutdown時、送出新任務,worker數不夠時,均不自動觸發任務rebalance
  • 新topology不影響現有任務,新任務無需去搶占老任務的cpu,memory,disk和net
  • 減少對ZK的通路量:去掉大量無用的watch;task的心跳時間延長一倍;Task心跳檢測無需全ZK掃描
  • Worker 内部全流水線模式:Spout nextTuple和ack/fail運作在不同線程
  • 性能:采用ZeroMq, 比storm快30%;采用netty時, 和storm快10%,并且穩定非常多

總之,Jstorm 比Storm 更穩定,功能更強大,更快。而且Storm上跑的程式可以一行代碼不變運作在Jstorm上,零成本,推薦所有使用storm的兄弟們搭建個jstorm叢集緩過來。

jstorm 叢集的搭建過程,可以參考另一篇文章:分布式實時日志系統(一)環境搭建之 Jstorm 叢集搭建過程/Jstorm叢集一鍵安裝部署

jstorm 開發執行個體

上面也說過了,jstorm使用起來很簡單,遵循Topology,Spout, Bolt的程式設計規範就可以,在下面的例子中将一步步完成這些。例子也很簡單,在spout中不斷産生自增的int數組,bolt接受到數值後列印出日志,并插入到hbase中。(如果沒有hbase環境的,這一步可以繼續注釋掉,不用打開,隻看到跑到日志列印的地方就好了)

spout 的開發隻需要繼承

BaseRichSpout

,實作繼承的方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
      
public class TestSpout extends BaseRichSpout {
 private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);
 static AtomicInteger sAtomicInteger = new AtomicInteger(0);
 static AtomicInteger pendNum = new AtomicInteger(0);
 private int sqnum;
 SpoutOutputCollector collector;

 @Override
 public void open(Map conf, TopologyContext context,
 SpoutOutputCollector collector) {
 sqnum = sAtomicInteger.incrementAndGet();
 this.collector = collector;
 }

 @Override
 public void nextTuple() {
 while (true) {
 int a = pendNum.incrementAndGet();
 LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));
 this.collector.emit(new Values("xxxxx:"+a));

 try {
 Thread.sleep(10000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
 declarer.declare(new Fields("log"));

 }

 /**
 * 啟用 ack 機制,詳情參考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6
 * @param msgId
 */
 @Override
 public void ack(Object msgId) {
 super.ack(msgId);
 }

 /**
 * 消息處理失敗後需要自己處理
 * @param msgId
 */
 @Override
 public void fail(Object msgId) {
 super.fail(msgId);
 LOGGER.info("ack fail,msgId"+msgId);
 }

}
      

bolt 同理,繼承 

BaseRichBolt

 實作其相應的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
      
public  class TestBolt extends BaseRichBolt {

 private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);
 OutputCollector collector;

 @Override
 public void prepare(Map stormConf, TopologyContext context,
 OutputCollector collector) {
 this.collector = collector;
 }

 @Override
 public void execute(Tuple input) {
 String xx = input.getString(0);
 LOGGER.info(String.format("receive from spout ,num is : %d", xx));

 // 發送ack資訊告知spout 完成處理的消息 ,如果下面的hbase的注釋代碼打開了,則必須等到插入hbase完畢後才能發送ack資訊,這段代碼需要删除
 this.collector.ack(input);
 try {
 Thread.sleep(10000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {

 }
}
      

topology 的開發同理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
      
public class TestTopology implements ILogTopology {
 @Override
 public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout("testspout", new TestSpout(), 1);
 builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");

 Config conf = ConfigUtils.getStormConfig(properties);
 conf.setNumAckers(1);

 StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());
 System.out.println("storm cluster will start");
 }

}
      

經過上面的三個步驟,一個最簡單的jstorm應用就開發完成了,接下來通過編譯、打包完後,生成jar檔案 

jstorm-hbase-demo-0.1.jar

 ,将此jar檔案在jstorm叢集的nimbus機器上送出即可:

jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties

demo運作效果

從jstorm叢集的監控圖賞可以看到,對應topology的運作情況:

jstorm開發指南-寫個簡單的jstorm應用jstorm開發指南-寫個簡單的jstorm應用jstorm 開發執行個體

bolt 的執行效率,及ack數量,占用機器記憶體等:

jstorm開發指南-寫個簡單的jstorm應用jstorm開發指南-寫個簡單的jstorm應用jstorm 開發執行個體

源碼已經上傳到github上面,喜歡研究的同學,可以fork後自己修改練習。位址為:https://github.com/xirong/jstorm-hbase-demo

源碼中使用到的Phoenix元件,hbase上層的中間件,使得開發人員可以使用sql的方式來對hbase進行相應的操作,感興趣的可以閱讀:使用Phoenix通過sql語句更新操作hbase資料 ,此文中介紹了如何安裝及使用。

另外想對hbase的有所了解的可以檢視:列式存儲hbase系統架構學習

原文http://www.ixirong.com/2015/07/18/develop-the-first-jstorm-demo/