天天看點

StormStorm

Storm

分布式計算引擎。Jstorm是一個類似于Hadoop MapReduce的系統,使用者按照指定的接口實作一個任務,然後這個任務交給JStorm系統,JStorm将這個任務跑起來,并按7 * 24小時運作。如果中間一個worker發生了意外故障,排程器立即配置設定一個新的worker來替換這個失效的worker。

特性:

  • 異常健壯:叢集易管理,可輪流重新開機節點
  • 容錯性好:消息處理過程出現異常,會進行重試
  • 語言無關性:topology可以用多種語言編寫

    架構類型:

  • 主從架構:簡單,高效,但主節點存在單點問題
  • 對稱架構:複雜,效率較低,但無單點問題,更加可靠

概念

Apache storm從一端讀取實時資料的原始流,并将其傳遞通過一系列小處理單元,并在另一端輸出處理/有用的資訊。

StormStorm
元件 描述
Tuple Tuple是Storm中的主要資料結構。它是有序元素的清單。預設情況下,Tuple支援所有資料類型。通常,它被模組化為一組逗号分隔的值,并傳遞到Storm叢集。
Stream 流是元組的無序序列。
Spouts 流的源。通常,Storm從原始資料源(如Twitter Streaming API,Apache Kafka隊列,Kestrel隊列等)接受輸入資料。否則,您可以編寫spouts以從資料源讀取資料。“ISpout”是實作spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。
Bolts Bolts是邏輯處理單元。Spouts将資料傳遞到Bolts和Bolts過程,并産生新的輸出流。Bolts可以執行過濾,聚合,加入,與資料源和資料庫互動的操作。Bolts接收資料并發射到一個或多個Bolts。 “IBolt”是實作Bolts的核心接口。一些常見的接口是IRichBolt,IBasicBolt等。
StormStorm

Storm元件:Nimbus

  • 接受用戶端topo代碼,拆分成多個task,将task資訊存入zk
  • 将task配置設定給Supervisor,将映射關系存入zk
  • 故障檢測

Storm元件:Supervisor

  • 從Nimbus目錄讀取代碼,從zk上讀取Nimbus配置設定的task
  • 啟動工作程序Worker執行任務
  • 檢測運作的工作程序Worker

Storm元件: Worker

  • 從zk上讀取配置設定的task,并計算出task需要給哪些task發消息
  • 啟動一個或多個Executor線程執行任務Task

Storm元件:Zookeeper

  • Numbus于Supervisor進行通信(配置設定任務和心跳)
  • Supervisor與Worker進行通信(配置設定任務和心跳)
  • Nimbus高可用(HA機制)
Strom作業送出流程
  1. 使用者編寫Strom Topolgy
  2. 使用Client送出Topology給Nimbus
  3. Nimbus指派Task給Supervisor
  4. Supervisor為Task啟動Worker
  5. Worker執行Task

Grouping方式

StormStorm

Storm如何保證資料有效性

  • Nimubs故障,換台機器重新開機即可
  • Supervisor挂掉,遷移其上的Worker即可
  • Worker挂掉,遷移走資料能正确處理嗎?

如何保證worker資料正确的恢複?不會被重複計算?

Spout資料保障

  • 不丢:Acker機制保證資料如果未處理成功,可以即使發現,并通知Spout重發
  • 不重:使用msgID去重

API參數介紹

Spout建立

Spout是用于資料生産的元件。基本上,一個spout将實作一個IRichSpout接口。“IRichSpout”接口有以下重要方法

  • open 為Spout提供執行環境。執行器将運作此方法來初始化噴頭
  • nextTuple 通過收集器發出生成的資料
  • close 當spout要關閉時調用此方法
  • declareOutFields 聲明元組的輸出模式
  • activate 當Spout已經失效時被調用。該Spout的nextTuple方法很快就會被調用
  • ack 确認處理了特定元組
  • fail 指定不處理和不重新處理特定元組

open

  • conf - 為此spout提供storm配置
  • context - 提供有關拓撲中的spout位置,其任務ID,輸入和輸出資訊的完整資訊
  • collector - 使我們能夠發出将由bolts處理的元組

nextTuple

nextTuple()從與ack()和fail()方法相同的循環中定期調用。它必須釋放線程的控制,當沒有工作要做,以便其他方法有機會被調用。是以,nextTuple的第一行檢查處理是否已完成。如果是這樣,它應該休眠至少一毫秒,以減少處理器在傳回之前的負載。

close

關閉資源的方法

declareOutFields

declarer - 用于聲明輸出流id,輸出字段等

此方法用于指定元組的輸出模式

ack

該方法确認已經處理了特定元組。

nextTuple

此方法通知特定元組尚未完全處理,Storm将重新處理特定的元組

Bolt建立

Bolt是一個使用元組作為輸入,處理元組,并産生新的元組作為輸出的元件。Bolts将實作IRichBolt接口。在此程式中,使用兩個Bolts類CallLogCreatorBolt和CallLogCounterBolt來執行操作

IRichBolt接口有以下方法

  • prepare - 為bolt提供要執行的環境。執行器将運作此方法來初始化spout
  • excute - 處理單個元組的輸入
  • cleanup - 當spout要關閉時調用
  • declareOutputFields - 聲明元組的輸出模式

prepare

  • conf - 為此bolt提供Storm配置
  • context - 提供有關拓撲中的bolt位置,其任務ID,輸入和輸出資訊等的完整資訊
  • collector - 使我們能夠發出處理的元組

execute

這裡的元組是要處理的輸入元組

execute方法一次處理單個元組。元組資料可以通過Tuple類的getValue方法通路。不必立即處理輸入元組。多元組可以被處理和輸出為單個輸出元組。處理的元組可以通過使用OutputCollector類發出。

cleanup

declareOutputFields

這裡的參數declarer用于聲明輸出流id,輸出字段等

此方法用于指定元組的輸出模式

分組政策

  • Shuffle Grouping 随機分組:輪詢,平均配置設定
  • Fields Grouping 按字段分組:比如按照userid分組,具有相同的userid的tuple會被分到同一組
  • All Grouping 廣播發送:對于每個tuple,所有的bolts都會收到
  • Global Grouping 全局分組:這個tuple被配置設定到storm中的一個bolt的其中一個task
  • Non Grouping 不分組:stream不關心到底誰會收到他的tuple
  • Direct Grouping 直接分組:消息的發送者指定由消息接收者的哪個task處理這個消息
  • Local or shuffle grouping :如果目标bolt有一個或多個task在同一個工作程序中,tuple将會被随機發送給這些tasks。否則,和普通的shuffle Grouping行為一緻。

繼續閱讀