天天看点

【图解 Knative】剖析 Eventing Broker-Trigger 实现原理1. 背景介绍

1. 背景介绍

我们都知道,

Knative

有两个主要的子项目:

Serving

Eventing

。其中 关于

Serviing

可以查看之前的一篇公众号文章 【超详细】深入探究 Knative 扩缩容的奥秘。

Eventing

将系统中的服务以事件驱动的方式松耦合的绑定在一起:即事件发送者不关注谁来消费事件,事件消费者也不关注事件是由谁产生的。

Eventing

中有多种事件组合的方式,比如:

  • 最简单的

    Source

    ->

    Service

    直接绑定
  • 通过

    channel

    subscriptions

    订阅的方式
  • 并行事件流处理 的方式

    Parallel

    ,以及串行事件流处理的方式

    Sequence

  • 以及支持事件过滤的

    Broker & Trigger

    也是本文将要重点介绍的模式
✅注:对于其他几种事件组合的方式,感兴趣的可以在官网查阅 https://knative.dev/docs/eventing/

2. 工作原理介绍

【图解 Knative】剖析 Eventing Broker-Trigger 实现原理1. 背景介绍

上图展示的是

Broker & Trigger

模型的中

Event

传递的基本方式,其中事件传递的格式是标准的

CLoudEvent

的格式。简要介绍下图中的流程:

  1. 事件由事件源

    Source

    产生, Knative 支持多种事件源,如

    GitHub

    Heartbeats

    k8s

    ContainerSource

    等,更多

    Source

    可在官网查阅 https://knative.dev/docs/eventing/sources/,也可以自定义

    Source

    ,参考之前发的一篇公众号为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源。
  2. 图中 事件源产生

    type

    foo

    的事件,发送到

    Broker

    ,其中 有三个

    Trigger

    绑定了

    Broker

    ,两个

    Trigger

    filter

    type:foo

    ,也就是会关注

    type=foo

    的事件,然后发送给对应的消费者。
  3. Service1

    Service3

    只是单纯的消费,并不回复事件
  4. Service2

    收到事件后回复

    type=bar

    的事件,事件重新传递到 Broker上,此时只有一个 Trigger 过滤了

    type:bar

    事件被传送到

    Service3

    消费。
  5. 其中消费者是否回复事件是可选的。
关于图中的实例 yaml,可参考

附录

章节

3. 底层实现原理

上面介绍了

Broker & Trigger

的工作原理,现在从底层实现的角度进一步讲解。

Knative Eventing 事件传递过程中依赖消息通道 Channel,为了便于持久化,生产环境使用可持久化的消息通过( 本文以 NatsStreaming 为例) 来做事件的消息通道,如果是开发调试,则直接用 InMemoryChannel 即可(只会保存在内存中,不会持久化)

先上图

【图解 Knative】剖析 Eventing Broker-Trigger 实现原理1. 背景介绍
  • 图中包含控制平面与数据平面,图中箭头:控制平面为实线,数据平面为虚线
  • 实线的方框为 Knative 组件,虚线的方框为 k8s CR 资源

下面分别从数据平面和控制平面分别讲解

3.1 数据平面

EventSource------> Broker------>Trigger------->SubScriber

1. EventSource------> Broker

Broker

可以手动生成,或者通过给

namespace

打 label

eventing.knative.dev/injection:true

,让

sugar-controller

会自动生成对应的

Broker

实例

EventSource

一般通过

SinkURI

环境变量将

Broker

的 地址传入 。比如,

Broker

的地址为

http://broker-ingress.knative-eventing.svc.cluster.local/default/default

,该地址为

broker-ingress

的地址,而

broker-ingress

通过 请求

URL

path

可以得到

Broker

的信息,比如

/default/default

表示

default namespace

下名为

default

Broker

# kubectl get broker
NAME      URL                                                                        AGE   READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/default/default   46h   True
           

2.Broker------>Trigger

broker-ingress

****得到

Broker

的信息(name namespace)之后,可以得到

Broker

status

信息,如下得到

channelAddress

的地址

