天天看點

JStorm之Nimbus簡介

一、簡介

JStorm叢集包含兩類節點:主要節點(Nimbus)和工作節點(Supervisor)。其分别對應的角色如下:

1. 主要節點(Nimbus)上運作Nimbus Daemon。Nimbus負責接收Client送出的Topology,分發代碼,配置設定任務給工作節點,監控叢集中運作任務的狀态等工作。Nimbus作用類似于Hadoop中JobTracker。

2. 工作節點(Supervisor)上運作Supervisor Daemon。Supervisor通過subscribe Zookeeper相關資料監聽Nimbus配置設定過來任務,據此啟動或停止Worker工作程序。每個Worker工作程序執行一個Topology任務的子集;單個Topology的任務由分布在多個工作節點上的Worker工作程序協同處理。

Nimbus和Supervisor節點之間的協調工作通過Zookeeper實作。此外,Nimbus和Supervisor本身均為無狀态程序,支援Fail Fast;JStorm叢集節點的狀态資訊或存儲在Zookeeper,或持久化到本地,這意味着即使Nimbus/Supervisor當機,重新開機後即可繼續工作。這個設計使得JStorm叢集具有非常好的穩定性。

前面介紹了JStorm中節點狀态資訊儲存在Zookeeper裡面,Nimbus通過向Zookeeper寫狀态資訊配置設定任務,Supervisor通過從Zookeeper訂閱相關資料領取任務,同時Supervisor也定期發送心跳資訊到Zookeeper,使得Nimbus可以掌握整個JStorm叢集的狀态,進而可以進行任務排程或負載均衡。ZooKeeper使得整個JStorm叢集十分健壯,任何節點當機都不影響叢集任務,隻要重新開機節點即可。

Zookeeper上存儲的狀态資料及Nimbus/Supervisor本地持久化資料涉及到的地方較多,詳細介紹Nimbus之前就上述資料的存儲結構簡要說明如下(注:引用自[5]http://xumingming.sinaapp.com/)。

JStorm之Nimbus簡介

圖1 JStorm存儲在Zookeeper中資料說明

JStorm之Nimbus簡介

圖2 Nimbus本地資料說明

JStorm之Nimbus簡介

圖3 Supervisor本地資料說明

二、系統架構與原理

Nimbus做三件事情:

1、接收Client送出Topology任務;

2、任務排程;

3、監控叢集任務運作狀況。

前面已經提到,Nimbus通過向Zookeeper寫資料完成任務配置設定,通過讀Zookeeper上相關狀态資訊監控叢集中任務的運作狀态,是以與Nimbus直接發生互動僅Client和Zookeeper。如下圖示。

JStorm之Nimbus簡介

三、實作邏輯與代碼剖析

以jstorm-0.7.1為例,Nimbus相關實作在jstorm-server/src/main/java目錄的com.alipay.dw.jstorm.daemon.nimbus包裡。Nimbus Daemon的啟動入口在NimbusServer.java。

1.Nimbus啟動

Nimbus Daemon程序啟動流程如下:

1、根據配置檔案初始化Context資料;

2、與Zookeeper資料同步;

3、初始化RPC服務處理類ServiceHandler;

4、啟動任務配置設定政策線程;

5、啟動Task的Heartbeat監控線程;

6、啟動RPC服務;

7、其他初始化工作。

Nimbus的詳細啟動邏輯如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
      
@SuppressWarnings("rawtypes")
private void launchServer(Map conf) throws Exception {
    LOG.info("Begin to start nimbus with conf " + conf);
    //1.檢查配置檔案中是否配置為分布式模式
    StormConfig.validate_distributed_mode(conf);
    //2.注冊主線程退出Hook現場清理(關閉線程+清理資料)
    initShutdownHook();
    //3.建立NimbusData資料,記錄30s逾時上傳下載下傳通道Channel/BufferFileInputStream
    data = createNimbusData(conf);
    //4.nimbus本地不存在的stormids資料如果在ZK上存在則删除,其中删除操作包括/zk/{assignments,tasks,storms}相關資料
    NimbusUtils.cleanupCorruptTopologies(data);
    //5.啟動Topology配置設定政策
    initTopologyAssign();
    //6.初始化所有topology的狀态為startup
    initTopologyStatus();
    //7.監控所有task的heartbeat,一旦發現taskid失去心跳将其置為needreassign 1次/10s
    initMonitor(conf);
    //8.啟動cleaner線程,預設600s掃描一次,預設删除3600s沒有讀寫過的jar檔案
    initCleaner(conf);
    //9.初始化ServiceHandler
    serviceHandler = new ServiceHandler(data);
    //10.啟動rpc server
    initThrift(conf);
}
           

2.Topology送出

JStorm叢集啟動完成後,Client可向其送出Topology。jstorm-0.7.1源碼目錄jstorm-client/src/main/java下包backtype.storm為使用者提供向叢集送出Topology的StormSubmitter.submitTopology方法。送出Topology在Client/Nimbus兩端都會做相關的處理。

Client端送出Topology分兩步完成:

1)打包Topology計算邏輯代碼jar送出給Nimbus,上傳到Nimbus目錄$jstorm_local_dir/nimbus/inbox/stormjar-{$randomid}.jar;其中randomid是Nimbus生成的随機UUID;

