Storm
分布式計算引擎。Jstorm是一個類似于Hadoop MapReduce的系統,使用者按照指定的接口實作一個任務,然後這個任務交給JStorm系統,JStorm将這個任務跑起來,并按7 * 24小時運作。如果中間一個worker發生了意外故障,排程器立即配置設定一個新的worker來替換這個失效的worker。
特性:
- 異常健壯:叢集易管理,可輪流重新開機節點
- 容錯性好:消息處理過程出現異常,會進行重試
語言無關性:topology可以用多種語言編寫
架構類型:
- 主從架構:簡單,高效,但主節點存在單點問題
- 對稱架構:複雜,效率較低,但無單點問題,更加可靠
概念
Apache storm從一端讀取實時資料的原始流,并将其傳遞通過一系列小處理單元,并在另一端輸出處理/有用的資訊。

元件 | 描述 |
---|---|
Tuple | Tuple是Storm中的主要資料結構。它是有序元素的清單。預設情況下,Tuple支援所有資料類型。通常,它被模組化為一組逗号分隔的值,并傳遞到Storm叢集。 |
Stream | 流是元組的無序序列。 |
Spouts | 流的源。通常,Storm從原始資料源(如Twitter Streaming API,Apache Kafka隊列,Kestrel隊列等)接受輸入資料。否則,您可以編寫spouts以從資料源讀取資料。“ISpout”是實作spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。 |
Bolts | Bolts是邏輯處理單元。Spouts将資料傳遞到Bolts和Bolts過程,并産生新的輸出流。Bolts可以執行過濾,聚合,加入,與資料源和資料庫互動的操作。Bolts接收資料并發射到一個或多個Bolts。 “IBolt”是實作Bolts的核心接口。一些常見的接口是IRichBolt,IBasicBolt等。 |
Storm元件:Nimbus
- 接受用戶端topo代碼,拆分成多個task,将task資訊存入zk
- 将task配置設定給Supervisor,将映射關系存入zk
- 故障檢測
Storm元件:Supervisor
- 從Nimbus目錄讀取代碼,從zk上讀取Nimbus配置設定的task
- 啟動工作程序Worker執行任務
- 檢測運作的工作程序Worker
Storm元件: Worker
- 從zk上讀取配置設定的task,并計算出task需要給哪些task發消息
- 啟動一個或多個Executor線程執行任務Task
Storm元件:Zookeeper
- Numbus于Supervisor進行通信(配置設定任務和心跳)
- Supervisor與Worker進行通信(配置設定任務和心跳)
- Nimbus高可用(HA機制)
Strom作業送出流程
- 使用者編寫Strom Topolgy
- 使用Client送出Topology給Nimbus
- Nimbus指派Task給Supervisor
- Supervisor為Task啟動Worker
- Worker執行Task
Grouping方式
Storm如何保證資料有效性
- Nimubs故障,換台機器重新開機即可
- Supervisor挂掉,遷移其上的Worker即可
- Worker挂掉,遷移走資料能正确處理嗎?
如何保證worker資料正确的恢複?不會被重複計算?
Spout資料保障
- 不丢:Acker機制保證資料如果未處理成功,可以即使發現,并通知Spout重發
- 不重:使用msgID去重
API參數介紹
Spout建立
Spout是用于資料生産的元件。基本上,一個spout将實作一個IRichSpout接口。“IRichSpout”接口有以下重要方法
- open 為Spout提供執行環境。執行器将運作此方法來初始化噴頭
- nextTuple 通過收集器發出生成的資料
- close 當spout要關閉時調用此方法
- declareOutFields 聲明元組的輸出模式
- activate 當Spout已經失效時被調用。該Spout的nextTuple方法很快就會被調用
- ack 确認處理了特定元組
- fail 指定不處理和不重新處理特定元組
open
- conf - 為此spout提供storm配置
- context - 提供有關拓撲中的spout位置,其任務ID,輸入和輸出資訊的完整資訊
- collector - 使我們能夠發出将由bolts處理的元組
nextTuple
nextTuple()從與ack()和fail()方法相同的循環中定期調用。它必須釋放線程的控制,當沒有工作要做,以便其他方法有機會被調用。是以,nextTuple的第一行檢查處理是否已完成。如果是這樣,它應該休眠至少一毫秒,以減少處理器在傳回之前的負載。
close
關閉資源的方法
declareOutFields
declarer - 用于聲明輸出流id,輸出字段等
此方法用于指定元組的輸出模式
ack
該方法确認已經處理了特定元組。
nextTuple
此方法通知特定元組尚未完全處理,Storm将重新處理特定的元組
Bolt建立
Bolt是一個使用元組作為輸入,處理元組,并産生新的元組作為輸出的元件。Bolts将實作IRichBolt接口。在此程式中,使用兩個Bolts類CallLogCreatorBolt和CallLogCounterBolt來執行操作
IRichBolt接口有以下方法
- prepare - 為bolt提供要執行的環境。執行器将運作此方法來初始化spout
- excute - 處理單個元組的輸入
- cleanup - 當spout要關閉時調用
- declareOutputFields - 聲明元組的輸出模式
prepare
- conf - 為此bolt提供Storm配置
- context - 提供有關拓撲中的bolt位置,其任務ID,輸入和輸出資訊等的完整資訊
- collector - 使我們能夠發出處理的元組
execute
這裡的元組是要處理的輸入元組
execute方法一次處理單個元組。元組資料可以通過Tuple類的getValue方法通路。不必立即處理輸入元組。多元組可以被處理和輸出為單個輸出元組。處理的元組可以通過使用OutputCollector類發出。
cleanup
declareOutputFields
這裡的參數declarer用于聲明輸出流id,輸出字段等
此方法用于指定元組的輸出模式
分組政策
- Shuffle Grouping 随機分組:輪詢,平均配置設定
- Fields Grouping 按字段分組:比如按照userid分組,具有相同的userid的tuple會被分到同一組
- All Grouping 廣播發送:對于每個tuple,所有的bolts都會收到
- Global Grouping 全局分組:這個tuple被配置設定到storm中的一個bolt的其中一個task
- Non Grouping 不分組:stream不關心到底誰會收到他的tuple
- Direct Grouping 直接分組:消息的發送者指定由消息接收者的哪個task處理這個消息
- Local or shuffle grouping :如果目标bolt有一個或多個task在同一個工作程序中,tuple将會被随機發送給這些tasks。否則,和普通的shuffle Grouping行為一緻。