在2011年Storm開源之前,由于Hadoop的火紅,整個業界都在喋喋不休地談論大資料。Hadoop的高吞吐,海量資料處理的能力使得人們可以友善地地處理海量資料。但是,Hadoop的缺點也和它的優點同樣鮮明——延遲大,響應緩慢,運維複雜。
有需求也就有創造,在Hadoop基本奠定了大資料霸主地位的時候很多的開源項目都是以彌補Hadoop的實時性為目标而被創造出來。而在這個節骨眼上
Storm橫空出世了。
Storm帶着流式計算的标簽華麗麗滴出場了,其實storm相對來說比hadoop複雜多了.如果你學會storm以後再去看hadoop那是非常簡單的,看看它的一些賣點
-
- 分布式系統:可橫向拓展,現在的項目不帶個分布式特性都不好意思開源。
- 運維簡單:Storm的部署的确簡單。雖然沒有Mongodb的解壓即用那麼簡單,但是它也就是多安裝兩個依賴庫而已。
- 高度容錯:子產品都是無狀态的,随時當機重新開機。
- 無資料丢失:Storm創新性提出的ack消息追蹤架構和複雜的事務性處理,能夠滿足很多級别的資料處理需求。不過,越高的資料處理需求,性能下降越嚴重。
- 多語言:實際上,Storm的多語言更像是臨時添加上去似的。因為,你的送出部分還是要使用Java實作.
下面我們來對比一下hadoop與storm

但是他們沒有誰強誰弱,應為側重的方向不一樣,hadoop側重離線海量資料的分析,而storm需要實時計算分析,比如交通流量的調查,
淘寶或者京東購買商品時候的實時推薦等等 都可以通過storm實作.Storm主要分為兩種元件Nimbus和Supervisor。
這兩種元件都是快速失敗的,沒有狀态。任務狀态和心跳資訊等都儲存在Zookeeper上的,送出的代碼資源都在本地機器的硬碟上。
-
- Nimbus負責在叢集裡面發送代碼,配置設定工作給機器,并且監控狀态。全局隻有一個。
- Supervisor會監聽配置設定給它那台機器的工作,根據需要啟動/關閉工作程序Worker。每一個要運作Storm的機器上都要部署一個,并且,按照機器的配置設定上面配置設定的槽位數。
- Zookeeper是Storm重點依賴的外部資源。
- Nimbus和Supervisor甚至實際運作的Worker都是把心跳儲存在Zookeeper上的。
- Nimbus也是根據Zookeerper上的心跳和任務運作狀況,進行排程和任務配置設定的。Storm送出運作的程式稱為Topology。
- Topology處理的最小的消息機關是一個Tuple,也就是一個任意對象的數組。Topology由Spout和Bolt構成。
- Spout是發出Tuple的結點。Bolt可以随意訂閱某個Spout或者Bolt發出的Tuple。Spout和Bolt都統稱為component。
我們來看張經典的Topology的流程圖就明白了
在看張storm的架構圖
可能看到這或許你大概知道怎麼工作的了,
下面看下我們的hell world
首先引入依賴
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
看下我們的Topology
public static void main(String[] args) throws InterruptedException {
//初始化top構造器
TopologyBuilder top=new TopologyBuilder();
//設定spout
top.setSpout("spout",new TestSpout());
//設定bolt
top.setBolt("print-bolt",new TestBoot()).shuffleGrouping("spout");
top.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");
//本地模式
LocalCluster cluster=new LocalCluster();
//送出top圖
cluster.submitTopology("tag",new Config(),top.createTopology());
Thread.sleep(10000);
cluster.killTopology("tap");
cluster.shutdown();
}
下面是Spout
public class TestSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private static final Map<Integer, String> map = new HashMap<Integer, String>();
static {
map.put(0, "java");
map.put(1, "php");
map.put(2, "groovy");
map.put(3, "python");
map.put(4, "ruby");
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//對spout進行初始化
this.collector = collector;
System.out.println("++++++++++++++++++++++++++++++");
//System.out.println(this.collector);
}
/**
* <B>方法名稱:</B>輪詢tuple<BR>
* <B>概要說明:</B><BR>
* @see
*/
@Override
public void nextTuple() {
//随機發送一個單詞
final Random r = new Random();
int num = r.nextInt(5);
System.out.println("----------------------------");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.collector.emit(new Values(map.get(num)));
}
/**
* <B>方法名稱:</B>declarer聲明發送資料的field<BR>
* <B>概要說明:</B><BR>
* @see
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println("=============================");
//進行聲明
declarer.declare(new Fields("print"));
}
}
TestBoot
public class TestBoot extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
_collector=outputCollector;
}
@Override
public void execute(Tuple tuple) {
String spout = tuple.getStringByField("print");
System.out.println("1111111111111"+spout);
//發射字段
_collector.emit(new Values(spout));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//這是下遊接收者
outputFieldsDeclarer.declare(new Fields("write"));
}
}
writeBolt
public class WriteBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(WriteBolt.class);
private FileWriter writer ;
@Override
public void execute(Tuple input) {
//擷取上一個元件所聲明的Field
String text = input.getStringByField("write");
System.out.println(text);
try {
if(writer == null){
if(System.getProperty("os.name").equals("Windows 10")){
writer = new FileWriter("D:\\099_test\\" + this);
} else if(System.getProperty("os.name").equals("Windows 8.1")){
writer = new FileWriter("D:\\099_test\\" + this);
} else if(System.getProperty("os.name").equals("Windows 7")){
writer = new FileWriter("D:\\099_test\\" + this);
} else if(System.getProperty("os.name").equals("Linux")){
System.out.println("----:" + System.getProperty("os.name"));
writer = new FileWriter("/usr/local/temp/" + this);
}
}
log.info("【write】: 寫入檔案");
writer.write(text);
writer.write("\n");
writer.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
}
}
我們的Topology圖的流程是這樣的由TestSpout 流向 TestBoot 然後在流向 WriteBolt ,TestBoot負責在控制台列印,WriteBolt 負責往磁盤寫入 ,下面使我們的運作結果
通過Spout随機給TestBoot發送單詞,然後經過WriteBolt進行寫入到磁盤.并且啟動的是本地模式.如果想在伺服器上的storm環境中運作Topology 需要改成叢集模式
try {
//開啟叢集模式
StormSubmitter.submitTopology("top1", new Config(), top.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
本文章重點是要讓大家對Storm有個初步的認識,希望能幫到大家