天天看點

TBSchedule源碼學習筆記-啟動過程TBSchedule源碼學習筆記-啟動過程

TBSchedule源碼學習筆記-啟動過程

TBSchedule基本概念及原理

概念介紹

TBSchedule是一個支援分布式的排程架構,能讓一種批量任務或者不斷變化的任務,被動态的配置設定到多個主機的JVM中,不同的線程組中并行執行。基于ZooKeeper的純Java實作,由Alibaba開源。

代碼實作

起步配置

/* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        InputStream in = TaskCenter.class.getClassLoader().getResourceAsStream("schedule-conf.properties");
        Properties p = new Properties();
        p.load(in);
        TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
        scheduleManagerFactory.setApplicationContext(applicationContext);
        try {
            scheduleManagerFactory.init(p);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
           

TBScheduleManagerFactory(com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory)是個什麼?代碼裡對他的描述是“排程伺服器構造器”,隻關心他的啟動過程

public void init(Properties p) throws Exception {
        //這裡initialThread是個什麼,作用和意義?
        if(this.initialThread != null){
            this.initialThread.stopThread();
        }
        //lock是一個鎖(ReentrantLock)使用預設的非公平鎖
        this.lock.lock();
        try{
            this.scheduleDataManager = null;
            this.scheduleStrategyManager = null;
            ConsoleManager.setScheduleManagerFactory(this);
            if(this.zkManager != null){
                this.zkManager.close();
            }
            this.zkManager = new ZKManager(p);
            this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();
            initialThread = new InitialThread(this);
            initialThread.setName("TBScheduleManagerFactory-initialThread");
            initialThread.start();
        }finally{
            this.lock.unlock();
        }
           

到這裡需要知道,initalThread是個什麼線程,該線程的作用?點開看一下run方法的實作

@Override
    public void run() {
        //factory就是這個排程伺服器
        facotry.lock.lock();
        try {
            int count =;
            while(facotry.zkManager.checkZookeeperState() == false){
                count = count + ;
                if(count %  == ){
                    facotry.errorMessage = "Zookeeper connecting ......" + facotry.zkManager.getConnectStr() + " spendTime:" + count *  +"(ms)";
                    log.error(facotry.errorMessage);
                }
                Thread.sleep();//貌似每20ms 檢測一次zk的連接配接狀态
                if(this.isStop ==true){
                    return;
                }
            }
            //如果連接配接成功則初始化資料
            facotry.initialData();
        } catch (Throwable e) {
             log.error(e.getMessage(),e);
        }finally{
            facotry.lock.unlock();
        }

    }
           

這個initalThread内部會競争factory的鎖,是以如果zk一直檢查失敗,鎖得不到釋放,有可能一直阻塞線程,導緻啟動項目啟動問題?while出口的isStop是什麼?isStop莫非是一個取消狀态?代表停止這個線程,這個線程可能更多的做一個zk狀态的啟動前自檢。

多慮了,後邊他的主線程會釋放鎖…..

scheduleDataManager是什麼?具體作用是

`com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager

他是一個排程中心用戶端,啟動過程幾乎沒怎麼參與。

scheduleStrategyManager是什麼?類名

`com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK

官方沒有給出他的命名,暫且取名為排程政策管理器,提供了一些政策查詢和更新的功能。

ConsoleManager.setScheduleManagerFactory(this);,貌似與管理控制台有關?

實際使用中,控制台和排程項目是不相關的,那麼這個ConsoleManager是什麼?

然後剩下的就是初始化一個zk的管理器,最後釋放鎖

問題是內建tbschedule隻需這樣就行,剩下的都是一些zk配置操作。如果和zk相關應該包含兩個點:

1.在哪裡添加監視點的?即tbschedule如何響應 rootPath 配置的節點路徑下的相關變化的?

2.監視點的動作是怎樣的

目前為止值得看下去的就兩行,分别是

com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory

類下的init(Properties p)方法。

public void init(Properties p) throws Exception {
       //.......//
            ConsoleManager.setScheduleManagerFactory(this);
        //.......//   
           

還有

com.taobao.pamirs.schedule.strategy.InitialThread

類下的

public void run() {
        //......//
            facotry.initialData();
         //......//
    }
           

其中TBScheduleManagerFactory類的init方法裡的代碼行 更像一個setter方法,正常編碼時會避免在setter中做指派以外操作的。暫且不管

然後facotry.initialData(); 方法名告訴我該方法會初始化資料。

/**
     * 在Zk狀态正常後回調資料初始化
     * @throws Exception
     */
    public void initialData() throws Exception{
            //初始化zk管理器
            this.zkManager.initial();
            //初始化排程中心用戶端
            this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
            //初始化排程政策管理器
            this.scheduleStrategyManager  = new ScheduleStrategyDataManager4ZK(this.zkManager);
            if (this.start == true) {
                // 注冊排程管理器
                this.scheduleStrategyManager.registerManagerFactory(this);
                if(timer == null){
                    timer = new Timer("TBScheduleManagerFactory-Timer");
                }
                if(timerTask == null){
                    timerTask = new ManagerFactoryTimerTask(this);
                    timer.schedule(timerTask, ,this.timerInterval);
                }
            }
    }
           

