1、Storm是什麼
storm是twitter公司開源貢獻給apache的一款實時流式處理的一個開源軟體,主要用于解決資料的實時計算以及實時的處理等方面的問題
Storm 是一個人的
Hadoop 是一個人寫的
Lucene 是一個人寫的
Spark是一個學生團隊
Python 是一個人寫的
Javascript 是一個人的
Linux是一個人寫的
2012年橫空出世
Storm的特點
Storm是一個開源的分布式實時計算系統,可以簡單、可靠的處理大量的資料流。Storm有很多使用場景:如實時分析,線上機器學習,持續計算,分布式RPC,ETL等等。Storm支援水準擴充,具有高容錯性,保證每個消息都會得到處理,而且處理速度很快(在一個小叢集中,每個結點每秒可以處理數以百萬計的消息)。Storm的部署和運維都很便捷,而且更為重要的是可以使用任意程式設計語言來開發應用。
Storm有如下特點:
程式設計模型簡單
在大資料處理方面相信大家對hadoop已經耳熟能詳,基于Google Map/Reduce來實作的Hadoop為開發者提供了map、reduce原語,使并行批處理程式變得非常地簡單和優美。同樣,Storm也為大資料的實時計算提供了一些簡單優美的原語,這大大降低了開發并行實時處理的任務的複雜性,幫助你快速、高效的開發應用。
可擴充
在Storm叢集中真正運作topology的主要有三個實體:工作程序、線程和任務。Storm叢集中的每台機器上都可以運作多個工作程序,每個工作程序又可建立多個線程,每個線程可以執行多個任務,任務是真正進行資料處理的實體,我們開發的spout、bolt就是作為一個或者多個任務的方式執行的。
是以,計算任務在多個線程、程序和伺服器之間并行進行,支援靈活的水準擴充。
高可靠性
Storm可以保證spout發出的每條消息都能被“完全處理”,這也是直接差別于其他實時系統的地方,如S4。
請注意,spout發出的消息後續可能會觸發産生成千上萬條消息,可以形象的了解為一棵消息樹,其中spout發出的消息為樹根,Storm會跟蹤這棵消息樹的處理情況,隻有當這棵消息樹中的所有消息都被處理了,Storm才會認為spout發出的這個消息已經被“完全處理”。如果這棵消息樹中的任何一個消息處理失敗了,或者整棵消息樹在限定的時間内沒有“完全處理”,那麼spout發出的消息就會重發。
考慮到盡可能減少對記憶體的消耗,Storm并不會跟蹤消息樹中的每個消息,而是采用了一些特殊的政策,它把消息樹當作一個整體來跟蹤,對消息樹中所有消息的唯一id進行異或計算,通過是否為零來判定spout發出的消息是否被“完全處理”,這極大的節約了記憶體和簡化了判定邏輯,後面會對這種機制進行詳細介紹。
這種模式,每發送一個消息,都會同步發送一個ack/fail,對于網絡的帶寬會有一定的消耗,如果對于可靠性要求不高,可通過使用不同的emit接口關閉該模式。
上面所說的,Storm保證了每個消息至少被處理一次,但是對于有些計算場合,會嚴格要求每個消息隻被處理一次,幸而Storm的0.7.0引入了事務性拓撲,解決了這個問題,後面會有詳述。
高容錯性
如果在消息處理過程中出了一些異常,Storm會重新安排這個出問題的處理單元。Storm保證一個處理單元永遠運作(除非你顯式殺掉這個處理單元)。
當然,如果處理單元中存儲了中間狀态,那麼當處理單元重新被Storm啟動的時候,需要應用自己進行中間狀态的恢複。
支援多種程式設計語言
除了用java實作spout和bolt,你還可以使用任何你熟悉的程式設計語言來完成這項工作,這一切得益于Storm所謂的多語言協定。多語言協定是Storm内部的一種特殊協定,允許spout或者bolt使用标準輸入和标準輸出來進行消息傳遞,傳遞的消息為單行文本或者是json編碼的多行。
Storm支援多語言程式設計主要是通過ShellBolt, ShellSpout和ShellProcess這些類來實作的,這些類都實作了IBolt 和 ISpout接口,以及讓shell通過java的ProcessBuilder類來執行腳本或者程式的協定。
可以看到,采用這種方式,每個tuple在處理的時候都需要進行json的編解碼,是以在吞吐量上會有較大影響。
支援本地模式
Storm有一種“本地模式”,也就是在程序中模拟一個Storm叢集的所有功能,以本地模式運作topology跟在叢集上運作topology類似,這對于我們開發和測試來說非常有用。
高效
2、Storm的架構模型

