背景
在容器技術普及之前,事件驅動在資料庫領域中被廣泛使用。這個概念模型很簡單:每當添加、更改或删除資料時,會觸發一個事件來執行各種操作。事件驅動的方式可以在非常短的時延内,完成後續動作的執行。事件驅動架構的核心是對系統上的各種事件做出反應并執行相應的動作。彈性伸縮已成為幾乎所有雲平台中不可或缺的組成部分。Kubernetes中,容器水準伸縮器HPA(Horizontal Pod Autoscaler)是最常用的應用彈性方案。容器水準伸縮的核心是基于資源使用率與預設的門檻值水位之間的關系,來确認伸縮的計劃。容器水準伸縮的方式具有使用簡單、資源名額豐富等特點,但是它對于需要即時彈性的場景,尤其是對基于事件源進行離線作業支撐不足。ACK提供了ack-keda來提供事件驅動彈性能力,事件驅動彈性适用于音視訊離線轉碼、事件驅動作業、流式資料處理等場景。
事件驅動彈性原理
ACK通過增強版本的ack-keda來提供事件驅動彈性能力,下圖是ack-keda的基本原理。

ack-keda會從事件源中進行資料的周期性消費。當消息出現堆積,即可秒級觸發一個批次的離線任務伸縮。下一個周期到來時,會異步進行下一個批次的作業伸縮。ack-keda具有以下特性:
-
豐富的事件源
ack-keda内置支援Kafka、MySQL、PostgreSQL、Rabbitmq、Redis等多種内置資料源。同時支援從客戶自定義的事件源擷取事件并進行Job或者pod次元的彈性縮放。
-
離線任務的并發控制
對于大規模的離線作業而言,底層管控的穩定性會面臨比較大的挑戰,需要提供資源、額度、API請求的整體控制。ack-keda提供了單批次、總批次的任務并發控 制,保障系統的穩定性。
-
結束任務後自動清理中繼資料&&支援任務回溯
大規模離線作業執行完畢後,會留存大量的中繼資料資訊。中繼資料資訊的堆積會造成APIServer的穩定性下降,造成叢集的性能下降、穩定性不足,甚至可能影響其 他的業務。ack-keda會在任務執行結束後自動清理中繼資料,降低中繼資料的量級。同時,ack-keda也支援保留一些執行失敗的Job,便于回溯,定位原因。
事件驅動彈性轉碼案例
在本案例中,我們準備一個簡單的轉碼作業,當有一個新任務到來的時候會向mongoDB插入一條類似下面的資料
{"type":"mp4","state":"waiting","createTimeStamp":"1610332940","fileName":"World and peace","endTimeStamp":"","uuid":"1fae72ff-3239-42f5-af97-04711d8007e8"}
,此時,容器服務的事件驅動控制器會從資料庫中查詢到狀态為
"state":"waiting"
的作業,彈出與任務數目比對的Job Pod來承載轉碼作業,完成轉碼業務并将資料中的state字段從之前的waiting修改成finished。同時Job完成後,自動清理,降低中繼資料對APIServer帶來的壓力,極大的減輕開發者的負擔。
1.安裝事件驅動彈性控制器 - ack-keda
- 登入阿裡雲容器服務kubernetes控制台,點選左側邊欄的應用市場,搜尋ack-keda
雲原生事件驅動彈性轉碼方案解析 - 選擇對應叢集,點選部署,部署到該叢集
雲原生事件驅動彈性轉碼方案解析 - 選擇左側邊欄的工作負載,選擇無狀态服務,選擇kube-system 命名空間,确認ack-keda部署成功
雲原生事件驅動彈性轉碼方案解析
2.部署基于mongoDB事件源驅動彈性示例
1.部署mongoDB
- 建立mongoDB.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mongodb
spec:
replicas: 1
selector:
matchLabels:
name: mongodb
template:
metadata:
labels:
name: mongodb
spec:
containers:
- name: mongodb
image: mongo:4.2.1
imagePullPolicy: IfNotPresent
ports:
- containerPort: 27017
name: mongodb
protocol: TCP
---
kind: Service
apiVersion: v1
metadata:
name: mongodb-svc
spec:
type: ClusterIP
ports:
- name: mongodb
port: 27017
targetPort: 27017
protocol: TCP
selector:
name: mongodb
- 執行
部署到叢集。kubectl apply -f mongoDB.yaml -n mongodb
2.mongoDB建立User
// 建立使用者
kubectl exec -n mongodb mongodb-xxxxx -- mongo --eval 'db.createUser({ user:"test_user",pwd:"test_password",roles:[{ role:"readWrite", db: "test"}]})'
// 登陸認證
kubectl exec -n mongodb mongodb-xxxxx -- mongo --eval 'db.auth("test_user","test_password")'
// 建立collection
kubectl exec -n mongodb mongodb-xxxxx -- mongo --eval 'db.createCollection("test_collection")'
3.部署TriggerAuthentication和ScaledJob
- 建立TriggerAuthentication
TriggerAuthentication是用來登入mongoDB查詢資料時認證使用,TriggerAuthentication中的secretTargetRef字段會将指定Secret中的資料讀取到ack-keda中,完成對Mongo的登入認證。
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: mongodb-trigger
spec:
secretTargetRef:
- parameter: connectionString
name: mongodb-secret
key: connect
---
apiVersion: v1
kind: Secret
metadata:
name: mongodb-secret
type: Opaque
data:
connect: bW9uZ29kYjovL3Rlc3RfdXNlcjp0ZXN0X3Bhc3N3b3JkQG1vbmdvZGItc3ZjLm1vbmdvZGIuc3ZjLmNsdXN0ZXIubG9jYWw6MjcwMTcvdGVzdA==
-
到叢集。kubectl apply -f auth.yaml -n mongodb-test
- 建立ScaledJob
ScaledJob主要用來配置Job模闆以及指定查詢的資料庫及查詢表達式等,這裡我們配置的是從test資料庫中的test_collection中,查詢滿足
{"type":"mp4","state":"waiting"}
的待轉碼資料。
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: mongodb-job
spec:
jobTargetRef:
// Job模闆配置
template:
spec:
containers:
- name: mongo-update
image: registry.cn-hangzhou.aliyuncs.com/carsnow/mongo-update:v6
args:
- --connectStr=mongodb://test_user:[email protected]:27017/test
- --dataBase=test
- --collection=test_collection
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 15
maxReplicaCount: 5
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 10
triggers:
- type: mongodb
metadata:
dbName: test //要查詢的資料庫
collection: test_collection //要查詢的collection
query: '{"type":"mp4","state":"waiting"}' //會對查詢轉碼類型為mp4且狀态是waiting的資料拉起job進行處理
queryValue: "1"
authenticationRef:
name: mongodb-trigger
-
kubectl apply -f scaledJob.yaml -n mongodb-test
4.插入待轉碼業務資料
// 插入5條待轉碼資料
kubectl exec -n mongodb mongodb-xxxxx -- mongo --eval 'db.test_collection.insert([
{"type":"mp4","state":"waiting","createTimeStamp":"1610352740","fileName":"My Love","endTimeStamp":"","uuid":"1gae72ff-3239-42f5-af97-04711d8007e8"},
{"type":"mp4","state":"waiting","createTimeStamp":"1610350740","fileName":"Harker","endTimeStamp":"","uuid":"1gae72ff-3239-42f5-af97-04711d8007e8"},
{"type":"mp4","state":"waiting","createTimeStamp":"1610152940","fileName":"The World","endTimeStamp":"","uuid":"1gae72ff-3239-42f5-af97-04711d87767e8"},
{"type":"mp4","state":"waiting","createTimeStamp":"1610390740","fileName":"Mother","endTimeStamp":"","uuid":"1gae72ff-3239-42f5-af97-04799d8007e8"},
{"type":"mp4","state":"waiting","createTimeStamp":"1610344740","fileName":"Jagger","endTimeStamp":"","uuid":"1gae72ff-3239-42f5-af97-04711d80099e8"},
])'
5.檢視Job動态
// watch job
watch -n 1 kubectl get job -n mongodb-test
可以看到成功擴充出5個Job。此時再登入資料庫,觀察轉碼業務狀态,可以看到資料狀态已經從waiting變成了finished。
寫在最後
本文介紹的轉碼業務實際也是我們在日常場景經常會遇到的一個需求,看得出來ack-keda的使用,相對來說還是比較容易的,而且實際效果也能滿足我們日常的需求。我們最近在keda社群的基礎上,新增了對于mongoDB事件源的支援,并已PR到社群,至此,内置的事件源已經能夠滿足我們絕大部分事件驅動場景。如果您想了解更多,請點選檢視
keda社群。