本文選自 《Knative 雲原生應用開發指南》 。
資源定義
我們先看一下 Parallel 資源定義,典型的 Parallel Spec描述如下:
apiVersion: messaging.knative.dev/v1alpha1
kind: Parallel
metadata:
name: me-odd-even-parallel
spec:
channelTemplate:
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
cases:
- filter:
uri: "http://me-even-odd-switcher.default.svc.cluster.local/0"
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: me-even-transformer
- filter:
uri: "http://me-even-odd-switcher.default.svc.cluster.local/1"
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: me-odd-transformer
reply:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: me-event-display
主要包括如下 3 部分:
-
定義了一系列 filter 和 subscriber。對于每個條件分支:cases
- 首先判斷
, 當傳回事件時,調用 subscriber。filter和subscriber要求都是可通路的。filter
- subscriber 執行傳回的事件會發生到 reply。如果 reply 為空,則發送到
spec.reply
- 首先判斷
-
定義了目前 Parallel 中使用的Channel類型channelTemplate
-
定義了全局響應的目标函數。reply
邏輯架構如圖所示:
代碼實作
關鍵代碼實作如下:
- 首先為 Parallel 建立一個全局的 Channel。然後為每一個
建立一個過濾 Channelcase
- 在每個
中做了如下處理:case
- 為全局的 Channel建立一個 Subscription,訂閱條件為
資訊,并且把 reply 響應發送給目前filter
中的過濾 Channelcase
- 為過濾 Channel 建立一個 Subscription,将訂閱資訊發送給每個
中的case
。如果目前Reply
中沒有設定case
,則發送的全局Reply
Reply
- 為全局的 Channel建立一個 Subscription,訂閱條件為
func (r *Reconciler) reconcile(ctx context.Context, p *v1alpha1.Parallel) error {
p.Status.InitializeConditions()
// Reconciling parallel is pretty straightforward, it does the following things:
// 1. Create a channel fronting the whole parallel and one filter channel per branch.
// 2. For each of the Branches:
// 2.1 create a Subscription to the fronting Channel, subscribe the filter and send reply to the filter Channel
// 2.2 create a Subscription to the filter Channel, subcribe the subscriber and send reply to
// either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply.
// 3. Rinse and repeat step #2 above for each branch in the list
if p.DeletionTimestamp != nil {
// Everything is cleaned up by the garbage collector.
return nil
}
channelResourceInterface := r.DynamicClientSet.Resource(duckroot.KindToResource(p.Spec.ChannelTemplate.GetObjectKind().GroupVersionKind())).Namespace(p.Namespace)
if channelResourceInterface == nil {
msg := fmt.Sprintf("Unable to create dynamic client for: %+v", p.Spec.ChannelTemplate)
logging.FromContext(ctx).Error(msg)
return errors.New(msg)
}
// Tell tracker to reconcile this Parallel whenever my channels change.
track := r.resourceTracker.TrackInNamespace(p)
var ingressChannel *duckv1alpha1.Channelable
channels := make([]*duckv1alpha1.Channelable, 0, len(p.Spec.Branches))
for i := -1; i < len(p.Spec.Branches); i++ {
var channelName string
if i == -1 {
channelName = resources.ParallelChannelName(p.Name)
} else {
channelName = resources.ParallelBranchChannelName(p.Name, i)
}
c, err := r.reconcileChannel(ctx, channelName, channelResourceInterface, p)
if err != nil {
logging.FromContext(ctx).Error(fmt.Sprintf("Failed to reconcile Channel Object: %s/%s", p.Namespace, channelName), zap.Error(err))
return err
}
// Convert to Channel duck so that we can treat all Channels the same.
channelable := &duckv1alpha1.Channelable{}
err = duckapis.FromUnstructured(c, channelable)
if err != nil {
logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", p.Namespace, channelName), zap.Error(err))
return err
}
// Track channels and enqueue parallel when they change.
if err = track(utils.ObjectRef(channelable, channelable.GroupVersionKind())); err != nil {
logging.FromContext(ctx).Error("Unable to track changes to Channel", zap.Error(err))
return err
}
logging.FromContext(ctx).Info(fmt.Sprintf("Reconciled Channel Object: %s/%s %+v", p.Namespace, channelName, c))
if i == -1 {
ingressChannel = channelable
} else {
channels = append(channels, channelable)
}
}
p.Status.PropagateChannelStatuses(ingressChannel, channels)
filterSubs := make([]*v1alpha1.Subscription, 0, len(p.Spec.Branches))
subs := make([]*v1alpha1.Subscription, 0, len(p.Spec.Branches))
for i := 0; i < len(p.Spec.Branches); i++ {
filterSub, sub, err := r.reconcileBranch(ctx, i, p)
if err != nil {
return fmt.Errorf("Failed to reconcile Subscription Objects for branch: %d : %s", i, err)
}
subs = append(subs, sub)
filterSubs = append(filterSubs, filterSub)
logging.FromContext(ctx).Debug(fmt.Sprintf("Reconciled Subscription Objects for branch: %d: %+v, %+v", i, filterSub, sub))
}
p.Status.PropagateSubscriptionStatuses(filterSubs, subs)
return nil
}
示例示範
接下來讓我們通過一個執行個體具體了解一下 Parallel 。通過CronJobSource産生事件發送給
me-odd-even-parallel
Parallel, Parallel 會将事件發送給每個
case
, Case中通過 filter 不同的參數通路
me-even-odd-switcher
服務,
me-even-odd-switcher
服務會根據目前事件的建立時間随機計算0或1的值,如果計算值和請求參數值相比對,則傳回事件,否則不傳回事件。
- 若
比對成功,傳回事件到http://me-even-odd-switcher.default.svc.cluster.local/0
服務進行處理me-even-transformer
-
http://me-even-odd-switcher.default.svc.cluster.local/1
odd-transformer
不管哪個
case
處理完之後,将最終的事件發送給
me-event-display
服務進行事件顯示。
具體操作步驟如下:
建立 Knative Service
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: me-even-odd-switcher
spec:
template:
spec:
containers:
- image: villardl/switcher-nodejs:0.1
env:
- name: EXPRESSION
value: Math.round(Date.parse(event.time) / 60000) % 2
- name: CASES
value: '[0, 1]'
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: even-transformer
spec:
template:
spec:
containers:
- image: villardl/transformer-nodejs:0.1
env:
- name: TRANSFORMER
value: |
({"message": "we are even!"})
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
name: odd-transformer
spec:
template:
spec:
containers:
- image: villardl/transformer-nodejs:0.1
env:
- name: TRANSFORMER
value: |
({"message": "this is odd!"})
.
建立 Parallel
apiVersion: messaging.knative.dev/v1alpha1
kind: Parallel
metadata:
name: me-odd-even-parallel
spec:
channelTemplate:
apiVersion: messaging.knative.dev/v1alpha1
kind: InMemoryChannel
cases:
- filter:
uri: "http://me-even-odd-switcher.default.svc.cluster.local/0"
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: me-even-transformer
- filter:
uri: "http://me-even-odd-switcher.default.svc.cluster.local/1"
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: me-odd-transformer
reply:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: me-event-display
建立 CronJobSource 資料源
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
name: me-cronjob-source
spec:
schedule: "*/1 * * * *"
data: '{"message": "Even or odd?"}'
sink:
apiVersion: messaging.knative.dev/v1alpha1
kind: Parallel
name: me-odd-even-parallel
檢視結果
運作之後可以看到類似如下結果:
kubectl logs -l serving.knative.dev/service=me-event-display --tail=30 -c user-container
️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 0.3
type: dev.knative.cronjob.event
source: /apis/v1/namespaces/default/cronjobsources/me-cronjob-source
id: 48eea348-8cfd-4aba-9ead-cb024ce16a48
time: 2019-07-31T20:56:00.000477587Z
datacontenttype: application/json; charset=utf-8
Extensions,
knativehistory: me-odd-even-parallel-kn-parallel-kn-channel.default.svc.cluster.local, me-odd-even-parallel-kn-parallel-0-kn-channel.default.svc.cluster.local
Data,
{
"message": "we are even!"
}
️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 0.3
type: dev.knative.cronjob.event
source: /apis/v1/namespaces/default/cronjobsources/me-cronjob-source
id: 42717dcf-b194-4b36-a094-3ea20e565ad5
time: 2019-07-31T20:57:00.000312243Z
datacontenttype: application/json; charset=utf-8
Extensions,
knativehistory: me-odd-even-parallel-kn-parallel-1-kn-channel.default.svc.cluster.local, me-odd-even-parallel-kn-parallel-kn-channel.default.svc.cluster.local
Data,
{
"message": "this is odd!"
}
結論
通過上面的介紹,相信大家對 Parallel 如何進行事件條件處理有了更多的了解,對于并行處理事件的場景下,不妨試試 Parallel。
“ 阿裡巴巴雲原生 關注微服務、Serverless、容器、Service Mesh 等技術領域、聚焦雲原生流行技術趨勢、雲原生大規模的落地實踐,做最懂雲原生開發者的技術圈。”