1.Nimbus:負責資源配置設定和任務排程。新版本中的nimbus節點可以有多個,做主備
2.Supervisor:負責接受nimbus配置設定的任務,啟動和停止屬于自己管理的worker程序。
3.Worker:運作具體處理元件邏輯的程序。
Task:worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之後,task不再與實體線程對應,同一個spout/bolt的task可能會共享一個實體線程,該線程稱為executor。最新版本的Jstorm已經廢除了task的概念。
3、Storm的安裝
三台機器運作服務規劃
運作服務\機器規劃 node1 node2 node3
Zookeeper版本 3.4.6
Zookeeper服務 是 是 是
Storm版本 Apache-storm-1.1.1
Nimbus服務 是(leader) 是 是
Supervisor服務 是 是 是
IP位址規劃 192.168.200.100 192.168.200.101 192.168.200.102
3.1三台機器安裝zookeeper服務
Node01配置檔案修改
修改zoo.cfg
dataDir=/export/servers/zookeeper-3.4.9/zkData/data
dataLogDir=/export/servers/zookeeper-3.4.9/zkData/log
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
修改myid
Node02 修改配置檔案
修改zoo.cfg
dataDir=/export/servers/zookeeper-3.4.9/zkData/data
dataLogDir=/export/servers/zookeeper-3.4.9/zkData/log
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
修改myid
node3修改配置檔案
修改zoo.cfg
bashdataDir=/export/servers/zookeeper-3.4.9/zkData/data
dataLogDir=/export/servers/zookeeper-3.4.9/zkData/log
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
在這裡插入代碼片
修改myid
三台伺服器啟動zookeeper服務
bin/zkServer.sh start
三台機器檢視zookeeper服務狀态
bin/zkServer.sh status
3.2 三台機器安裝storm叢集
1、上傳storm壓縮包
2、解壓安裝包
tar -zxvf apache-storm-1.1.1.tar.gz -C /export/servers/
3、重命名解壓目錄
mv apache-storm-1.1.1 storm
4、修改配置檔案
storm.zookeeper.servers:
- "node1"
- "node2"
- "node3"
nimbus.seeds: ["node1", "node2", "node3"]
storm.local.dir: "/export/servers/storm/stormdata"
ui.port: 8008
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
4、将storm安裝程式分發拷貝到另外兩台機器上
scp -r storm node2:/export/servers
scp -r storm node3:/export/servers
5、三台機器啟動storm服務
node1 啟動相關服務
啟動 nimbus程序
nohup bin/storm nimbus >/dev/null 2>&1 &
啟動supervisor
nohup bin/storm supervisor >/dev/null 2>&1 &
啟動web UI
nohup bin/storm ui >/dev/null 2>&1 &
啟動logViewer
nohup bin/storm logviewer >/dev/null 2>&1 &
node2啟動相關服務
nimbus:nohup bin/storm nimbus >/dev/null 2>&1 &
supervisor:nohup bin/storm supervisor >/dev/null 2>&1 &
logviewer:nohup bin/storm logviewer >/dev/null 2>&1 &
node3啟動相關服務
nimbus:nohup bin/storm nimbus >/dev/null 2>&1 &
supervisor:nohup bin/storm supervisor >/dev/null 2>&1 &
logviewer:nohup bin/storm logviewer >/dev/null 2>&1 &
4、Storm的UI界面管理
通路位址http://node1:8008
5、Storm的程式設計模型
6、Storm的入門程式
6.1、實作單次計數的統計
第一步:建立maven java 項目,導入jar包
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
第二步:開發我們的spout,随機選擇一些單詞發送到下一個bolt
public class RandomSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
Random rand;
/**
* Map conf 系統初始化讀取的配置檔案
*
* TopologyContext context 應用程式的上下文對象
*
* SpoutOutputCollector collector 用于接收spout輸出的資料
*
* 這個方法主要用于系統的初始化工作,例如連接配接kafka,讀取資料,連接配接mysql,或者連接配接redis等的初始化工作
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector; //初始化我們的系統當中的資料
rand = new Random();
}
/**
* storm架構當中 會一直調用nextTuple将資料不斷的往後發送,發送給下一個元件當中去
*/
@Override
public void nextTuple() {
//監控某個目錄下面所有的檔案,一旦發現新增的檔案,就将檔案給讀取完成,然後将檔案重命名
try {
String[] sentences = new String[]{ "my storm word count", "hello my storm", "hello storm hello world"};
String sentence = sentences[rand.nextInt(sentences.length)];
Thread.sleep(3000);
collector.emit(new Values(sentence));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("helloStorm"));
}
}
第三步:開發我們的SplitBolt 将我們的英文句子切割成一個個的單詞
public class SplitBolt extends BaseBasicBolt{
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String stringByField = input.getStringByField("helloStorm");
String[] split = stringByField.split(" ");
for (String string : split) {
collector.emit(new Values(string,1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
第四步:開發我們的計數器CountBolt
public class CountBolt extends BaseBasicBolt{
private Map<String, Integer> map = new HashMap<String,Integer>();
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String stringByField = input.getStringByField("word");
Integer integerByField = input.getIntegerByField("num");
if(map.containsKey(stringByField)){
map.put(stringByField, map.get(stringByField)+integerByField);
}else{
map.put(stringByField, integerByField);
}
System.out.println(map.toString());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
第五步:組裝我們的程式向storm叢集進行送出
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("mySpout", new RandomSpout(),2);
topologyBuilder.setBolt("splitBolt", new SplitBolt(),2).shuffleGrouping("mySpout");
topologyBuilder.setBolt("countBolt", new CountBolt(),2).shuffleGrouping("splitBolt");
Config config = new Config();
if(args.length > 0){
config.setDebug(false);
config.setNumWorkers(1);
config.setNumAckers(config, 5);
StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
}else{
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCount", config, topologyBuilder.createTopology());
}
}
}
7、Storm的并行度
config.setNumWorkers(1);
topologyBuilder.setSpout("mySpout", new RandomSpout(),3);
topologyBuilder.setBolt("splitBolt", new SplitBolt(),3).shuffleGrouping("mySpout");
topologyBuilder.setBolt("countBolt", new CountBolt(),3).setNumTasks(4).shuffleGrouping("splitBolt");
Storm當中的worker,executor,task之間的互相關系
Worker:表示一個程序
Executor:表示由worker啟動的線程
一個worker隻會負責一個topology任務,不會出現一個worker負責多個topology任務的情況。
一個worker程序當中,可以啟動多個線程executor,也就是說,一個worker程序可以對應多個executor線程
task 是實際執行資料處理的最小工作單元(注意,task 并不是線程) —— 在你的代碼中實作的每個 spout 或者 bolt 都會在叢集中運作很多個 task。在拓撲的整個生命周期中每個元件的 task 數量都是保持不變的,不過每個元件的 executor 數量卻是有可能會随着時間變化。在預設情況下 task 的數量是和 executor 的數量一樣的,也就是說,預設情況下 Storm 會在每個線程上運作一個 task
注:調整task的數量,并不能夠實際上提高storm的并行度,因為storm不管是spout還是bolt當中的代碼都是串行執行的,就算一個executor對應多個task,這多個task也是串行去執行executor當中的代碼,是以這個調整task的個數,實際上并不能提高storm的并行度
在實際工作當中,由于spout與bolt的數量不能夠精準确定,是以需要随時調整spout與bolt的數量,是以在storm當中,我們可以通過指令來動态的進行調整
storm rebalance mytopo -n 3 -e mySpout=5 -e splitBolt=6 -e countBolt=8
一定要注意:重新調整的時候=号兩邊不要有空格
8、Storm的分發政策
Storm當中的分組政策,一共有八種:
所謂的grouping政策就是在Spout與Bolt、Bolt與Bolt之間傳遞Tuple的方式。總共有八種方式:
1)shuffleGrouping(随機分組)随機分組;将tuple随機配置設定到bolt中,能夠保證各task中處理的資料均衡;
2)fieldsGrouping(按照字段分組,在這裡即是同一個單詞隻能發送給一個Bolt)
按字段分組; 根據設定的字段相同值得tuple被配置設定到同一個bolt進行處理;
舉例:builder.setBolt(“mybolt”, new MyStoreBolt(),5).fieldsGrouping(“checkBolt”,new Fields(“uid”));
說明:該bolt由5個任務task執行,相同uid的元組tuple被配置設定到同一個task進行處理;該task接收的元祖字段是mybolt發射出的字段資訊,不受uid分組的影響。該分組不僅友善統計而且還可以通過該方式保證相同uid的資料儲存不重複(uid資訊寫入資料庫中唯一);
3)allGrouping(廣播發送,即每一個Tuple,每一個Bolt都會收到)廣播發送:所有bolt都可以收到該tuple;
4)globalGrouping(全局分組,将Tuple配置設定到task id值最低的task裡面)全局分組:tuple被發送給bolt的同一個并且最小task_id的任務處理,實作事務性的topology;
5)noneGrouping(随機分派)不分組:效果等同于shuffle Grouping;
6)directGrouping(直接分組,指定Tuple與Bolt的對應發送關系);
直接分組:由tuple的發射單元直接決定tuple将發射給那個bolt,一般情況下是由接收tuple的bolt決定接收哪個bolt發射的Tuple。這是一種比較特别的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 隻有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來擷取處理它的消息的taskid (OutputCollector.emit方法也會傳回taskid)。
7)Local or shuffle Grouping本地或者随機分組,優先将資料發送到本機的處理器executor,如果本機沒有對應的處理器,那麼再發送給其他機器的executor,避免了網絡資源的拷貝,減輕網絡傳輸的壓力;
8)customGrouping (自定義的Grouping)。
9、Storm與kafka內建
9.1舊版本的kafka與storm之間互相內建
第一步:導入jar包
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
</dependency>
<!-- use old kafka spout code -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
第二步:代碼實作
9.2新版本的kafka與storm1.1.1內建
第一步:導入jar包
<!-- use new kafka spout code -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
第二步:編寫我們的主函數入口程式
public class KafkStormTopo {
public static void main(String[] args) throws Exception {
KafkaSpoutConfig.Builder<String, String> builder = KafkaSpoutConfig.builder("192.168.200.100:9092,192.168.200.101:9092,192.168.200.102:9092","yun01");
builder.setGroupId("test_storm_wc");
KafkaSpoutConfig<String, String> kafkaSpoutConfig = builder.build();
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("WordCountFileSpout",new KafkaSpout<String,String>(kafkaSpoutConfig), 1);
topologyBuilder.setBolt("readKafkaBolt", new KafkaBolt()).shuffleGrouping("WordCountFileSpout");
Config config = new Config();
if(args !=null && args.length > 0){
config.setDebug(false);
StormSubmitter submitter = new StormSubmitter();
submitter.submitTopology("kafkaStromTopo", config, topologyBuilder.createTopology());
}else{
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafkaStromTopo", config, topologyBuilder.createTopology());
}
}
}
第三步:開發我們的kafkabolt作為消息處理
public class KafkaBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println(input.getValues().get(4)+"消息接受bolt");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}