天天看點

淘寶分布式排程架構TBSchedule

聲明:本文為CSDN原創投稿文章,未經許可,禁止任何形式的轉載。 

作者:周立偉(ITeye部落格:mycolababy.iteye.com),京東商城進階工程師,關注分布式、高并發和Java中間件的研究。 

責編:錢曙光,關注架構和算法領域,尋求報道或者投稿請發郵件[email protected],另有「CSDN 進階架構師群」,内有諸多知名網際網路公司的大牛架構師,歡迎架構師加微信qshuguang2008申請入群,備注姓名+公司+職位。

【編者按】 TBSchedule是一款非常優秀的高性能分布式排程架構,本文是作者結合多年使用TBSchedule的經驗,在研讀三遍源碼的基礎上完成。期間作者也與阿裡空玄有過不少技術交流,并非常感謝空玄給予的大力支援。另外,作者寫這篇文章的目的一是出于對TBSchedule的一種熱愛,二是現在是一個資源共享、技術共享的時代,希望把它展現給大家(送人玫瑰,手留餘香),能給大家的工作帶來幫助。

以下為文章正文:

一、TBSchedule初識

時下網際網路和電商領域,各個平台都存在大資料、高并發的特點,對資料處理的要求越來越高,既要保證高效性,又要保證安全性、準确性。TBSchedule的使命就是将排程作業從業務系統中分離出來,降低或者是消除和業務系統的耦合度,進行高效異步任務處理。其實在網際網路和電商領域TBSchedule的使用非常廣泛,目前被應用于阿裡巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等很多網際網路企業的流程排程系統。

在深入了解TBSchedule之前我們先從内部和外部形态對它有個初步認識,如圖1.1、圖1.2。

淘寶分布式排程架構TBSchedule

圖1.1 TBSchedule關鍵字

淘寶分布式排程架構TBSchedule

圖1.2 TBSchedule外部形态

從TBSchedule的内部形态來說,與他有關的關鍵詞包括批量任務、動态擴充、多主機、多線程、并發、分片……,這些詞看起來非常的高大上,都是時下網際網路技術比較流行的詞彙。從TBSchedule的外部架構來看,一目了然,宿主在排程應用中與ZooKeeper進行通信。一個架構結構是否是優秀的,從美感的角度就可以看出來,一個好的架構一定是隐藏了内部複雜的原理,外部視覺上美好的,讓使用者使用起來簡單易懂。

二、TBSchedule原理

為什麼TBSchedule值得推廣呢?

  1. 傳統的排程架構spring task、quartz也是可以進行叢集排程作業的,一個節點挂了可以将任務漂移給其他節點執行進而避免單點故障,但是不支援分布式作業,一旦達到單機處理極限也會存在問題。
  2. elastic-job支援分布式,是一個很好的排程架構,但是開源時間較短,還沒有經曆大範圍市場考驗。
  3. Beanstalkd基于C語言開發,使用範圍較小,無法引入到php、java系統平台。

TBSchedule到底有多強大呢?我對TBSchedule的優勢特點進行了如下總結:

  1. 支援叢集、分布式
  2. 靈活的任務分片
  3. 動态的服務擴容和資源回收
  4. 任務監控支援
  5. 經曆了多年市場考驗,阿裡強大技術團隊支援

TBSchedule支援Cluster,可以宿主在多台伺服器多個線程組并行進行任務排程,或者說可以将一個大的任務拆成多個小任務配置設定到不同的伺服器。

TBSchedule的分布式機制是通過靈活的Sharding方式實作的,比如可以按所有資料的ID按10取模分片(分片規則如圖2.1)、按月份分片等等,根據不同的需求,不同的場景由用戶端配置分片規則。然後就是TBSchedule的宿主伺服器可以進行動态擴容和資源回收,這個特點主要是因為它後端依賴的ZooKeeper,這裡的ZooKeeper對于TBSchedule來說是一個NoSQL,用于存儲政策、任務、心跳資訊資料,它的資料結構類似檔案系統的目錄結構,它的節點有臨時節點、持久節點之分。排程引擎上線後,随着業務量資料量的增多,目前Cluster可能不能滿足目前的處理需求,那麼就需要增加伺服器數量,一個新的伺服器上線後會在ZooKeeper中建立一個代表目前伺服器的一個唯一性路徑(臨時節點),并且新上線的伺服器會和ZooKeeper保持長連接配接,當通信斷開後,節點會自動摘除。