2)Client通過RPC向Nimbus送出Topology DAG及配置資訊;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
      
public static void submitTopology(
String name,
Map stormConf,
StormTopology topology)
throws AlreadyAliveException, InvalidTopologyException {
  if(!Utils.isValidConf(stormConf)) {
      throw new IllegalArgumentException("Storm conf is not valid.");
  }
  stormConf = new HashMap(stormConf);
  stormConf.putAll(Utils.readCommandLineOpts());
  Map conf = Utils.readStormConfig();
  conf.putAll(stormConf);
  try {
      String serConf = JSONValue.toJSONString(stormConf);
      if(localNimbus!=null) {
          LOG.info("Submitting topology " + name + " in local mode");
          localNimbus.submitTopology(name, null, serConf, topology);
      } else {
          //1.向Nimbus送出jar包
          submitJar(conf);
          NimbusClient client = NimbusClient.getConfiguredClient(conf);
          try {
              LOG.info("Submitting topology " +  name + " in distributed mode with conf "  + serConf);
              //2.送出topology DAG及序列化後的配置資訊serconf(json)
              client.getClient().submitTopology(name, submittedJar, serConf, topology);
          } finally {
              client.close();
          }
      }
      LOG.info("Finished submitting topology: " +  name);
  } catch(TException e) {
      throw new RuntimeException(e);
  }
}
           

其中RPC和資料序列化通過跨語言服務架構Thrift(http://wiki.apache.org/thrift/)實作。JStorm的服務定義在other/storm.thrift裡。

Nimbus端接收到Client送出上來的Topology計算邏輯代碼jar包後如前面所述将jar包暫存在目錄$jstorm_local_dir/nimbus/inbox/stormjar-{$randomid}.jar;

Nimbus端接收到Client送出上來的Topology DAG和配置資訊後:

1)簡單合法性檢查;主要檢查是否存在相同TopologyName的Topology,如果存在,拒絕Topology送出。

2)生成topologyid;生成規則:TopologyName-counter-currenttime;

3)序列化配置檔案和Topology代碼;

4)Nimbus本地準備運作時所需資料;

5)向Zookeeper注冊Topology和Task;

6)将Tasks壓入配置設定隊列等待TopologyAssign配置設定;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
      