其中timer和timerTask定義如下:其中ManagerFactoryTimerTask繼承自

java.util.TimerTask

,是以會定時執行任務。這裡是2秒後開始執行,之後每2秒執行一次。

private java.util.Timer timer;
private com.taobao.pamirs.schedule.strategy.ManagerFactoryTimerTask timerTask;
           

排程任務ManagerFactoryTimerTask 内部做了些啥?看樣子是做了一些ZK連接配接狀态監測的操作,不斷重試連不上就重連,連上了就重新整理資訊。等等……重新整理?刷的是啥????

public void run() {
        try {

            Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
            if(this.factory.zkManager.checkZookeeperState() == false){
                if(count > ){
                   log.error("Zookeeper連接配接失敗,關閉所有的任務後,重新連接配接Zookeeper伺服器......");
                   this.factory.reStart();

                }else{
                   count = count + ;
                }
            }else{
                count = ;
                //重點是這裡、
                this.factory.refresh();
            }
        }  catch (Throwable ex) {
            log.error(ex.getMessage(), ex);
        } finally {
            //這個後續有什麼作用???
            factory.timerTaskHeartBeatTS = System.currentTimeMillis();
        }
    }
           

那麼initialData() 的執行流程就是這樣了?

1.初始化排程中心用戶端

2.初始化排程政策管理器

3.注冊排程管理器

4.啟動一個定時任務,這個定時任務會以2秒為一個周期對目前的zk狀态做檢查,如果zk連接配接失敗(重試次數5)會停止目前服務,否則重新整理排程伺服器

那麼問題來了,這個定時器裡所做的”重新整理”操作又在做什麼操作,哪些東西被刷了??

public void refresh() throws Exception {
        this.lock.lock();
        try {
            // 判斷狀态是否終止
            ManagerFactoryInfo stsInfo = null;
            boolean isException = false;
            try {
                //貌似是從政策管理器中拿到政策資訊
                stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid());
            } catch (Exception e) {
                isException = true;
                logger.error("擷取伺服器資訊有誤:uuid="+this.getUuid(), e);
            }
            if (isException == true) {
                try {
                    stopServer(null); // 停止所有的排程任務
                    this.getScheduleStrategyManager().unRregisterManagerFactory(this);
                } finally {
                    reRegisterManagerFactory();
                }
            } else if (stsInfo.isStart() == false) {
                stopServer(null); // 停止所有的排程任務
                this.getScheduleStrategyManager().unRregisterManagerFactory(
                        this);
            } else {
                reRegisterManagerFactory();
            }
        } finally {
            this.lock.unlock();
        }
    }
           

這個方法裡有幾個疑點:

1.getUuid() 拿到的是個啥?貌似是這個排程中心的唯一ID?沒有在目前的代碼邏輯看到對他的初始化?初始化在哪做的,對應官方的哪個名詞?

2.stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid()); 取到的資訊有哪些作用,資料來源是哪裡,源資料的格式是什麼?什麼情況下會抛出異常??

3.ManagerFactoryInfo對象的isStart()方法,聲明的是什麼開始狀态??

4.除去失敗的情況需要終止排程任務,那麼這個正确的情況reRegisterManagerFactory(); 做了哪些事情??

問題1:

關于getUuid(),uuid哪裡初始化來的?

一般有getter就有setter ,通過setUuid(String uuid) 方法,找到注入值的位置,發現在com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK類的public List registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception方法中有調用。

巧了正好這個registerManagerFactory(factory) 反複看見好幾遍了,他到底是個啥,幹點啥好呢,現在有了問題5

5.ScheduleStrategyDataManager4ZK 所謂排程政策管理器,起到了啥作用,registerManagerFactory方法在幹些什麼??

其他操作都省略,就看uuid他的生成政策,到底是個啥,能辨別個什麼的唯一性??

/**
     * 注冊ManagerFactory
     * @param managerFactory
     * @return 需要全部登出的排程,例如當IP不在清單中
     * @throws Exception
     */
    public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{
        //第一波進來肯定無值啊
        if(managerFactory.getUuid() == null){
            String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
            //...省略...//
            managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + ));
        }
        //...省略...//
        return result;
    }
           

生成政策是當不存在uuid時才會生成uuid,這個uuid很容易發現是跟目前機器的ip還有hostname有關的,為了避免重複還追加了一個uuid,我想這裡可能是考慮到了一個機器下有多個項目內建tbschedule的情況吧

是以這裡uuid代表了

目前應用(注意是應用)在整個排程伺服器下的唯一ID,目前這個uuid在console(管理頁面)上也有所展現。

問題2:

知道了uuid是個什麼,那麼這個代碼

stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid());

字面意思可能就好了解了一些,貌似是拿應用的uuid去排程政策管理器中拿自己的排程政策?

public ManagerFactoryInfo loadManagerFactoryInfo(String uuid) throws Exception {
        //這裡會拼裝一個zk的路徑,帶有uuid哦
        String zkPath = this.PATH_ManagerFactory + "/" + uuid;
        //判斷是否存在這個節點,沒有相關節點就抛出異常?
        if(this.getZooKeeper().exists(zkPath, false)==null){
            throw new Exception("任務管理器不存在:" + uuid);
        }
        //拿到節點的存儲值
        byte[] value = this.getZooKeeper().getData(zkPath,false,null);
        ManagerFactoryInfo result = new ManagerFactoryInfo();
        result.setUuid(uuid);
        //如果值為null代表已啟動的??
        if(value== null){
            result.setStart(true);
        }else{
            //根據節點值來判斷是否start
            result.setStart(Boolean.parseBoolean(new String(value)));
        }
        return result;
    }
           

那麼可見會已uuid拼裝一個zk路徑,去判斷對應節點是否存在,如果不存在就抛出異常!如果存在就去判斷節點值(true/false),辨別了目前節點的啟動狀态。那麼問題又來了。

6.應用在排程政策管理器下的節點是什麼時候變更狀态的?

問題3:

貌似和問題6有關?

問題4:

reRegisterManagerFactory()

public void reRegisterManagerFactory() throws Exception{
        //重新配置設定排程器
        List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);
        }
        //根據政策重新配置設定排程任務的機器
        this.assignScheduleServer();
        //
        this.reRunScheduleServer();
    }
           

重新注冊了一次排程政策管理器,what這個方法有傳回值??之前為什麼不對這個傳回值做處理呢????!是以這個registerManagerFactory(this) 方法變得愈發的混亂了,有必要打開詳細看看了。

