天天看點

Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結

Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結

作者 | 洛夜

來源 |

阿裡巴巴雲原生公衆号

在 Spring 生态中玩轉 RocketMQ 系列文章:

本文配套可互動教程已登入阿裡雲知行動手實驗室,PC 端登入 start.aliyun.com 在浏覽器中立即體驗。

Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結

Spring Cloud Bus 對自己的定位是 Spring Cloud 體系内的消息總線,使用 message broker 來連接配接分布式系統的所有節點。Bus 官方的 Reference 文檔 比較簡單,簡單到連一張圖都沒有。

這是最新版的 Spring Cloud Bus 代碼結構(代碼量比較少):

Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結

Bus 執行個體示範

在分析 Bus 的實作之前,我們先來看兩個使用 Spring Cloud Bus 的簡單例子。

1. 所有節點的配置新增

Bus 的例子比較簡單,因為 Bus 的 AutoConfiguration 層都有了預設的配置,隻需要引入消息中間件對應的 Spring Cloud Stream 以及 Spring Cloud Bus 依賴即可,之後所有啟動的應用都會使用同一個 Topic 進行消息的接收和發送。

Bus 對應的 Demo 已經放到了 github 上, 該 Demo 會模拟啟動 5 個節點,隻需要對其中任意的一個執行個體新增配置項,所有節點都會新增該配置項。

Demo 位址:

https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo

通路任意節點提供的 Controller 提供的擷取配置的位址(key 為hangzhou):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'           

所有節點傳回的結果都是 unknown,因為所有節點的配置中沒有hangzhou這個 key。

Bus 内部提供了EnvironmentBusEndpoint這個 Endpoint 通過 message broker 用來新增/更新配置。

通路任意節點該 Endpoint 對應的 url: /actuator/bus-env?name=hangzhou&value=alibaba 進行配置項的新增(比如通路 node1 的url):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'           

然後再次通路所有節點/bus/env擷取配置:

$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba%           

可以看到,所有節點都新增了一個 key 為hangzhou的配置,且對應的 value 是alibaba。這個配置項是通過 Bus 提供的 EnvironmentBusEndpoint 完成的。

這裡引用 程式猿DD 畫的一張圖檔,Spring Cloud Config 配合 Bus 完成所有節點配置的重新整理來描述之前的執行個體(本文執行個體不是重新整理,而是新增配置,但是流程是一樣的):

Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結

2. 部分節點的配置修改

比如在 node1 上指定 destination 為 rocketmq-bus-node2 ( node2 配置了 spring.cloud.bus.id 為rocketmq-bus-node2:10002,可以比對上) 進行配置的修改:

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'           

通路/bus/env 擷取配置(由于在 node1 上發送消息,Bus 也會對發送方的節點 node1 進行配置修改):

~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu%           

可以看到,隻有 node1 和 node2 修改了配置,其餘的 3 個節點配置未改變。

Bus 的實作

1. Bus 概念介紹

1)事件

Bus 中定義了遠端事件RemoteApplicationEvent,該事件繼承了 Spring 的事件ApplicationEvent,而且它目前有 4 個具體的實作:

Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結
  • EnvironmentChangeRemoteApplicationEvent:遠端環境變更事件。主要用于接收一個 Map<String,String> 類型的資料并更新到 Spring 上下文中 Environment 中的事件。文中的執行個體就是使用這個事件并配合 EnvironmentBusEndpoint 和 EnvironmentChangeListener 完成的。
  • AckRemoteApplicationEvent:遠端确認事件。Bus 内部成功接收到遠端事件後會發送回AckRemoteApplicationEvent确認事件進行确認。
  • RefreshRemoteApplicationEvent: 遠端配置重新整理事件。配合 @RefreshScope 以及所有的 @ConfigurationProperties注解修飾的配置類的動态重新整理。
  • UnknownRemoteApplicationEvent:遠端未知事件。Bus 内部消息體進行轉換遠端事件的時候如果發生異常會統一包裝成該事件。

