1. 背景介绍
我们都知道,
Knative
有两个主要的子项目:
Serving
和
Eventing
。其中 关于
Serviing
可以查看之前的一篇公众号文章 【超详细】深入探究 Knative 扩缩容的奥秘。
Eventing
将系统中的服务以事件驱动的方式松耦合的绑定在一起:即事件发送者不关注谁来消费事件,事件消费者也不关注事件是由谁产生的。
Eventing
中有多种事件组合的方式,比如:
- 最简单的
->Source
直接绑定Service
- 通过
与channel
订阅的方式subscriptions
- 并行事件流处理 的方式
,以及串行事件流处理的方式Parallel
Sequence
- 以及支持事件过滤的
也是本文将要重点介绍的模式Broker & Trigger
✅注:对于其他几种事件组合的方式,感兴趣的可以在官网查阅 https://knative.dev/docs/eventing/
2. 工作原理介绍
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SZ0YWO0kzM2kDM1MGOlBTZhFGMwAzY3YTM4IWN5cDOz8CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
上图展示的是
Broker & Trigger
模型的中
Event
传递的基本方式,其中事件传递的格式是标准的
CLoudEvent
的格式。简要介绍下图中的流程:
- 事件由事件源
产生, Knative 支持多种事件源,如Source
、GitHub
、Heartbeats
、k8s
等,更多ContainerSource
可在官网查阅 https://knative.dev/docs/eventing/sources/,也可以自定义Source
,参考之前发的一篇公众号为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源。Source
- 图中 事件源产生
为type
的事件,发送到foo
,其中 有三个Broker
绑定了Trigger
,两个Broker
的Trigger
是filter
,也就是会关注type:foo
的事件,然后发送给对应的消费者。type=foo
-
和Service1
只是单纯的消费,并不回复事件Service3
-
收到事件后回复Service2
的事件,事件重新传递到 Broker上,此时只有一个 Trigger 过滤了type=bar
事件被传送到type:bar
消费。Service3
- 其中消费者是否回复事件是可选的。
关于图中的实例 yaml,可参考 附录
章节
3. 底层实现原理
上面介绍了
Broker & Trigger
的工作原理,现在从底层实现的角度进一步讲解。
Knative Eventing 事件传递过程中依赖消息通道 Channel,为了便于持久化,生产环境使用可持久化的消息通过( 本文以 NatsStreaming 为例) 来做事件的消息通道,如果是开发调试,则直接用 InMemoryChannel 即可(只会保存在内存中,不会持久化)
先上图
- 图中包含控制平面与数据平面,图中箭头:控制平面为实线,数据平面为虚线
- 实线的方框为 Knative 组件,虚线的方框为 k8s CR 资源
下面分别从数据平面和控制平面分别讲解
3.1 数据平面
EventSource------> Broker------>Trigger------->SubScriber
1. EventSource------> Broker
Broker
可以手动生成,或者通过给
namespace
打 label
eventing.knative.dev/injection:true
,让
sugar-controller
会自动生成对应的
Broker
实例
EventSource
一般通过
SinkURI
环境变量将
Broker
的 地址传入 。比如,
Broker
的地址为
http://broker-ingress.knative-eventing.svc.cluster.local/default/default
,该地址为
broker-ingress
的地址,而
broker-ingress
通过 请求
URL
的
path
可以得到
Broker
的信息,比如
/default/default
表示
default namespace
下名为
default
的
Broker
# kubectl get broker
NAME URL AGE READY REASON
default http://broker-ingress.knative-eventing.svc.cluster.local/default/default 46h True
2.Broker------>Trigger
broker-ingress
****得到
Broker
的信息(name namespace)之后,可以得到
Broker
的
status
信息,如下得到
channelAddress
的地址
(http://default-kne-trigger-kn-channel.default.svc.cluster.local)
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: default
namespace: default
status:
address:
url: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
annotations:
knative.dev/channelAPIVersion: messaging.knative.dev/v1beta1
knative.dev/channelAddress: http://default-kne-trigger-kn-channel.default.svc.cluster.local
knative.dev/channelKind: NatssChannel
knative.dev/channelName: default-kne-trigger
channelAddress
的地址是个svc 的地址,通过
externalName
指向
natss-ch-dispatcher
# kubectl get svc default-kne-trigger-kn-channel
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
default-kne-trigger-kn-channel ExternalName <none> natss-ch-dispatcher.knative-eventing.svc.cluster.local <none> 46h
natss-ch-dispatcher
负责将 消息(主题为
channel.Name + "." + channel.Namespace
) 发布到
Natss-Streaming
组件
3. Trigger------->SubScriber
natss-ch-dispatcher
****不仅负责发布,还负责订阅消息,
natss-ch-dispatcher
****watch
natssChannel
(见下面的
NatssChannel
), 获取
natssChannel
的
subscriber
的地址
subscriberUri
,通过
subscriberUri
发送消息给
broker-filter
,跟
broker-ingress
一样,
subscriberUri
的地址是
broker-filter
的 地址,通过请求
path
区分哪个
trigger
,请求 path :
/triggers/<trigger namespace>/<trigger name>/<trigger UID>
apiVersion: messaging.knative.dev/v1beta1
kind: NatssChannel
metadata:
name: default-kne-trigger
namespace: default
spec:
subscribers:
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger2/c0f3f1bb-9a25-4fb2-b803-fc5cd74e57da
uid: 9b4c991a-8912-4333-aa5a-caf053e5ee9c
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger3/28e7ba73-d514-4aea-a0de-7946fc21e7cc
uid: 56d00260-e69f-4d7a-bd05-c81479774a95
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
uid: 963fd671-097f-489a-a857-c1803ad3fb19
status:
address:
url: http://default-kne-trigger-kn-channel.default.svc.cluster.local
broker-filter
获取到
Trigger
的信息后,通过 根据 Trigger 的filter 将消息过滤,再决定是否将消息发给 对应的
subscriber
,
subscriber
可以从
Trigger
status
的
subcriberUri
获取到,见下图,对于
subscriber
Reply 的消息,
broker-filter
发送到
replyUri
地址上,
http://broker-ingress.knative-eventing.svc.cluster.local/default/default
,也就是发送给 Broker (实际是
broker-ingress
)。
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: trigger1
namespace: default
spec:
broker: default
filter:
attributes:
type: dev.knative.sources.ping
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: service1
namespace: default
status:
subscriberUri: http://service1.default.svc.cluster.local
至此数据面的通信流就完成了,接下来看控制面的数据流。
3.2 控制平面
控制平面主要看上图中的实线部分,此处再贴一下图
接下来按流程讲解
- 1.1
watchmt-broker-controller
的创建Broker
- 1.2
根据mt-broker-controller
的配置,创建对应的Broker
,此处为Channel
NatssChannel
- 1.3
watchNatss-ch-controller
的创建,更新NatssChannel
的服务端状态到Natss-Streaming
的 statusNatssChannel
- 1.4
创建 svc,externalname指向Natss-ch-controller
natss-ch-dispatcher
- 1.5
watchmt-broker-controller
的statusNatssChannel
- 1.6
更新mt-broker-controller
其中包含:Broker 的status,
的Broker
(address
的地址),broker-ingress
的channel
,供address
使用broker-ingress
- 2.1
watchmt-broker-controller
Trigger
- 2.2
根据mt-broker-controller
创建Trigger(含 subscriber 的信息)
其中包含subscription,
的地址subsciber(broker-filter)
的信息和 Broker
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
name: default-trigger1-3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
namespace: default
spec:
channel:
apiVersion: messaging.knative.dev/v1beta1
kind: NatssChannel
name: default-kne-trigger
reply:
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: default
namespace: default
subscriber:
uri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
- 2.3
watcheventing-controller
,解析subscription
和subcriber
的地址(replyUri
的地址)broker-ingress
- 2.4
根据eventing-controller
更新subcription
:Natss-Channel
和subsriberUrl
replyUrl
spec:
subscribers:
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger2/c0f3f1bb-9a25-4fb2-b803-fc5cd74e57da
uid: 9b4c991a-8912-4333-aa5a-caf053e5ee9c
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger3/28e7ba73-d514-4aea-a0de-7946fc21e7cc
uid: 56d00260-e69f-4d7a-bd05-c81479774a95
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
uid: 963fd671-097f-489a-a857-c1803ad3fb19
status:
address:
url: http://default-kne-trigger-kn-channel.default.svc.cluster.local
2.5
mt-broker-controller
根据 subscription 和 broker 的状态 更新 Trigger 的状态
控制面的逻辑就完了,和数据面的逻辑结合的地方在于:
broker-ingress
和
broker-filter
:
-
watchbroker-ingress
,从Broker
的Broker
中获取status
的地址(channel
的natss-ch-dispatch
的地址)svc
-
watchbroker-filter
,根据 trigger 中subscriber 地址,将event filter 后决定是否发送到 对应的 targetTrigger
至此,整个流程都讲完了,可以按照附录中的资源,创建一下,看看其中的资源。
附录
- broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: default
namespace: default
- trigger1.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: trigger1
spec:
filter:
attributes:
type: foo
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: service1
- trigger2.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: trigger2
spec:
filter:
attributes:
type: foo
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: service2
- trigger3.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: trigger3
spec:
filter:
attributes:
type: bar
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: service3
- service1.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: service1
spec:
template:
spec:
containers:
- image: docker.io/zhaojizhuang66/event-display:v1
5.service3.yaml (service1和service3 一样都是打印接收到的 event)
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: service1
spec:
template:
spec:
containers:
- image: docker.io/zhaojizhuang66/event-display:v1
- service2.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: service2
spec:
template:
spec:
containers:
- image: docker.io/zhaojizhuang66/event-display-with-reply:v1
关注公众号: Knative,了解更多 Serverless 、Knative,云原生相关资讯
关注公众号,回复 "进群",即可进群与众多云原生 Serverless 技术大佬探讨技术,探讨人生。