TBSchedule會定時掃描目前伺服器的數量,重新進行任務配置設定。TBSchedule不僅提供了服務端的高性能排程服務,還提供了一個scheduleConsole war随着宿主應用的部署直接部署到伺服器,可以通過web的方式對排程的任務、政策進行監控管理,以及實時更新調整。

淘寶分布式排程架構TBSchedule

圖2.1 TBSchedule分片規則

是不是已經對TBSchedule稍微了有些好感呢?我們接着往下看。

TBSchedule提供了兩個核心元件ScheduleServer、TBScheduleManagerFactory和兩類核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,這兩部分是用戶端研發的關鍵部分,是使用TBSchedule必須要了解的。

ScheduleServer即任務處理器,的主要作用是任務和政策的管理、任務采集和執行,由一組工作線程組成,這組工作線程是基于隊列實作的,進行任務抓取和任務處理(有兩種處理模式,下面會講)。每個任務處理器和ZooKeeper有一個心跳通信連接配接,用于檢測Server的狀态和進行任務動态配置設定。舉個例子,比如3台伺服器的worker叢集執行出票消息生成任務,對于這個任務類型每台伺服器可以配置一個ScheduleSever(即一個線程組),也可以配置兩個線程組,那麼就相當于6台伺服器在并行執行此任務類型。當某台伺服器當機或者其他原因與ZooKeeper通信斷開時,它的任務将被其他伺服器接管。ScheduleServer參數定義如圖2.2

淘寶分布式排程架構TBSchedule

圖2.2 ScheduleServer參數定義

在這些參數中taskItems是一個非常重要的屬性,是客戶單可以自由發揮的地方,是任務分片的基礎,比如我們處理一個任務可以根據ID按10取模,那麼任務項就是0-9,3台伺服器分别拿到4、 3、 3個任務項,伺服器的上下線都會對任務項進行重新配置設定。任務項是進行任務配置設定的最小機關。一個任務項隻能由一個ScheduleServer來進行處理,但一個Server可以處理任意數量的任務項。這就是剛才我們說的分片特性。

排程伺服器TBScheduleManagerFactory的主要工作ZooKeeper連接配接參數配置和ZooKeeper的初始化、排程管理。

兩類核心接口是需要被我們定義的目标任務實作的,根據自己的需要進行任務采集(重寫selectTasks方法)和任務執行(重寫execute方法),這兩類接口也是用戶端研發根據需求自由發揮的地方。

接下來我們深入了解下TBSchedule,看看它的内部是如何實作的。圖2.3流程圖是我花了很多心血通過一周時間畫出來的,基本是清晰的展現了TBSchedule内部的執行流程以及每個步驟ZooKeeper節點路徑和資料的變化。因為圖中的注釋已經描述的很詳細了,每個節點右側是ZooKeeper的資訊(資料結構見圖2.4),這裡就不再做過多的文字描述了,有任何建議或者不明白的地方可以找我交流。

淘寶分布式排程架構TBSchedule

圖2.3 TBSchedule内部流程

淘寶分布式排程架構TBSchedule

圖2.4 TBSchedule之ZooKeeper資料結構

TBSchedule還有個強大之處是它提供了兩種處理器模式模式:

1. SLEEP模式

當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處于活動狀态。如果是,則自己休眠;如果其它線程都已經因為沒有任務進入休眠,目前線程是最後一個活動線程的時候,就調用業務接口,擷取需要處理的任務,放入任務池中,同時喚醒其它休眠線程開始工作。

2. NOTSLEEP模式

當一個線程任務處理完畢,從任務池中取不到任務的時候,立即調用業務接口擷取需要處理的任務,放入任務池中。