/**
     * 注冊ManagerFactory
     * @param managerFactory
     * @return 需要全部登出的排程,例如當IP不在清單中
     * @throws Exception
     */
    public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{

        if(managerFactory.getUuid() == null){
            String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
            //拼裝了一個zk節點的路徑,等等這個路徑的拼裝格式貌似見過,是以loadManagerFactoryInfo方法中資料節點是在這一步時候建立的了?
            String zkPath = this.PATH_ManagerFactory + "/" + uuid +"$";
            zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);
            managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + ));
        }else{
            String zkPath = this.PATH_ManagerFactory + "/" + managerFactory.getUuid();
            if(this.getZooKeeper().exists(zkPath, false)==null){
                zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);           
            }
        }
        // 方法的傳回值是這個list
        List<String> result = new ArrayList<String>();
        //貌似會加載所有政策,這個loadAllScheduleStrategy() 方法,會把 this.zkManager.getRootPath() +  "/strategy" 這個節點下的所有子節點全都傳回
        //然後将每個節點的内容取出,再調用這個方法ScheduleStrategy result = (ScheduleStrategy)this.gson.fromJson(valueString, ScheduleStrategy.class); 可以看作對json反序列化操作。
        //是以控制台操作建立的政策,是存儲在this.zkManager.getRootPath() +  "/strategy" 節點的子節點上的
        for(ScheduleStrategy scheduleStrategy:loadAllScheduleStrategy()){
            boolean isFind = false;
            //暫停或者不在IP範圍
            //猜測是要判斷政策在目前機器上是否可用,如果不可用則加入清單傳回,如果可用則在對應的政策節點下建立一個臨時節點
            if(ScheduleStrategy.STS_PAUSE.equalsIgnoreCase(scheduleStrategy.getSts()) == false &&  scheduleStrategy.getIPList() != null){
                for(String ip:scheduleStrategy.getIPList()){
                    //這裡就是為什麼控制太界面說“127.0.0.1或者localhost會在所有機器上運作” 的原因了
                    if(ip.equals("127.0.0.1") || ip.equalsIgnoreCase("localhost") || ip.equals(managerFactory.getIp())|| ip.equalsIgnoreCase(managerFactory.getHostName())){
                        //添加可管理TaskType
                        //建立一個臨時節點,這個臨時節點與之前的排程伺服器中生成的uuid有關。
                        //這一步建立的臨時節點如下文。
                        String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();
                        if(this.getZooKeeper().exists(zkPath, false)==null){
                            zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);           
                        }
                        isFind = true;
                        break;
                    }
                }
            }
            if(isFind == false){//清除原來注冊的Factory
                String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();
                if(this.getZooKeeper().exists(zkPath, false)!=null){
                    ZKTools.deleteTree(this.getZooKeeper(), zkPath);
                    result.add(scheduleStrategy.getStrategyName());
                }
            }
        }
        return result;
    }
           

針對我的環境,此處rootPath配置為”/dsp_official_0928_wd/schedule”,其zk的政策節點為,可見其與通過控制台錄入的政策是對應的

[zk: localhost:2181(CONNECTED) 7] ls /dsp_official_0928_wd/schedule/strategy

[commonSyncAdvertiserTask, commonSyncCreativeTask]

按照loadAllScheduleStrategy() 方法的邏輯來說,會把這兩個節點上的内容都拿出來,反序列化成com.taobao.pamirs.schedule.strategy.ScheduleStrategy 對象

以/dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask 節點内容為例

{

“strategyName”: “commonSyncAdvertiserTask”,

“IPList”: [

“127.0.0.1”

],

“numOfSingleServer”: 0,

“assignNum”: 2,

“kind”: “Schedule”,

“taskName”: “commonSyncAdvertiserTask”,

“taskParameter”: “”,

“sts”: “resume”

}

某政策可用的應用節點:

[zk: localhost:2181(CONNECTED) 8] ls /dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask

[127.0.0.1 admadc01 FC8BBDAC658C4B00B0F47377E50A570D 0000000080,127.0.0.1 adm_adc02 AB1899D1EA4F4783B25EC4B4E04A6A79 0000000081]