@SuppressWarnings("unchecked")
@Override
public void submitTopology(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology)
        throws AlreadyAliveException, InvalidTopologyException, TException {
    ……
    try {
    	//1.檢測topologyName是否已經存在,如果存在相同名稱的topology則拒絕送出
        checkTopologyActive(data, topologyname, false);
    }
    ……
    //2.根據topologyname構造topologyid(=topologyname-$counter-$ctime)
    int counter = data.getSubmittedCount().incrementAndGet();
    String topologyId = topologyname + "-" + counter + "-"
            + TimeUtils.current_time_secs();
    //3.根據輸入參數jsonConf重組配置資料
    Map serializedConf = (Map) JStormUtils.from_json(jsonConf);
    if (serializedConf == null) {
        throw new InvalidTopologyException("");
    }
    serializedConf.put(Config.STORM_ID, topologyId);
    Map stormConf;
    try {
        stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
    } catch (Exception e1) {
        throw new TException(errMsg);
    }
    Map totalStormConf = new HashMap(conf);
    totalStormConf.putAll(stormConf);
    StormTopology newtopology = new StormTopology(topology);
    //4.檢查topology的合法性,包括componentid檢查和spout/bolt不能為空檢查
    // this validates the structure of the topology
    Common.validate_basic(newtopology, totalStormConf, topologyId);
    try {
        StormClusterState stormClusterState = data.getStormClusterState();
        //5.在nimbus的本地準備所有topology相關資料
        //包括$storm-local-dir/nimbus/stormdist/topologyid/{tormjar.jar,stormcode.ser,stormconf.ser}
        // create $storm-local-dir/nimbus/topologyId/xxxx files
        setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
                newtopology);
        //6.向zk寫入task資訊
        //6.1建立目錄$zkroot/taskbeats/topologyid
        //6.2寫檔案$zkroot/tasks/topologyid/taskid 内容為對應task的taskinfo[内容:componentid]
        // generate TaskInfo for every bolt or spout in ZK
        // $zkroot /tasks/topoologyId/xxx
        setupZkTaskInfo(conf, topologyId, stormClusterState);
        //7.任務配置設定事件壓入待配置設定隊列
        // make assignments for a topology
        TopologyAssignEvent assignEvent = new TopologyAssignEvent();
        assignEvent.setTopologyId(topologyId);
        assignEvent.setScratch(false);
        assignEvent.setTopologyName(topologyname);
        TopologyAssign.push(assignEvent);
    }
    ……
}
           

3.任務排程

Topology被成功送出後會壓入Nimbus中TopologyAssign的FIFO隊列,背景任務排程線程對隊列中的Topology逐個進行任務排程。

從0.9.0開始,JStorm提供非常強大的排程功能,基本上可以滿足大部分的需求,同時支援自定義任務排程政策。JStorm的資源不再僅是Worker的端口,而從CPU/Memory/Disk/Net等四個次元綜合考慮。

jstorm-0.7.1的任務排程政策仍主要以Worker端口/Net單一次元排程。

任務排程需要解決的問題是:如何将Topology DAG中各個計算節點和叢集資源比對,才能發揮高效的邏輯處理。0.7.1的政策是:

1、将叢集中的資源排序:按照空閑worker數從小到大的順序重排節點,節點内部按照端口大小順序排列;

2、Topology中需要配置設定的任務(重新配置設定的Topology時大多任務不再需要配置設定)逐個映射到上述排好序的資源裡。

任務排程核心邏輯如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
      
