Kubernetes 排程器
Kubernetes 是一個基于容器的分布式排程器,實作了自己的排程子產品。
在Kubernetes叢集中,排程器作為一個獨立子產品通過pod運作。從幾個方面介紹Kubernetes排程器。
排程器工作方式
Kubernetes中的排程器,是作為單獨元件運作,一般運作在Master中,和Master數量保持一緻。通過Raft協定選出一個執行個體作為Leader工作,其他執行個體Backup。 當Master故障,其他執行個體之間繼續通過Raft協定選出新的Master工作。
其工作模式如下:
- 排程器内部維護一個排程的pods隊列podQueue, 并監聽APIServer。
- 當我們建立Pod時,首先通過APIServer 往ETCD寫入pod中繼資料。
- 排程器通過Informer監聽pods狀态,當有新增pod時,将pod加入到podQueue中。
- 排程器中的主程序,會不斷的從podQueue取出的pod,并将pod進入排程配置設定節點環節
- 排程環節分為兩個步奏, Filter過濾滿足條件的節點 、 Prioritize根據pod配置,例如資源使用率,親和性等名額,給這些節點打分,最終選出分數最高的節點。
- 配置設定節點成功, 調用apiServer的binding pod 接口, 将
設定為所配置設定的那個節點。pod.Spec.NodeName
- 節點上的kubelet同樣監聽ApiServer,如果發現有新的pod被排程到所在節點,調用本地的dockerDaemon 運作容器。
- 假如排程器嘗試排程Pod不成功,如果開啟了優先級和搶占功能,會嘗試做一次搶占,将節點中優先級較低的pod删掉,并将待排程的pod排程到節點上。 如果未開啟,或者搶占失敗,會記錄日志,并将pod加入podQueue隊尾。
實作細節
kube-scheduling 是一個獨立運作的元件,主要工作内容在
Run函數 。
這裡面主要做幾件事情:
- 初始化一個Scheduler 執行個體
,傳入各種Informer,為關心的資源變化建立監聽并注冊handler,例如維護podQuenesched
- 注冊events元件,設定日志
- 注冊http/https 監聽,提供健康檢查和metrics 請求
- 運作主要的排程内容入口
。 如果設定sched.run()
,代表啟動多個執行個體,通過Raft選主,執行個體隻有當被選為master後運作主要工作函數--leader-elect=true
。sched.run
排程核心内容在
sched.run()
函數,它會啟動一個go routine不斷運作
sched.scheduleOne
, 每次運作代表一個排程周期。
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
我們看下
sched.scheduleOne
主要做什麼
func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
.... // do some pre check
scheduleResult, err := sched.schedule(pod)
if err != nil {
if fitError, ok := err.(*core.FitError); ok {
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
..... // do some log
} else {
sched.preempt(pod, fitError)
}
}
}
...
// Assume volumes first before assuming the pod.
allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
...
fo func() {
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
klog.Errorf("error binding volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: scheduleResult.SuggestedHost,
},
})
}
}
在
sched.scheduleOne
中,主要會做幾件事情
- 通過
, 從podQuene中取出podsched.config.NextPod()
- 運作
,嘗試進行一次排程。sched.schedule
- 假如排程失敗,如果開啟了搶占功能,會調用
嘗試進行搶占,驅逐一些pod,為被排程的pod預留白間,在下一次排程中生效。sched.preempt
- 如果排程成功,執行bind接口。在執行bind之前會為pod volume中聲明的的PVC 做provision。
sched.schedule
是主要的pod排程邏輯
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
// Get node list
nodes, err := nodeLister.List()
// Filter
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
if err != nil {
return result, err
}
// Priority
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return result, err
}
// SelectHost
host, err := g.selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
FeasibleNodes: len(filteredNodes),
}, err
}
排程主要分為三個步奏:
- Filters: 過濾條件不滿足的節點
- PrioritizeNodes: 在條件滿足的節點中做Scoring,擷取一個最終打分清單priorityList
- selectHost: 在priorityList中選取分數最高的一組節點,從中根據round-robin 方式選取一個節點。
接下來我們繼續拆解, 分别看下這三個步奏會怎麼做
Filters
Filters 相對比較容易,排程器預設注冊了一系列的predicates方法, 排程過程為并發調用每個節點的predicates 方法。最終得到一個node list,包含符合條件的節點對象。
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
if len(g.predicates) == 0 {
filtered = nodes
} else {
allNodes := int32(g.cache.NodeTree().NumNodes())
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()
// 此處會調用這個節點的所有predicates 方法
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
)
if fits {
length := atomic.AddInt32(&filteredLen, 1)
if length > numNodesToFind {
// 如果目前符合條件的節點數已經足夠,會停止計算。
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
}
}
}
// 并發調用checkNode 方法
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
filtered = filtered[:filteredLen]
}
return filtered, failedPredicateMap, nil
}
值得注意的是, 1.13中引入了FeasibleNodes 機制,為了提高大規模叢集的排程性能。允許我們通過
bad-percentage-of-nodes-to-score
參數, 設定filter的計算比例(預設50%), 當節點數大于100個, 在 filters的過程,隻要滿足條件的節點數超過這個比例,就會停止filter過程,而不是計算全部節點。
舉個例子,當節點數為1000, 我們設定的計算比例為30%,那麼排程器認為filter過程隻需要找到滿足條件的300個節點,filter過程中當滿足條件的節點數達到300個,filter過程結束。 這樣filter不用計算全部的節點,同樣也會降低Prioritize 的計算數量。 但是帶來的影響是pod有可能沒有被排程到最合适的節點。
Prioritize
Prioritize 的目的是幫助pod,為每個符合條件的節點打分,幫助pod找到最合适的節點。同樣排程器預設注冊了一系列Prioritize方法。這是Prioritize 對象的資料結構
// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int
}
每個PriorityConfig 代表一個評分的名額,會考慮服務的均衡性,節點的資源配置設定等因素。 一個 PriorityConfig 的主要Scoring過程分為 Map和Reduce,
- Map 過程計算每個節點的分數值
- Reduce 過程會将目前PriorityConfig的所有節點的打分結果再做一次處理。
所有PriorityConfig 計算完畢後,将每個PriorityConfig的數值乘以對應的權重,并按照節點再做一次聚合。
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
for i := range priorityConfigs {
var err error
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
}
})
for i := range priorityConfigs {
wg.Add(1)
go func(index int) {
defer wg.Done()
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]);
}(i)
}
wg.Wait()
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
for j := range priorityConfigs {
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}
此外Filter和Prioritize 都支援extener scheduler 的調用,本文不做過多闡述。
現狀
目前kubernetes排程器的排程方式是Pod-by-Pod,也是目前排程器不足的地方。主要瓶頸如下
- kubernets目前排程的方式,每個pod會對所有節點都計算一遍,當叢集規模非常大,節點數很多時,pod的排程時間會非常慢。 這也是percentage-of-nodes-to-score 嘗試要解決的問題
- pod-by-pod的排程方式不适合一些機器學習場景。 kubernetes早期設計主要為線上任務服務,在一些離線任務場景,比如分布式機器學習中,我們需要一種新的算法gang scheduler,pod也許對排程的即時性要求沒有那麼高,但是送出任務後,隻有當一個批量計算任務的所有workers都運作起來時,才會開始計算任務。 pod-by-pod 方式在這個場景下,當資源不足時非常容易引起資源死鎖。
- 目前排程器的擴充性不是十分好,特定場景的排程流程都需要通過寫死實作在主流程中,比如我們看到的bindVolume部分, 同樣也導緻Gang Scheduler 無法在目前排程器架構下通過原生方式實作。
Kubernetes排程器的發展
社群排程器的發展,也是為了解決這些問題
- 排程器V2架構,增強了擴充性,也為在原生排程器中實作Gang schedule做準備。
- Kube-batch: 一種Gang schedule的實作 https://github.com/kubernetes-sigs/kube-batch
- poseidon: Firmament 一種基于網絡圖排程算法的排程器,poseidon 是将Firmament接入Kubernetes排程器的實作 https://github.com/kubernetes-sigs/poseidon
接下來,我們會分析一個具體的排程器方法實作,幫助了解拆解排程器的過程。 并且關注分析排程器的社群動态。