(http://default-kne-trigger-kn-channel.default.svc.cluster.local)

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: default
status:
  address:
    url: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
  annotations:
    knative.dev/channelAPIVersion: messaging.knative.dev/v1beta1
    knative.dev/channelAddress: http://default-kne-trigger-kn-channel.default.svc.cluster.local
    knative.dev/channelKind: NatssChannel
    knative.dev/channelName: default-kne-trigger
           

channelAddress

的地址是个svc 的地址,通过

externalName

指向

natss-ch-dispatcher

# kubectl get svc default-kne-trigger-kn-channel
NAME                             TYPE           CLUSTER-IP   EXTERNAL-IP                                              PORT(S)   AGE
default-kne-trigger-kn-channel   ExternalName   <none>       natss-ch-dispatcher.knative-eventing.svc.cluster.local   <none>    46h
           

natss-ch-dispatcher

负责将 消息(主题为

channel.Name + "." + channel.Namespace

) 发布到

Natss-Streaming

组件

3. Trigger------->SubScriber

natss-ch-dispatcher

****不仅负责发布,还负责订阅消息,

natss-ch-dispatcher

****watch

natssChannel

(见下面的

NatssChannel

), 获取

natssChannel

subscriber

的地址

subscriberUri

,通过

subscriberUri

发送消息给

broker-filter

,跟

broker-ingress

一样,

subscriberUri

的地址是

broker-filter

的 地址,通过请求

path

区分哪个

trigger

,请求 path :

/triggers/<trigger namespace>/<trigger name>/<trigger UID>

apiVersion: messaging.knative.dev/v1beta1
kind: NatssChannel
metadata:
  name: default-kne-trigger
  namespace: default
spec:
  subscribers:
  - generation: 1
    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger2/c0f3f1bb-9a25-4fb2-b803-fc5cd74e57da
    uid: 9b4c991a-8912-4333-aa5a-caf053e5ee9c
  - generation: 1
    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger3/28e7ba73-d514-4aea-a0de-7946fc21e7cc
    uid: 56d00260-e69f-4d7a-bd05-c81479774a95
  - generation: 1
    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
    uid: 963fd671-097f-489a-a857-c1803ad3fb19
status:
  address:
    url: http://default-kne-trigger-kn-channel.default.svc.cluster.local
           

broker-filter

获取到

Trigger

的信息后,通过 根据 Trigger 的filter 将消息过滤,再决定是否将消息发给 对应的

subscriber

,

subscriber

可以从

Trigger

status

subcriberUri

获取到,见下图,对于

subscriber

Reply 的消息,

broker-filter

发送到

replyUri

地址上,

http://broker-ingress.knative-eventing.svc.cluster.local/default/default

,也就是发送给 Broker (实际是

broker-ingress

)。

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trigger1
  namespace: default
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.sources.ping
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: service1
      namespace: default
status:
  subscriberUri: http://service1.default.svc.cluster.local
           

至此数据面的通信流就完成了,接下来看控制面的数据流。

3.2 控制平面

控制平面主要看上图中的实线部分,此处再贴一下图
【图解 Knative】剖析 Eventing Broker-Trigger 实现原理1. 背景介绍

接下来按流程讲解

  • 1.1

    mt-broker-controller

    watch

    Broker

    的创建
  • 1.2

    mt-broker-controller

    根据

    Broker

    的配置,创建对应的

    Channel

    ,此处为

    NatssChannel

  • 1.3

    Natss-ch-controller

    watch

    NatssChannel

    的创建,更新

    Natss-Streaming

    的服务端状态到

    NatssChannel

    的 status
  • 1.4

    Natss-ch-controller

    创建 svc,externalname指向

    natss-ch-dispatcher

  • 1.5

    mt-broker-controller

    watch

    NatssChannel

    的status
  • 1.6

    mt-broker-controller

    更新

    Broker 的status,

    其中包含:

    Broker

    address

    broker-ingress

    的地址),

    channel

    address

    ,供

    broker-ingress

    使用
  • 2.1

    mt-broker-controller

    watch

    Trigger

  • 2.2

    mt-broker-controller

    根据

    Trigger(含 subscriber 的信息)

    创建

    subscription,

    其中包含

    subsciber(broker-filter)

    的地址

    和 Broker

    的信息
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: default-trigger1-3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
  namespace: default
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: NatssChannel
    name: default-kne-trigger
  reply:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: default
  subscriber:
    uri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
           
  • 2.3

    eventing-controller

    watch

    subscription

    ,解析

    subcriber

    replyUri

    的地址(

    broker-ingress

    的地址)
  • 2.4

    eventing-controller

    根据

    subcription

    更新

    Natss-Channel

    :

    subsriberUrl

    replyUrl

spec:
  subscribers:
  - generation: 1
    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger2/c0f3f1bb-9a25-4fb2-b803-fc5cd74e57da
    uid: 9b4c991a-8912-4333-aa5a-caf053e5ee9c
  - generation: 1
    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger3/28e7ba73-d514-4aea-a0de-7946fc21e7cc
    uid: 56d00260-e69f-4d7a-bd05-c81479774a95
  - generation: 1
    replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/trigger1/3f6e7a8e-f7d1-4c8f-a25a-1970b6ab73e3
    uid: 963fd671-097f-489a-a857-c1803ad3fb19
status:
  address:
    url: http://default-kne-trigger-kn-channel.default.svc.cluster.local
           

2.5

mt-broker-controller

根据 subscription 和 broker 的状态 更新 Trigger 的状态

控制面的逻辑就完了,和数据面的逻辑结合的地方在于:

broker-ingress

broker-filter

  1. broker-ingress

    watch

    Broker

    ,从

    Broker

    status

    中获取

    channel

    的地址(

    natss-ch-dispatch

    svc

    的地址)
  2. broker-filter

    watch

    Trigger

    ,根据 trigger 中subscriber 地址,将event filter 后决定是否发送到 对应的 target

至此,整个流程都讲完了,可以按照附录中的资源,创建一下,看看其中的资源。

附录

  1. broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: default
           
  1. trigger1.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trigger1
spec:
  filter:
    attributes:
      type: foo
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: service1
           
  1. trigger2.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trigger2
spec:
  filter:
    attributes:
      type: foo
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: service2
           
  1. trigger3.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trigger3
spec:
  filter:
    attributes:
      type: bar
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: service3
           
  1. service1.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: service1
spec:
  template:
    spec:
      containers:
        - image: docker.io/zhaojizhuang66/event-display:v1
           

5.service3.yaml (service1和service3 一样都是打印接收到的 event)

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: service1
spec:
  template:
    spec:
      containers:
        - image: docker.io/zhaojizhuang66/event-display:v1
           
  1. service2.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: service2
spec:
  template:
    spec:
      containers:
        - image: docker.io/zhaojizhuang66/event-display-with-reply:v1
           

关注公众号: Knative,了解更多 Serverless 、Knative,云原生相关资讯

关注公众号,回复 "进群",即可进群与众多云原生 Serverless 技术大佬探讨技术,探讨人生。
【图解 Knative】剖析 Eventing Broker-Trigger 实现原理1. 背景介绍

继续阅读