通常情況下,政策應用節點的内容如下,但在registerManagerFactory此時還沒有初始化該節點的值,預設為null:

{

“strategyName”: “commonSyncAdvertiserTask”,

“uuid”: “127.0.0.1 admadc01 FC8BBDAC658C4B00B0F47377E50A570D$0000000080”,

“requestNum”: 0,

“currentNum”: 0,

“message”: “”

}

先不管如上節點内容的意義,其實了解上也簡單。

問題5:

是以registerManagerFactory() 方法就是在目前機器支援的政策配置上注冊目前機器節點用于同步,并傳回目前機器上所有不支援的政策

那麼回到 reRegisterManagerFactory() 方法上來,

public void reRegisterManagerFactory() throws Exception{
        //重新配置設定排程器
        //因為目前應用不支援這些政策,是以要在目前機器上停止這些政策的服務。
        List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);
        }
        //根據政策重新配置設定排程任務的機器
        this.assignScheduleServer();
        this.reRunScheduleServer();
    }
  ```
  這裡迫切的需要知道以下兩行的含義。
  ```java
this.assignScheduleServer();
        this.reRunScheduleServer();
           

assignScheduleServer() 方法的官方解釋是 根據政策重新配置設定排程任務的機器 那麼配置設定政策是什麼呢?

1.加載所有本應用能夠運作的政策(注意上文說道的政策應用節點的資料格式,傳回預設格式)。

2.周遊1傳回政策清單,按政策taskType擷取到所有的可運作政策應用項清單

3.周遊跟目前factory執行個體相關的strategy,選舉出每個strategy的leader factory執行個體,由leader重新計算每個factory執行個體能夠分到的reqNum,即根據strategy身上的assignNum“numOfSingleServer,将assignNum平分給每個factory執行個體。

4.根據2的傳回結果(總的伺服器數量),和3的政策配置資訊,為2傳回的政策服務項配置設定值(修改zk政策應用節點的requestNum)

reRunScheduleServer() 方法沒有給出官方的解釋,這裡暫且認為其是為了重新運作排程服務,怎麼做的????

public void reRunScheduleServer() throws Exception{
    //拿到所有本機可運作的政策
        for (ScheduleStrategyRunntime run : this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)) {
            //在緩存中取
            List<IStrategyTask> list = this.managerMap.get(run.getStrategyName());
            if(list == null){
                list = new ArrayList<IStrategyTask>();
                this.managerMap.put(run.getStrategyName(),list);
            }
            //如果已經有在運作的任務組,且目前任務組數大于已配置設定的最大任務組數量,那麼就停止最後添加的任務組
            while(list.size() > run.getRequestNum() && list.size() >){
                //what??先從清單裡删了????如果停止失敗呢????????
                IStrategyTask task  =  list.remove(list.size() - );
                    try {
                        task.stop(run.getStrategyName());
                    } catch (Throwable e) {
                        logger.error("登出任務錯誤:strategyName=" + run.getStrategyName(), e);
                    }
                }
           //不足,增加排程器
           ScheduleStrategy strategy = this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
           //注意這個循環,建立線程組直到運作的線程組滿了
           while(list.size() < run.getRequestNum()){
               //厲害了這裡做了什麼????
               IStrategyTask result = this.createStrategyTask(strategy);
               if(null==result){
                   logger.error("strategy 對應的配置有問題。strategy name="+strategy.getStrategyName());
               }
               list.add(result);
            }
        }
    }
           

看起來該方法會周遊所有目前應用支援的政策,并針對政策建立一個叫做IStrategyTask的東西,并維護在一個map中,這個this.createStrategyTask(strategy); 好吧,官方有說明“建立排程伺服器”