public static List sortSlots(
Set allSlots, int needSlotNum) {
    Map> nodeMap = new HashMap>();
    // group by first
    // 按照節點進行組織Map> : nodeid -> ports
    for (NodePort np : allSlots) {
        String node = np.getNode();
        List list = nodeMap.get(node);
        if (list == null) {
            list = new ArrayList();
            nodeMap.put(node, list);
        }
        list.add(np);
    }
 
    //每一個nodeid按照端口的大小進行排序
    for (Entry> entry : nodeMap.entrySet()) {
        List ports = entry.getValue();
        Collections.sort(ports);
    }
 
    //收集所有的workers
    List> splitup = new ArrayList>(nodeMap.values());
 
    //按照節點可用worker數從小到大排序
    //1.assignTasks-Map supInfos
    //2.availSlots : splitup/List>
    Collections.sort(splitup, new Comparator> () {
        public int compare(List o1, List o2) {
            return o1.size() - o2.size();
        }
    });
 
    /*
     * splitup目前的狀态(A-F表示節點,1-h表示端口)
     * |A| |B| |C| |D| |E| |F|
     *--|---|---|---|---|---|--
     * |1| |2| |3| |4| |5| |6|
     *     |7| |8| |9| |0| |a|
     *         |b| |c| |d| |e|
     *                 |f| |g|
     *                     |h|
     * 經過interleave_all收集到的sortedFreeSlots為:
     * 1-2-3-4-5-6-7-8-9-0-a-b-c-d-e-f-g-h
     */
    List sortedFreeSlots = JStormUtils.interleave_all(splitup);
 
    //比較sortedFreeSlots.size和needSlotNum的大小配置設定workers
    if (sortedFreeSlots.size()  needSlotNum
    return sortedFreeSlots.subList(0, needSlotNum);
}
           

4.任務監控

初始化Nimbus時背景會随之啟動一個稱為MonitorRunnable的線程,該線程的作用是定期檢查所有運作Topology的任務Tasks是否存在Dead的狀态。一旦發現Topology中存在Dead的任務Task,MonitorRunnable将該Topology置為StatusType.monitor,等待任務配置設定線程對該Topology中的Dead任務進行重新配置設定。

MonitorRunnable線程預設10s執行一次檢查,主要邏輯如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
      
@Override
public void run() {
    //1.擷取jstorm對zk的操作接口
    StormClusterState clusterState = data.getStormClusterState();
    try {
        // Attetion, here don't check /ZK-dir/taskbeats to
        // get active topology list
        //2.通過$zkroot/assignments/擷取所有需要檢查active topology
        List active_topologys = clusterState.assignments(null);
 
        if (active_topologys == null) {
            LOG.info("Failed to get active topologies");
            return;
        }
 
        for (String topologyid : active_topologys) {
            LOG.debug("Check tasks " + topologyid);
            // Attention, here don't check /ZK-dir/taskbeats/topologyid to
            // get task ids
            //3.通過$zkroot/tasks/topologyid擷取組成topology的所有tasks
            List taskIds = clusterState.task_ids(topologyid);
            if (taskIds == null) {
                LOG.info("Failed to get task ids of " + topologyid);
                continue;
            }
 
            boolean needReassign = false;
            for (Integer task : taskIds) {
                //4.檢查task是否為Dead狀态,主要是task心跳是否逾時
                boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task);
                if (isTaskDead == true) {
                    needReassign = true;
                    break;
                }
            }
            if (needReassign == true) {
                //5.如果Topology裡有Dead狀态的Task則topology狀态置為monitor等待任務配置設定線程重新配置設定
                NimbusUtils.transition(data, topologyid, false, StatusType.monitor);
            }
        }
    } catch (Exception e) {
        // TODO Auto-generated catch block
        LOG.error(e.getCause(), e);
    }
}
           

四、結語

本文簡單介紹了Nimbus在整個JStorm系統中扮演的角色,及其實作邏輯和關鍵流程的源碼剖析,希望能夠對剛接觸JStorm的同學有所幫助。文中難免存在不足和錯誤,歡迎交流指導。

五、參考文獻

[1]Storm社群. http://Storm.incubator.apache.org/

[2]JStorm源碼. https://github.com/alibaba/jStorm/

[3]Storm源碼. https://github.com/nathanmarz/Storm/

[4]Jonathan Leibiusky, Gabriel Eisbruch, etc. Getting Started with Storm.http://shop.oreilly.com/product/0636920024835.do. O’Reilly Media, Inc.

[5]Xumingming Blog. http://xumingming.sinaapp.com/

[6]量子恒道官方部落格. http://blog.linezing.com/