SLEEP模式内部邏輯相對較簡單,如果遇到大任務需要處理較長時間,可能會造成其他線程被動阻塞的情況。但其實生産環境一般都是小而快的任務,即使出現阻塞的情況ScheduleConsole也會及時的監控到。NOTSLEEP模式減少了線程休眠的時間,避免了因大任務造成阻塞的情況,但為了避免資料被重複處理,增加了CPU在資料比較上的開銷。TBSchedule預設是SLEEP模式。

到目前為止我相信大家對TBSchedule有了一個深刻的了解,心中的疑霧逐漸散開了。理論是實踐的基礎,實踐才是最終的目的,下一節我們将結合理論知識進行TBSchedule實戰。

三、TBSchedule實戰

在項目中使用TBSchedule需要依賴ZooKeeper、TBSchedule。

ZooKeeper依賴:

<dependency>
        <groupId>org.apache.ZooKeeper</groupId>
        <artifactId>ZooKeeper</artifactId>
        <version>3.4.6</version>
    </dependency>           

TBSchedule依賴:

<dependency>
        <groupId>com.taobao.pamirs.schedule</groupId>
        <artifactId>TBSchedule</artifactId>
        <version>3.3.3.2</version>
    </dependency>           

TBSchedule有三種引入方式:

  1. 通過ScheduleConsole引入

TBSchedule随着宿主排程應用部署到伺服器後,可以通過Web浏覽器的方式通路其提供監控平台。

第一步,初始化ZooKeeper

淘寶分布式排程架構TBSchedule

第二步,建立排程政策

淘寶分布式排程架構TBSchedule

第三步,建立排程任務

淘寶分布式排程架構TBSchedule

第四步,監控排程任務

淘寶分布式排程架構TBSchedule

2、通過原生Java引入

// 初始化Spring
        ApplicationContext ctx = new FileSystemXmlApplicationContext(
                "spring-config.xml");

        // 初始化排程工廠
        TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();

        Properties p = new Properties();
        p.put("zkConnectString", "127.0.0.1:2181");
        p.put("rootPath", "/taobao-schedule/train_worker");
        p.put("zkSessionTimeout", "60000"); 
        p.put("userName", "train_dev");
        p.put("password", " train_dev ");
        p.put("isCheckParentPath", "true");

        scheduleManagerFactory.setApplicationContext(ctx);

        scheduleManagerFactory.init(p); 

                // 建立任務排程任務的基本資訊
