天天看點

Spark源碼分析 -- SchedulableBuilder

schedulablebuilder就是對scheduleable tree的封裝, 

在pool層面(中間節點), 完成對taskset的排程(fifo, fair) 

在tasksetmanager 層面(葉子節點), 完成對taskset中task的排程(locality)以及track(retry)

用于封裝taskset, 主要提供對單個taskset内部的tasks的track和schedule 

是以主要的接口, 

resourceoffer, 對于一個resource offer, 如何schedule一個task來執行 

statusupdate, 對于task狀态的track

clusterscheduler上對于tasksetmanager的實作

1 addpendingtask 

locality, 在schedule時候需要考慮, 應該優先執行盡可能近的task 

所有未被執行的tasks, 都是pending task, 并且是安裝不同locality粒度存儲在hashmap中的 

pendingtasksforexecutor, hashmap, 每個executor被指定的task 

pendingtasksforhost,  hashmap, 每個instance被指定的task 

pendingtasksforrack, hashmap, 每個機架被指定的task 

pendingtaskswithnoprefs, arraybuffer, 沒有locality preferences的tasks, 随便在那邊執行 

allpendingtasks, arraybuffer, 所有的pending task 

speculatabletasks, 重複的task, 熟悉hadoop的應該容易了解 

可以繼續看下addpendingtask, 如何把task加到各個list上去

addpendingtask(index: int, readding: boolean = false) 

兩個參數, 

index, task的index, 用于從taskset中取得task 

readding, 表示是否新的task, 因為當executor失敗的時候, 也需要把task重新再加到各個list中, list中有重複的task是沒有關系的, 因為選取task的時候會自動忽略已經run的task

2 resourceoffer 

解決如何在taskset内部schedule一個task, 主要需要考慮的是locality, 直接看注釋 

其中比較意思的是, 對currentlocalityindex的維護 

初始時為0, process_local, 隻能選擇pendingtasksforexecutor 

每次調用resourceoffer, 都會計算和前一次task launch之間的時間間隔, 如果逾時(各個locality的逾時時間不同), currentlocalityindex會加1, 即不斷的放寬 

而代表前一次的lastlaunchtime, 隻有在resourceoffer中成功的findtask時會被更新, 是以邏輯就是優先選擇更local的task, 但當findtask總失敗時, 說明需要放寬 

但是放寬後, 當有比較local的task被選中時, 這個currentlocalityindex還會縮小, 因為每次都會把tasklocality指派給currentlocality

3 statusupdate 

應對statusupdate, 主要是通過在clusterscheduler中注冊的listener通知dagscheduler 

當然對于失敗的task, 還要再加到pending list裡面去

一種對schedulablequeue的抽象, 什麼是schedulable? 

注釋說的, 包含pools and tasksetmanagers, 這裡設計有問題, 你會發現pools和tasksetmanagers的核心接口完全不同, 雖然tasksetmanagers裡面也實作了這些接口, 但都是meanless的 

簡單了解成, 作者想要統一對待, 泛化pools和tasksetmanagers, 是以這樣做了

是以對于pool, 可以了解為tasksetmanagers的容器, 當然由于pool本身也是schedulable, 是以容器裡面也可以放pool 

核心接口getsortedtasksetqueue, 通過配置不同的schedulingalgorithm來排程tasksetmanagers(或pool)

是以注意那些fifo或fair都是用來排程taskset的, 是以spark排程的基礎是stage

上面說了pool裡面可以是tasksetmanagers也可以是pool, 這樣是不是可以形成tree 

schedulablebuilder就是對schedulable tree的封裝, 通過tasksetmanagers(葉節點)和pools(中間節點), 來生成schedulable tree 

這裡隻列出最簡單的fifo, 看不出tree的感覺 

對于fifo很簡單, 直接使用一個pool就可以, 把所有的taskset使用addschedulable加進去, 然後排序讀出來即可

這裡沒有列出fair的實作, 比較複雜, 後面再分析吧

繼續閱讀