天天看點

Storm源碼淺析之topology的送出

    最近一直在讀twitter開源的這個分布式流計算架構——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進行組織,并且隻分析我關注的東西,是以稱之為淺析。       

一、介紹

    storm的開發語言主要是java和clojure,其中java定義骨架,而clojure編寫核心邏輯。源碼統計結果:

<!–

code highlighting produced by actipro codehighlighter (freeware)

<a href="http://www.codehighlighter.com/" target="_blank">http://www.codehighlighter.com/</a>

–&gt;     180 text files.

     177 unique files.                                          

       7 files ignored.

http://cloc.sourceforge.net v 1.55  t=1.0 s (171.0 files/s, 46869.0 lines/s)

——————————————————————————-

language                     files          blank        comment           code

java                           125           5010           2414          25661

lisp                            33            732            283           4871

python                           7            742            433           4675

css                              1             12             45           1837

bourne shell                     1              0              0              6

javascript                       2              1             15              6

sum:                           171           6519           3190          37160

    java代碼25000多行,而clojure(lisp)隻有4871行,說語言不重要再次證明是扯淡。

二、topology和nimbus       

    topology是storm的核心理念,将spout和bolt組織成一個topology,運作在storm叢集裡,完成實時分析和計算的任務。這裡我主要想介紹下topology部署到storm叢集的大概過程。送出一個topology任務到storm叢集是通過stormsubmitter.submittopology方法送出:

–&gt;stormsubmitter.submittopology(name, conf, builder.createtopology());

    我們将topology打成jar包後,利用bin/storm這個python腳本,執行如下指令:

–&gt;bin/storm jar xxxx.jar com.taobao.mytopology args

    将jar包送出給storm叢集。storm腳本會啟動jvm執行topology的main方法,執行submittopology的過程。而submittopology會将jar檔案上傳到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法裡可以看到:

–&gt;def jar(jarfile, klass, args):                                                                                                                               

   exec_storm_class(                                                                                                                                          

        klass,                                                                                                                                                

        jvmtype=“-client“,                                                                                                                                    

        extrajars=[jarfile, conf_dir, storm_dir + “/bin“],                                                                                                    

        args=args,                                                                                                                                            

        prefix=“export storm_jar=“ + jarfile + “;“)

     将jar檔案的位址設定為環境變量storm_jar,這個環境變量在執行submittopology的時候用到:

–&gt;//stormsubmitter.java 

private static void submitjar(map conf) {

        if(submittedjar==null) {

Storm源碼淺析之topology的送出

            string localjar = system.getenv(“storm_jar“);

            submittedjar = submitjar(conf, localjar);

        } else {

            log.info(“jar already uploaded to master. not submitting jar.“);

        }

    }

    通過環境變量找到jar包的位址,然後上傳。利用環境變量傳參是個小技巧。

    其次,nimbus在接收到jar檔案後,存放到資料目錄的inbox目錄,nimbus資料目錄的結構:

–&gt;-nimbus

     -inbox

         -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar

         -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

     -stormdist

        -storm-id

           -stormjar.jar

           -stormconf.ser

           -stormcode.ser

     其中inbox用于存放送出的jar檔案,每個jar檔案都重命名為stormjar加上一個32位的uuid。而stormdist存放的是啟動topology後生成的檔案,每個topology都配置設定一個唯一的id,id的規則是“name-計數-時間戳”。啟動後的topology的jar檔案名命名為storm.jar ,而它的配置經過java序列化後存放在stormconf.ser檔案,而stormcode.ser是将topology本身序列化後存放的檔案。這些檔案在部署的時候,supervisor會從這個目錄下載下傳這些檔案,然後在supervisor本地執行這些代碼。

    進入重點,topology任務的配置設定過程(zookeeper路徑說明忽略root):

1.在zookeeper上建立/taskheartbeats/{storm id} 路徑,用于任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task将定時重新整理節點的時間戳,然後nimbus會檢測這個時間戳是否超過timeout設定。

2.從topology中擷取bolts,spouts設定的并行數目以及全局配置的最大并行數,然後産生task id清單,如[1 2 3 4]

3.在zookeeper上建立/tasks/{strom id}/{task id}路徑,并存儲task資訊

4.開始配置設定任務(内部稱為assignment), 具體步驟:

 (1)從zk上獲得已有的assignment(新的toplogy當然沒有了)

 (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。

 (3)将任務均勻地配置設定給可用的worker,這裡有兩種情況:

 (a)task數目比worker多,例如task是[1 2 3 4],可用的slot隻有[host1:port1 host2:port1],那麼最終是這樣配置設定

–&gt;{1: [host1:port1] 2 : [host2:port1]

         3 : [host1:port1] 4 : [host2:port1]}

,可以看到任務平均地配置設定在兩個worker上。

(b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那麼首先會将woker排序,*将不同host間隔排列,保證task不會全部配置設定到同一個worker上,也就是将worker排列成

–&gt;[host1:port1 host2:port1 host1:port2 host2:port2]

,然後配置設定任務為

–&gt;{1: host1:port1 , 2 : host2:port2}

(4)記錄啟動時間

(5)判斷現有的assignment是否跟重新配置設定的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。

5.啟動topology,所謂啟動,隻是将zookeeper上/storms/{storm id}對應的資料裡的active設定為true。

6.nimbus會檢查task的心跳,如果發現task心跳超過逾時時間,那麼會重新跳到第4步做re-assignment。

本文來源于"阿裡中間件團隊播客",原文發表時間" 2011-12-02 "