Bus 内部還存在一個非RemoteApplicationEvent事件 -SentApplicationEvent消息發送事件,配合 Trace 進行遠端消息發送的記錄。

這些事件會配合ApplicationListener進行操作,比如EnvironmentChangeRemoteApplicationEvent配了EnvironmentChangeListener進行配置的新增/修改:

public class EnvironmentChangeListener
        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
    private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
    @Autowired
    private EnvironmentManager env;
    @Override
    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
        Map<String, String> values = event.getValues();
        log.info("Received remote environment change request. Keys/values to update "
                + values);
        for (Map.Entry<String, String> entry : values.entrySet()) {
            env.setProperty(entry.getKey(), entry.getValue());
        }
    }
}           

收到其它節點發送來EnvironmentChangeRemoteApplicationEven事件之後調用EnvironmentManager#setProperty進行配置的設定,該方法内部針對每一個配置項都會發送一個EnvironmentChangeEvent事件,然後被ConfigurationPropertiesRebinder所監聽,進行 rebind 操作新增/更新配置。

2)Actuator Endpoint

Bus 内部暴露了 2 個 Endpoint,分别是EnvironmentBusEndpoint和RefreshBusEndpoint,進行配置的新增/修改以及全局配置重新整理。它們對應的 Endpoint id 即 url 是 bus-env和bus-refresh。

3)配置

Bus 對于消息的發送必定涉及到 Topic、Group 之類的資訊,這些内容都被封裝到了BusProperties中,其預設的配置字首為spring.cloud.bus,比如:

  • spring.cloud.bus.refresh.enabled用于開啟/關閉全局重新整理的 Listener。
  • spring.cloud.bus.env.enabled 用于開啟/關閉配置新增/修改的 Endpoint。
  • spring.cloud.bus.ack.enabled 用于開啟開啟/關閉AckRemoteApplicationEvent事件的發送。
  • spring.cloud.bus.trace.enabled 用于開啟/關閉息記錄 Trace 的 Listener。

消息發送涉及到的 Topic 預設用的是springCloudBus,可以配置進行修改,Group 可以設定成廣播模式或使用 UUID 配合 offset 為 lastest 的模式。

每個 Bus 應用都有一個對應的 Bus id,官方取值方式較複雜:

${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}

建議手動配置 Bus id,因為 Bus 遠端事件中的 destination 會根據 Bus id 進行比對:

spring.cloud.bus.id=${spring.application.name}-${server.port}           

2. Bus 底層分析

Bus 的底層分析無非牽扯到這幾個方面:

  • 消息是如何發送的
  • 消息是如何接收的
  • destination 是如何比對的
  • 遠端事件收到後如何觸發下一個 action

BusAutoConfiguration自動化配置類被@EnableBinding(SpringCloudBusClient.class)所修飾。

@EnableBinding的用法在文章

中已經說明,且它的 value 為SpringCloudBusClient.class,會在SpringCloudBusClient中基于代理建立出 input 和 output 的DirectChannel:

public interface SpringCloudBusClient {
    String INPUT = "springCloudBusInput";
    String OUTPUT = "springCloudBusOutput";
    @Output(SpringCloudBusClient.OUTPUT)
    MessageChannel springCloudBusOutput();
    @Input(SpringCloudBusClient.INPUT)
    SubscribableChannel springCloudBusInput();
}           

springCloudBusInput 和 springCloudBusOutput 這兩個 Binding 的屬性可以通過配置檔案進行修改(比如修改 topic):

spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic           

消息的接收和發送:

// BusAutoConfiguration
@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) { // 2
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
    }
}
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) { // 5
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) { // 6
        if (!this.serviceMatcher.isFromSelf(event)) { // 7
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) { // 8
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
        // We are set to register sent events so publish it for local consumption,
        // irrespective of the origin
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}           
  1. 利用 Spring 事件的監聽機制監聽本地所有的RemoteApplicationEvent遠端事件(比如bus-env會在本地發送EnvironmentChangeRemoteApplicationEvent事件,bus-refresh會在本地發送RefreshRemoteApplicationEvent事件,這些事件在這裡都會被監聽到)。
  2. 判斷本地接收到的事件不是AckRemoteApplicationEvent遠端确認事件(不然會死循環,一直接收消息,發送消息...)以及該事件是應用自身發送出去的(事件發送方是應用自身),如果都滿足執行步驟 3。
  3. 構造 Message 并将該遠端事件作為 payload,然後使用 Spring Cloud Stream 構造的 Binding name 為 springCloudBusOutput 的 MessageChannel 将消息發送到 broker。

4.@StreamListener注解消費 Spring Cloud Stream 構造的 Binding name 為 springCloudBusInput 的 MessageChannel,接收的消息為遠端消息。

  1. 如果該遠端事件是AckRemoteApplicationEvent遠端确認事件并且應用開啟了消息追蹤 trace 開關,同時該遠端事件不是應用自身發送的(事件發送方不是應用自身,表示事件是其它應用發送過來的),那麼本地發送AckRemoteApplicationEvent遠端确認事件表示應用确認收到了其它應用發送過來的遠端事件,流程結束。
  2. 如果該遠端事件是其它應用發送給應用自身的(事件的接收方是應用自身),那麼進行步驟 7 和 8,否則執行步驟 9。
  3. 該遠端事件不是應用自身發送(事件發送方不是應用自身)的話,将該事件以本地的方式發送出去。應用自身一開始已經在本地被對應的消息接收方處理了,無需再次發送。
  4. 如果開啟了AckRemoteApplicationEvent遠端确認事件的開關,構造AckRemoteApplicationEvent事件并在遠端和本地都發送該事件(本地發送是因為步驟 5 沒有進行本地AckRemoteApplicationEvent事件的發送,也就是自身應用對自身應用确認; 遠端發送是為了告訴其它應用,自身應用收到了消息)。
  5. 如果開啟了消息記錄 Trace 的開關,本地構造并發送SentApplicationEvent事件。
Spring Cloud Bus 消息總線介紹Bus 執行個體示範Bus 的實作總結

bus-env觸發後所有節點的EnvironmentChangeListener監聽到了配置的變化,控制台都會列印出以下資訊:

o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}           

如果在本地監聽遠端确認事件 AckRemoteApplicationEvent,都會收到所有節點的資訊,比如 node5 節點的控制台監聽到的 AckRemoteApplicationEvent事件如下:

ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}           

那麼回到本章節開頭提到的 4 個問題,我們分别做一下解答:

  • 消息是如何發送的: 在BusAutoConfiguration#acceptLocal方法中通過 Spring Cloud Stream 發送事件到springCloudBustopic 中。
  • 消息是如何接收的: 在BusAutoConfiguration#acceptRemote方法中通過 Spring Cloud Stream 接收springCloudBustopic 的消息。
  • destination 是如何比對的: 在BusAutoConfiguration#acceptRemote方法中接收遠端事件方法裡對 destination 進行比對。
  • 遠端事件收到後如何觸發下一個 action: Bus 内部通過 Spring 的事件機制接收本地的RemoteApplicationEvent具體的實作事件再做下一步的動作(比如EnvironmentChangeListener接收了EnvironmentChangeRemoteApplicationEvent事件,RefreshListener接收了RefreshRemoteApplicationEvent事件)。

總結

Spring Cloud Bus 自身内容還是比較少的,不過還是需要提前了解 Spring Cloud Stream 體系以及 Spring 自身的事件機制,在此基礎上,才能更好地了解 Spring Cloud Bus 對本地事件和遠端事件的處理邏輯。

目前 Bus 内置的遠端事件較少,大多數為配置相關的事件,我們可以繼承RemoteApplicationEvent并配合@RemoteApplicationEventScan注解建構自身的微服務消息體系。

作者簡介

方劍(花名:洛夜),GitHub ID @fangjian0423,開源愛好者,阿裡巴巴進階開發工程師,阿裡雲産品 EDAS 開發,Spring Cloud Alibaba 開源項目負責人之一。