
作者|林俊(萬念)
來源|爾達 Erda 公衆号
Erda Pipeline 是端點自研、用 Go 編寫的一款企業級流水線服務。截至目前,已經為衆多行業頭部客戶提供傳遞和穩定的服務。
為什麼我們堅持自研,而不用 jenkins 等産品呢?在當時,至少有以下幾點理由:
- 時至今日,開源社群仍沒有一個事實上的流水線标準
- K8s、DC/OS 等的 Job 實作都偏弱,上下文傳遞等缺失,不滿足我們的需求,更不要說 Flow 了
- 自研能更快地響應業務需求,進行定制化開發
作為基礎服務,Pipeline 在 Erda 内部支撐了 CI/CD、快資料平台、自動化測試平台、SRE 運維鍊路等産品化場景。本文就從幾個方面來介紹一下 Pipeline。
為什麼會有 Pipeline
這就需要從應用建構開始說起。Pipeline 的前身是 Packer 和 CI。
Packer
Erda 最開始是端點内部使用的 PaaS 平台。從 2017 年開始,Erda 就管理了公司所有的研發項目。項目下每個應用都逃不開
代碼 -> 編譯 -> 鏡像制作 -> 部署
的标準流程。這個時候我們開發了 Packer,顧名思義,它是一個專門負責
打包
的元件。使用者需要提供 Dockerfile,這在當時還是有着較高學習成本的。
CI
随着 CI/CD(持續內建、持續傳遞)概念的深入人心,我們也推出了 Packer 的更新版 CI 。同時,基礎設施即代碼(IaC)的理念也在這裡得到了實踐:通過 erda.yaml 1.0 文法同時聲明應用的微服務架構和建構過程。
在使用者體驗上,我們不再直接暴露 Dockerfile,而是把最佳實踐以 BuildPack 大禮包的方式給到使用者,使用者甚至不需要聲明應用的開發語言和建構方式,就可以通過 BuildPack 的自動探測和識别,完成 CI/CD 流程。
受限于單容器的運作方式,當時我們也遇到了一些問題,譬如把 CI 建構過程自定義能力開放、建構環境多版本問題等,這些問題在 Pipeline 裡都迎刃而解。
Pipeline
今天回過頭來看,從 CI 更新到 Pipeline 是一個很自然的過程:因為 CI/CD 本身就是一個很标準的流程,我們完全可以抽象出一個更通用的流程引擎,這就是 Pipeline。CI/CD 成為了 Pipeline 最開始支撐的場景。
在設計之初,我們就做了以下改進:
- 對外:通過清晰易用的 pipeline.yaml 文法,降低使用者的上手成本。
- 對内:抽象出任務定義,配合 ActionExecutor Plugin Mechenism(任務執行器插件機制),很友善地對接各個單任務執行平台,譬如 DC/OS Metronome、K8s Job、Flink/Spark Job 等。
- 由 Pipeline 提供一緻、強大的流程編排能力。
Pipeline 功能特性
Pipeline 有許多靈活、強大的功能,譬如:
- 配置即代碼,通過 pipeline.yaml 文法描述流程,基于 Stage 文法簡化編排複雜度。
- 豐富的擴充市場,平台内置超過百款開箱即用的 Action,滿足大部分日常場景;同時可輕松擴充你自己的 Action。
- 可視化編輯,通過圖形界面互動快速配置流水線。
- 支援嵌套流水線,在流水線級别進行複用,組合出更強大的流水線。
- 靈活的執行政策,包括串并行、循環、分支政策、逾時、人工确認等。
- 支援工作流優先隊列,優先級可實時調整,保證高優先級流水線優先執行。
- 多元度的重試機制,支援斷點重試、全流程重試。
- 定時流水線,同時提供強大的定時補償功能。
- 動态配置,支援
和值
兩種類型,均支援加密存儲,確定資料安全性。檔案
- 上下文傳遞,後置任務可以引用前置任務的
值
。檔案
- 開放的 OpenAPI 接口,友善第三方系統快速接入。
- ······
Pipeline 架構
如上圖所示,Pipeline 支援 UI / OPENAPI / CLI 多種方式進行互動。
Pipeline 本身支援水準擴充,保證高可用,還可以将其劃分為:服務層、核心層和引擎層。下面我們詳細介紹一下。
服務層
- yaml parser 解析流程定義檔案,支援靈活的變量文法。例如上下文值引用:
;配置管理引用:${{ outputs.preTaskName.key }}
等。${{ configs.key }}
- 對接擴充市場擷取擴充能力。
核心層
- Cron 守護程序。
- EventManager 抽象内部事件發送,使用擴充卡模式解耦監控名額上報、發送 ws 消息、支援 webhook 等。
- AOP 擴充點機制(借鑒 Spring),把代碼關鍵節點進行暴露,友善開發同學在不修改核心代碼的前提下定制流水線行為。這個能力後續我們還會開放給調用方,包括使用者,支援他們去做一些有意思的事情。
目前許多有意思的功能都是通過擴充點機制實作的,譬如自動化測試報告嵌套生成、隊列彈出前檢查、接口測試 Cookie 保持等:
引擎層
引擎層包括:
- 流程推進器(Reconciler)
- 優先隊列管理器
- 任務執行器插件機制
具體内容在下一節會展開講解。
中間件依賴
我們盡可能做到簡化中間件依賴,使部署更簡單。
- 使用 MySQL 做資料持久化。
- 使用 etcd watch 功能實作多執行個體狀态同步以及分布式鎖。
- 使用 etcd key ttl 實作資料 defer GC。
流水線是如何被推進的
在引擎側,pipeline.yaml 被解析為 DAG(Directed Acyclic Graph,有向無環圖) 結構後被推進。
換句話說,引擎并不認識、也不關心 pipeline.yaml 文法,使用者側完全可以提供多種多樣的文法友善不同使用者使用,隻需要最終能被轉換成 Pipeline 簡單封裝過的 DAG 結構。
Pipeline 級别由推進器 Reconciler 根據 DAG 計算出目前可被推進的任務,每個任務異步去執行推進邏輯。
任務的推進由 TaskFramework 處理,其中抽象出
prepare -> create -> start -> queue -> wait
标準步驟。當有需要時也可以很友善地進行标準擴充。
當任意一個任務推進完畢時,會再次遞歸調用 reconcile 方法去重複上述流程,直到流程整體執行完畢。
Reconciler 中 通過 DAG 計算可排程任務代碼如下:
// getSchedulableTasks return the list of schedulable tasks.
// tasks in list can be schedule concurrently.
func (r *Reconciler) getSchedulableTasks(p *spec.Pipeline, tasks []*spec.PipelineTask) ([]*spec.PipelineTask, error) {
// construct DAG
dagNodes := make([]dag.NamedNode, 0, len(tasks))
for _, task := range tasks {
dagNodes = append(dagNodes, task)
}
_dag, err := dag.New(dagNodes,
// pipeline DAG 中目前可以禁用任意節點,即 dag.WithAllowMarkArbitraryNodesAsDone=true
dag.WithAllowMarkArbitraryNodesAsDone(true),
)
if err != nil {
return nil, err
}
// calculate schedulable nodes according to dag and current done tasks
schedulableNodeFromDAG, err := _dag.GetSchedulable((&spec.PipelineWithTasks{Tasks: tasks}).DoneTasks()...)
if err != nil {
return nil, err
}
......
}
ActionExecutor 插件機制
把複雜留給自己,把簡單留給别人。
在前文我們說到:由流水線提供靈活、一緻的流程編排能力。它的前提是單個任務的執行已經被很好的抽象了。
在 Pipeline 中,我們對一個任務執行的抽象是 ActionExecutor:
type ActionExecutor interface {
Kind() Kind
Name() Name
Create(ctx context.Context, action *spec.PipelineTask) (interface{}, error)
Start(ctx context.Context, action *spec.PipelineTask) (interface{}, error)
Update(ctx context.Context, action *spec.PipelineTask) (interface{}, error)
Exist(ctx context.Context, action *spec.PipelineTask) (created bool, started bool, err error)
Status(ctx context.Context, action *spec.PipelineTask) (apistructs.PipelineStatusDesc, error)
// Optional
Inspect(ctx context.Context, action *spec.PipelineTask) (apistructs.TaskInspect, error)
Cancel(ctx context.Context, action *spec.PipelineTask) (interface{}, error)
Remove(ctx context.Context, action *spec.PipelineTask) (interface{}, error)
}
是以,一個執行器隻要實作 單個任務 的 建立、啟動、更新、狀态查詢、删除 等基礎方法,就可以注冊成為一個 ActionExecutor。
恰當的任務執行器抽象,使得 Batch/Streaming/InMemory Job 的配置和使用方式完全一緻,批流一體,對使用者屏蔽底層細節,做到無感覺切換。在同一條流水線中,可以混用各種 ActionExecutor。
排程時,Pipeline 根據任務類型和叢集資訊,将任務排程到對應的任務執行器上。
目前我們已經擁有許多的 ActionExecutor:
插件化的開發機制,使我們在未來對接其他任務引擎也變得非常簡單,例如對接 Jenkins 成為一個 ActionExecutor。
這裡舉一個真實的例子:在自動化測試平台裡,之前每一個 API 都會啟動一個容器去執行,而容器的啟停最快也需要數秒,這和 API 接口正常毫秒級的耗時比起來,慢了幾個數量級。得益于 ActionExecutor 插件機制,我們快速開發了基于記憶體的 API-Test 任務執行器,很快就解決了這個問題,使用者不需要做任何調整,節省了很多時間成本。
更友好的使用者接入層 pipeline.yaml
pipeline.yaml 是 IaC 的一個實踐,我們通過 YAML 格式描述流水線定義,基于 Stage 文法簡化編排複雜度。
一個簡單的示例如下所示:
version: 1.1
cron: 0 */10 * * * ?
# stage 表示 階段,多個 stage 串行成為 stages
stages:
# 一個 stage 内包含多個 并行 的 Action
- stage:
- git-checkout: # Action 類型
params:
depth: 1
- stage:
- buildpack:
alias: backend
params:
context: ${{ dirs.git-checkout }}
resources:
cpu: 0.5
mem: 2048
- custom-script:
image: centos:7
commands: # 支援直接執行指令
- sleep 5
- echo hello world
- cat ${{ dirs.git-checkout }}/erda.yml # 這裡通過 ${{ dirs.git-checkout }} 文法來引用檔案
以 Pipeline 為技術底座
目前,以 Pipeline 作為技術底座,向上支撐了:
- DevOps CI/CD 場景,包括 Erda 自身的持續內建和 Release 版本釋出。
- 快資料平台:工作流編排,批流一體,支援工作流優先級隊列,保證高優先級資料任務必須執行。至今已為多家世界 500 強企業和頭部客戶提供穩定服務。
- 自動化測試平台:測試流程編排,API(出參、斷言)、資料銀行等不同類型的任務統一編排。
- SRE 叢集運維鍊路。
- 提供無限擴充:基于 ActionExecutor 擴充機制和擴充市場。
開源架構更新
目前,Pipeline 所有代碼均已完成開源。我們正在進行的重構工作包括:
- 使用 Erda-Infra 微服務架構重新梳理功能子產品
- Pipeline 平台支援獨立部署,UI 自動适配
- 通過 ActionExecutor 插件機制支援使用者本地 Agent,充分利用本地資源
- 在 GitHub 上推出 Erda Cloud Pipeline App,提供免費的 CI 能力