天天看點

Storm 原理機制雜記

Storm 原理機制雜記

Storm:

Storm是Twitter開源的分布式實時計算系統,Storm通過簡單的API使開發者可以可靠地處理無界持續的流資料,進行實時計算,開發語言為Clojure和Java,非JVM語言可以通過stdin/stdout以JSON格式協定與Storm進行通信。Storm的應用場景很多:實時分析、線上機器學習、持續計算、分布式RPC、ETL處理,等等。

storm的優點是全記憶體計算,因為記憶體尋址速度是硬碟的百萬倍以上,是以storm的速度相比較hadoop非常快(瓶頸是記憶體,cpu) 

缺點就是不夠靈活:必須要先寫好topology結構來等資料進來分析,如果我們需要對幾百個次元進行組合分析,那麼。。另外推薦storm的DRPC實在太有用了

Storm在叢集上運作一個Topology時,主要通過以下3個實體來完成Topology的執行工作:

1. Worker(程序)

2. Executor(線程)

3. Task(邏輯概念任務,實體概念叫executor線程,Storm0.8後多個task由一個excutor執行) 

// builder.setBolt("uvbolt", new UvBolt(),2).setNumTasks(4).fieldsGrouping("pvbolt", new Fields("pid","uid")) ; 

//  UvBolt開啟4個task執行個體(bolt執行個體),右2個線程執行,如果不設定tasknumber,預設2個線程執行2個task,預設一個線程一個task,每個線程執行task是串行,開啟4個task,相同pid,uid進入相同task

hadoop:電梯 storm:扶梯

調整消息不可靠:

 如果可靠性不是那麼重要,那麼不跟蹤tuple樹可以節省一半的消息,減少帶寬占用。 •方法1: Config.TOPOLOGY_ACKERS=0,此時storm會在spout發射一個tuple之後馬上調用spout的ack,tuple樹不會被跟蹤。 •方法2:發射tuple的時候不指定message_id •方法3:發射tuple的時候不進行anchor,則這部分不會被跟蹤。

下圖簡要描述了這3者之間的關系:

Storm 原理機制雜記

1個worker程序執行的是1個topology的子集(注:不會出現1個worker為多個topology服務)。1個worker程序會啟動1個或多個executor線程來執行1個topology的component(spout或bolt)。是以,1個運作中的topology就是由叢集中多台實體機上的多個worker程序組成的。

executor是1個被worker程序啟動的單獨線程。每個executor隻會運作1個topology的1個component(spout或bolt)的task(注:task可以是1個或多個,storm預設是1個component隻生成1個task,executor線程裡會在每次循環裡順序調用所有task執行個體)。