/**
     * 建立排程伺服器
     * @param baseTaskType
     * @param ownSign
     * @return
     * @throws Exception
     */
    public IStrategyTask createStrategyTask(ScheduleStrategy strategy)
            throws Exception {
        IStrategyTask result = null;
        try{
            if(ScheduleStrategy.Kind.Schedule == strategy.getKind()){
                //就是這裡。。。。。。。。。
                String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(strategy.getTaskName());
                String ownSign =ScheduleUtil.splitOwnsignFromTaskType(strategy.getTaskName());
                result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);
            }else if(ScheduleStrategy.Kind.Java == strategy.getKind()){
                result=(IStrategyTask)Class.forName(strategy.getTaskName()).newInstance();
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }else if(ScheduleStrategy.Kind.Bean == strategy.getKind()){
                result=(IStrategyTask)this.getBean(strategy.getTaskName());
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }
        }catch(Exception e ){
            logger.error("strategy 擷取對應的java or bean 出錯,schedule并沒有加載該任務,請确認" +strategy.getStrategyName(),e);
        }
        return result;
    }
           

根據上文和日常配置,也知道來自于會走第一個條件分支。最後通過一句

result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);

傳回一個

com.taobao.pamirs.schedule.strategy.IStrategyTask

接口的實作。

繼續看

com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic

類。

public TBScheduleManagerStatic(TBScheduleManagerFactory aFactory,
            String baseTaskType, String ownSign,IScheduleDataManager aScheduleCenter) throws Exception {
        super(aFactory, baseTaskType, ownSign, aScheduleCenter);
    }
           

調用了父類

com.taobao.pamirs.schedule.taskmanager.TBScheduleManager

的構造函數,源碼裡面少有的對這個類寫了一大堆文檔注釋,先拿出來學習下。

/**
 * 1、任務排程配置設定器的目标:    讓所有的任務不重複,不遺漏的被快速處理。
 * 2、一個Manager隻管理一種任務類型的一組工作線程。
 * 3、在一個JVM裡面可能存在多個處理相同任務類型的Manager,也可能存在處理不同任務類型的Manager。
 * 4、在不同的JVM裡面可以存在處理相同任務的Manager 
 * 5、排程的Manager可以動态的随意增加和停止
 * 
 * 主要的職責:
 * 1、定時向集中的資料配置中心更新目前排程伺服器的心跳狀态
 * 2、向資料配置中心擷取所有伺服器的狀态來重新計算任務的配置設定。這麼做的目标是避免集中任務排程中心的單點問題。
 * 3、在每個批次資料處理完畢後,檢查是否有其它處理伺服器申請自己把持的任務隊列,如果有,則釋放給相關處理伺服器。
 *  
 * 其它:
 *   如果目前伺服器在處理目前任務的時候逾時,需要清除目前隊列,并釋放已經把持的任務。并向控制主動中心報警。
 * 
 * @author xuannan
 *
 */
           

第2點:一個Manager隻管理一種任務類型的一組工作線程。

通過分析啟動過程已經能夠确認。發現針對每一個政策和線程組都建立了一個

com.taobao.pamirs.schedule.strategy.IStrategyTask

執行個體。

第3點:在一個JVM裡面可能存在多個處理相同任務類型的Manager,也可能存在處理不同任務類型的Manager。

如果啟動過程初始化多個

com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory

對象時可能會有可能有”在一個JVM裡面可能存在多個處理相同任務類型的Manager”這種情況,還有會有requsetNum個相同任務類型的Manager在同一個JVM

第4點:在不同的JVM裡面可以存在處理相同任務的Manager

這種情況是肯定存在的,取決于政策的”IP位址(逗号分隔)”配置

第5點:排程的Manager可以動态的随意增加和停止

這個要自己去驗證了。

疑問:

1. List list = this.managerMap.get(run.getStrategyName()); 到底起到什麼作用?是為了統計目前JVM已經運作的線程組數量麼???

目前tbschedule啟動過程到這就結束了,2秒一次的zk健康檢查(心跳),并重新整理任務組(及時響應控制台設定),貌似到目前為止,為什麼這裡并沒有使用監視點來響應控制台的設定,而是自己主動查詢,出發點是什麼???