String baseTaskTypeName = "DemoTask";
                ScheduleTaskType baseTaskType = new ScheduleTaskType();
                baseTaskType.setBaseTaskType(baseTaskTypeName);
                baseTaskType.setDealBeanName("demoTaskBean");
                baseTaskType.setHeartBeatRate(10000);
                baseTaskType.setJudgeDeadInterval(100000);
                baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");
                baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(
                "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +
                "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +
                "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
        baseTaskType.setFetchDataNumber(500);
        baseTaskType.setThreadNumber(5);
        this.scheduleManagerFactory.getScheduleDataManager()
                .createBaseTaskType(baseTaskType);
        log.info("建立排程任務成功:" + baseTaskType.toString());

        // 建立任務的排程政策
        String taskName = baseTaskTypeName;
        String strategyName =taskName +"-Strategy";
        try {
            this.scheduleManagerFactory.getScheduleStrategyManager()
                    .deleteMachineStrategy(strategyName,true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        ScheduleStrategy strategy = new ScheduleStrategy();
        strategy.setStrategyName(strategyName);
        strategy.setKind(ScheduleStrategy.Kind.Schedule);
        strategy.setTaskName(taskName);
        strategy.setTaskParameter("china");

        strategy.setNumOfSingleServer(1);
        strategy.setAssignNum(10);
        strategy.setIPList("127.0.0.1".split(","));
        this.scheduleManagerFactory.getScheduleStrategyManager()
                .createScheduleStrategy(strategy);
        log.info("建立排程政策成功:" + strategy.toString());           

3、通過Spring容器引入

<!-- 初始化ZooKeeper -->  
<bean id="scheduleManagerFactory"
            class="xx.xx.TBScheduleManagerFactory">
<property name="zkConfig">
<map>
    <entry key="zkConnectString" value="127.0.0.1:2181" />
    <entry key="rootPath" value="/taobao-schedule/train_worker" />
    <entry key="zkSessionTimeout" value="60000" />
    <entry key="userName" value="train_dev" />
    <entry key="password" value="train_dev" />
    <entry key="isCheckParentPath" value="true" />
</map>
</property> 
</bean>
<!-- 配置排程政策 淩晨1點到3點執行 -->
<bean id="abstractDemoScheduleTask" class="com.xx.core.TBSchedule.InitScheduleTask" abstract="true">
<property name="scheduleTaskType.heartBeatRate" value="10000" />
<property name="scheduleTaskType.judgeDeadInterval" value="100000" />
<property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/> 
<property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>  
<property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />
<property name="scheduleTaskType.sleepTimeNoData" value="60000"/>
<property name="scheduleTaskType.sleepTimeInterval" value="60000"/>
<property name="scheduleTaskType.fetchDataNumber" value="500" />
<property name="scheduleTaskType.executeNumber" value="1" />
<property name="scheduleTaskType.threadNumber" value="5" />
<property name="scheduleTaskType.taskItems"> 
<list>
        <value>0:{TYPE=A,KIND=1}</value>
        <value>1:{TYPE=A,KIND=2}</value>
        <value>2:{TYPE=A,KIND=3}</value>
        <value>3:{TYPE=A,KIND=4}</value>
        <value>4:{TYPE=A,KIND=5}</value>
        <value>5:{TYPE=A,KIND=6}</value>
        <value>6:{TYPE=A,KIND=7}</value>
        <value>7:{TYPE=A,KIND=8}</value>
        <value>8:{TYPE=A,KIND=9}</value>
        <value>9:{TYPE=A,KIND=10}</value>
    </list>
</property>
<property name="scheduleStrategy.kind" value="Schedule" />
<property name="scheduleStrategy.numOfSingleServer" value="1" />
<property name="scheduleStrategy.assignNum" value="10" />   
    <property name="scheduleStrategy.iPList">
        <list>
            <value>127.0.0.1</value>
        </list>
    </property>
    </bean>        
<!-- 配置排程任務 -->
<bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">
<property name="scheduleTaskType.baseTaskType" value="demoTask" />
<property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />
<property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />
<property name="scheduleStrategy.taskName" value="demoTaskBean" />
</bean> 

            排程任務具體實作 DemoTask.java
 /**
 * DemoTask任務類
 */
public class DemoTask  mplements
        IScheduleTaskDealSingle,TScheduleTaskDeal {

 /**
  * 資料采集
  * @param taskItemNum--配置設定的任務項 taskItemList--總任務項 
  *        eachFetchDataNum--采集任務數量
  */
    @Override
    public List<DemoTask> selectTasks(String taskParameter,
            String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,
            int eachFetchDataNum) throws Exception {
        List<DemoTask> taskList = new LinkedList<DemoTask>();
        //用戶端根據條件進行資料采集start

        //用戶端根據條件進行資料采集end
        return rt;
    }

/**
  * 資料處理
  */
    @Override
    public boolean execute(DemoTask task, String ownSign)
            throws Exception {
        //用戶端pop任務進行處理start

        //用戶端pop任務進行處理end
        return true;
    }
}           

其實我們看對于TBSchedule用戶端的使用非常簡單,初始化ZooKeeper、配置排程政策、配置排程任務,對排程任務進行具體實作,就這幾個步驟。現在可以慶祝下了,你又掌握了一個優秀開源架構的設計思想和使用方式。

四、TBSchedule挑戰

  1. 目前ScheduleConsole監控頁面過于簡單,需完善UI設計,提高使用者體驗。
  2. 支援Zookeeper叢集自動切換,避免ZooKeeper服務的叢集單點故障。
  3. 原生ZooKeeper操作替換為Curator,Curator對ZooKeeper進行了一次包裝,對原生ZooKeeper的操作做了大量優化,Client和Server之間的連接配接可能出現的問題處理等等,可以進一步提高TBSchedule的高可用。
  4. TBSchedule的幫助文檔較少,網上的資料基本是千篇一律,希望有更多的愛好者加入進來。

繼續閱讀