task是最終運作spout或bolt中代碼的單元(注:1個task即為spout或bolt的1個執行個體,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啟動後,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor線程數可以動态調整(例如:1個executor線程可以執行該component的1個或多個task執行個體)。這意味着,對于1個component存在這樣的條件:#threads<=#tasks(即:線程數小于等于task數目)。預設情況下task的數目等于executor線程數目,即1個executor線程隻運作1個task。

Storm 原理機制雜記
Storm 原理機制雜記

備注:一個灰格一個線程,綠色task四個,由2個線程同時完成,其它啟動多線程預設一線程執行一個task, 

       Task,執行具體資料處理的相關實體,也就是使用者實作的Spout/Blot執行個體。Storm中,一個executor可能會對應一個或者多個task

Config conf = new Config(); conf.setNumWorkers(2); // use two worker processestopologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hinttopologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout");topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6) .shuffleGrouping("green-bolt");StormSubmitter.submitTopology( "mytopology", conf, topologyBuilder.createTopology() );

//預設一個excutor一個task.動态調整workprocess和excutors,通過rebalance,無需重新開機storm

# Reconfigure the topology "mytopology" to use 5 worker processes, # the spout "blue-spout" to use 3 executors and # the bolt "yellow-bolt" to use 10 executors. 

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout");//每個excutor執行兩個task

建議:一般機器數量=task數量 (component:spolt or bolt)

對應任何emit操作都要ack或者fail否則容易導緻記憶體溢出,BasicBolt封裝ack,fail方法,不用顯示調用 預設批量Acker的task會跟蹤每一個噴發出的tuple的DAG,每一個tuple賦予64位的随機ID,每個tuple會知道所有ids

Storm 原理機制雜記

For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:

Since "C" is removed from the tree at the same time that "D" and "E" are added to it

比如說一個topology中要啟動300個線程來運作spout/bolt, 而指定的worker程序數量是60個, 那麼storm将會給每個worker配置設定5個線程來跑spout/bolt

假設同屬于一個Topology的Spout與Bolt分别處于不同的JVM,即不同的worker中,不同的JVM可能處于同一台實體機器,也可能處于不同的實體機器中。為了讓情景簡單,認為JVM處于不同的實體機器中。

spout消息到bolt執行過程: Spout的輸出消息到達Bolt,作為Bolt的輸入會經過這麼幾個階段。 1. spout的輸出通過該spout所處worker的消息輸出線程,将tuple輸入到Bolt所屬的worker。它們之間的通路是socket連接配接,用ZeroMQ實作。 2. bolt所處的worker有一個專門處理socket消息的receive thread 接收到spout發送來的tuple 3. receive thread将接收到的消息傳送給對應的bolt所在的executor。 在worker内部(即同一process内部),消息傳遞使用的是Lmax Disruptor pattern. 4. executor接收到tuple之後,由event-handler進行處理  

CoordinatedBolt的原理:

對于使用者在DRPC, Transactional Topology裡面的Bolt,都被CoordinatedBolt包裝了一層:也就是說在DRPC, Transactional Topology裡面的topology裡面運作的已經不是使用者提供的原始的Bolt, 而是一堆CoordinatedBolt, CoordinatedBolt把這些Bolt的事務都代理了

Storm:java和clojure語言

Storm是一個分布式的、高容錯的實時計算系統。

Storm對于實時計算的的意義相當于Hadoop對于批處理的意義。Hadoop為我們提供了Map和Reduce原語,使我們對資料進行批處理變的非常的簡單和優美。同樣,Storm也對資料的實時計算提供了簡單Spout和Bolt原語。

Storm适用的場景:

1、流資料處理:Storm可以用來用來處理源源不斷的消息,并将處理之後的結果儲存到持久化媒體中。

2、分布式RPC:由于Storm的處理元件都是分布式的,而且處理延遲都極低,是以可以Storm可以做為一個通用的分布式RPC架構來使用。

實時查詢服務:

全記憶體:直接提供資料讀取服務,定期dump到磁盤或資料庫進行持久化。

半記憶體:使用Redis、Memcache、MongoDB、BerkeleyDB等記憶體資料庫提供資料實時查詢服務,由這些系統進行持久化操作,先記憶體,再Redis

全磁盤:使用HBase等以分布式檔案系統(HDFS)為基礎的NoSQL資料庫,對于key-value引擎,關鍵是設計好key的分布。

Storm常見模式——批處理:Storm對流資料進行實時處理時,一種常見場景是批量一起處理一定數量的tuple元組,而不是每接收一個tuple就立刻處理一個tuple,這樣可能是性能的考慮,或者是具體業務的需要。

例如,批量查詢或者更新資料庫,如果每一條tuple生成一條sql執行一次資料庫操作,資料量大的時候,效率會比批量處理的低很多,影響系統吞吐量。當然,如果要使用Storm的可靠資料處理機制的話,應該使用容器将這些tuple的引用緩存到記憶體中,直到批量處理的時候,ack這些tuple

Storm 原理機制雜記
  • Intra-worker communication in Storm (inter-thread on the same Storm node): LMAX Disruptor
  • Inter-worker communication (node-to-node across the network): ZeroMQ or Netty
  • Inter-topology communication: nothing built into Storm, you must take care of this yourself with e.g. a messaging system such as Kafka/RabbitMQ, a database, etc.
Storm 原理機制雜記
Storm 原理機制雜記

繼續閱讀