天天看點

方法三:Spring Cloud Bus 消息總線介紹

本期我們來了解下 Spring Cloud 體系中的另外一個元件 Spring Cloud Bus (建議先熟悉 Spring Cloud Stream,不然無法了解 Spring Cloud Bus 内部的代碼)。

Spring Cloud Bus 對自己的定位是 Spring Cloud 體系内的消息總線,使用 message broker 來連接配接分布式系統的所有節點。Bus 官方的 Reference 文檔 比較簡單,簡單到連一張圖都沒有。

這是最新版的 Spring Cloud Bus 代碼結構(代碼量比較少):

方法三:Spring Cloud Bus 消息總線介紹

Bus 執行個體示範

在分析 Bus 的實作之前,我們先來看兩個使用 Spring Cloud Bus 的簡單例子。

所有節點的配置新增

Bus 的例子比較簡單,因為 Bus 的 AutoConfiguration 層都有了預設的配置,隻需要引入消息中間件對應的 Spring Cloud Stream 以及 Spring Cloud Bus 依賴即可,之後所有啟動的應用都會使用同一個 Topic 進行消息的接收和發送。

Bus 對應的 Demo 已經放到了 github 上(位址:

https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo

), 該 Demo 會模拟啟動 5 個節點,隻需要對其中任意的一個執行個體新增配置項,所有節點都會新增該配置項。

通路任意節點提供的 Controller 提供的擷取配置的位址(key為

hangzhou

):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'      

所有節點傳回的結果都是 unknown,因為所有節點的配置中沒有

hangzhou

這個 key。

Bus 内部提供了

EnvironmentBusEndpoint

這個 Endpoint 通過 message broker 用來新增/更新配置。

通路任意節點該 Endpoint 對應的 url: /actuator/bus-env?name=hangzhou&value=alibaba 進行配置項的新增(比如通路 node1 的url):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'      

然後再次通路所有節點

/bus/env

擷取配置:

$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba%      

可以看到,所有節點都新增了一個 key 為

hangzhou

的配置,且對應的 value 是

alibaba

。這個配置項是通過 Bus 提供的

EnvironmentBusEndpoint

完成的。

這裡引用 程式猿DD 畫的一張圖檔,Spring Cloud Config 配合 Bus 完成所有節點配置的重新整理來描述之前的執行個體(本文執行個體不是重新整理,而是新增配置,但是流程是一樣的):

方法三:Spring Cloud Bus 消息總線介紹

部分節點的配置修改

比如在 node1 上指定 destination 為 rocketmq-bus-node2 ( node2 配置了 spring.cloud.bus.id 為

rocketmq-bus-node2:10002

,可以比對上) 進行配置的修改:

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'      

通路

/bus/env

擷取配置(由于在 node1 上發送消息,Bus 也會對發送方的節點 node1 進行配置修改):

~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu%      

可以看到,隻有 node1 和 node2 修改了配置,其餘的 3 個節點配置未改變。

Bus 的實作

Bus 概念介紹

事件

Bus 中定義了遠端事件

RemoteApplicationEvent

,該事件繼承了 Spring 的事件

ApplicationEvent

,而且它目前有 4 個具體的實作:

方法三:Spring Cloud Bus 消息總線介紹
  • EnvironmentChangeRemoteApplicationEvent: 遠端環境變更事件。主要用于接收一個

    Map

    類型的資料并更新到 Spring 上下文中

    Environment

    中的事件。文中的執行個體就是使用這個事件并配合

    EnvironmentBusEndpoint

    EnvironmentChangeListener

  • AckRemoteApplicationEvent: 遠端确認事件。Bus 内部成功接收到遠端事件後會發送回

    AckRemoteApplicationEvent

    确認事件進行确認。
  • RefreshRemoteApplicationEvent: 遠端配置重新整理事件。配合

    @RefreshScope

    以及所有的

    @ConfigurationProperties

    注解修飾的配置類的動态重新整理。
  • UnknownRemoteApplicationEvent:遠端未知事件。Bus 内部消息體進行轉換遠端事件的時候如果發生異常會統一包裝成該事件。

Bus 内部還存在一個非

RemoteApplicationEvent

事件 -

SentApplicationEvent

消息發送事件,配合 Trace 進行遠端消息發送的記錄。

這些事件會配合

ApplicationListener

進行操作,比如

EnvironmentChangeRemoteApplicationEvent

配了

EnvironmentChangeListener

進行配置的新增/修改:

public class EnvironmentChangeListener
        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
    private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
    @Autowired
    private EnvironmentManager env;
    @Override
    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
        Map<String, String> values = event.getValues();
        log.info("Received remote environment change request. Keys/values to update "
                + values);
        for (Map.Entry<String, String> entry : values.entrySet()) {
            env.setProperty(entry.getKey(), entry.getValue());
        }
    }
}      

收到其它節點發送來 EnvironmentChangeRemoteApplicationEvent 事件之後調用

EnvironmentManager#setProperty

進行配置的設定,該方法内部針對每一個配置項都會發送一個

EnvironmentChangeEvent

事件,然後被

ConfigurationPropertiesRebinder

所監聽,進行 rebind 操作新增/更新配置。

Actuator Endpoint

Bus 内部暴露了 2 個 Endpoint,分别是

EnvironmentBusEndpoint

RefreshBusEndpoint

,進行配置的新增/修改以及全局配置重新整理。它們對應的 Endpoint id 即 url 是

bus-env

bus-refresh

配置

Bus 對于消息的發送必定涉及到 Topic、Group 之類的資訊,這些内容都被封裝到了

BusProperties

中,其預設的配置字首為

spring.cloud.bus

,比如:

  • spring.cloud.bus.refresh.enabled

    用于開啟/關閉全局重新整理的 Listener。
  • spring.cloud.bus.env.enabled

    用于開啟/關閉配置新增/修改的 Endpoint。
  • spring.cloud.bus.ack.enabled

    用于開啟開啟/關閉

    AckRemoteApplicationEvent

    事件的發送。
  • spring.cloud.bus.trace.enabled

    用于開啟/關閉息記錄 Trace 的 Listener。

消息發送涉及到的 Topic 預設用的是

springCloudBus

,可以配置進行修改,Group 可以設定成廣播模式或使用 UUID 配合 offset 為 lastest 的模式。

每個 Bus 應用都有一個對應的 Bus id,官方取值方式較複雜:

${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}

建議手動配置 Bus id,因為 Bus 遠端事件中的 destination 會根據 Bus id 進行比對:

spring.cloud.bus.id=${spring.application.name}-${server.port}      

Bus 底層分析

Bus 的底層分析無非牽扯到這幾個方面:

  • 消息是如何發送的;
  • 消息是如何接收的;
  • destination 是如何比對的;
  • 遠端事件收到後如何觸發下一個 action;

BusAutoConfiguration

自動化配置類被 @EnableBinding(SpringCloudBusClient.class)所修飾。

@EnableBinding

的用法在上期文章

《幹貨|Spring Cloud Stream 體系及原理介紹

》 中已經說明,且它的 value 為

SpringCloudBusClient.class

,會在

SpringCloudBusClient

中基于代理建立出 input 和 output 的

DirectChannel

public interface SpringCloudBusClient {
    String INPUT = "springCloudBusInput";
    String OUTPUT = "springCloudBusOutput";
    @Output(SpringCloudBusClient.OUTPUT)
    MessageChannel springCloudBusOutput();
    @Input(SpringCloudBusClient.INPUT)
    SubscribableChannel springCloudBusInput();
}      

springCloudBusInput 和 springCloudBusOutput 這兩個 Binding 的屬性可以通過配置檔案進行修改(比如修改 topic):

spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic      

消息的接收的發送:

// BusAutoConfiguration
@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) { // 2
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
    }
}
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) { // 5
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) { // 6
        if (!this.serviceMatcher.isFromSelf(event)) { // 7
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) { // 8
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
        // We are set to register sent events so publish it for local consumption,
        // irrespective of the origin
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}      

1. 利用 Spring 事件的監聽機制監聽本地所有的

RemoteApplicationEvent

遠端事件(比如

bus-env

會在本地發送 EnvironmentChangeRemoteApplicationEvent事件,

bus-refresh

會在本地發送

RefreshRemoteApplicationEvent

事件,這些事件在這裡都會被監聽到)。

2. 判斷本地接收到的事件不是

AckRemoteApplicationEvent

遠端确認事件(不然會死循環,一直接收消息,發送消息...)以及該事件是應用自身發送出去的(事件發送方是應用自身),如果都滿足執行步驟 3。

3. 構造 Message 并将該遠端事件作為 payload,然後使用 Spring Cloud Stream 構造的 Binding name 為 springCloudBusOutput 的 MessageChannel 将消息發送到 broker。

4.

@StreamListener

注解消費 Spring Cloud Stream 構造的 Binding name 為 springCloudBusInput 的 MessageChannel,接收的消息為遠端消息。

5. 如果該遠端事件是

AckRemoteApplicationEvent

遠端确認事件并且應用開啟了消息追蹤 trace 開關,同時該遠端事件不是應用自身發送的(事件發送方不是應用自身,表示事件是其它應用發送過來的),那麼本地發送

AckRemoteApplicationEvent

遠端确認事件表示應用确認收到了其它應用發送過來的遠端事件,流程結束。

6. 如果該遠端事件是其它應用發送給應用自身的(事件的接收方是應用自身),那麼進行步驟 7 和 8,否則執行步驟 9。

7. 該遠端事件不是應用自身發送(事件發送方不是應用自身)的話,将該事件以本地的方式發送出去。應用自身一開始已經在本地被對應的消息接收方處理了,無需再次發送。

8. 如果開啟了

AckRemoteApplicationEvent

遠端确認事件的開關,構造

AckRemoteApplicationEvent

事件并在遠端和本地都發送該事件(本地發送是因為步驟 5 沒有進行本地

AckRemoteApplicationEvent

事件的發送,也就是自身應用對自身應用确認; 遠端發送是為了告訴其它應用,自身應用收到了消息)。

9. 如果開啟了消息記錄 Trace 的開關,本地構造并發送

SentApplicationEvent

事件。

方法三:Spring Cloud Bus 消息總線介紹

bus-env

觸發後所有節點的

EnvironmentChangeListener

監聽到了配置的變化,控制台都會列印出以下資訊:

o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}      

如果在本地監聽遠端确認事件

AckRemoteApplicationEvent

,都會收到所有節點的資訊,比如 node5 節點的控制台監聽到的

AckRemoteApplicationEvent

事件如下:

ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}      

那麼回到本章節開頭提到的4個問題,我們分别做一下解答:

  • 消息是如何發送的: 在

    BusAutoConfiguration#acceptLocal

    方法中通過 Spring Cloud Stream 發送事件到

    springCloudBus

    topic 中。
  • 消息是如何接收的: 在 BusAutoConfiguration#acceptRemote 方法中通過 Spring Cloud Stream 接收

    springCloudBus

    topic 的消息。
  • destination 是如何比對的: 在 BusAutoConfiguration#acceptRemote 方法中接收遠端事件方法裡對 destination 進行比對。
  • 遠端事件收到後如何觸發下一個 action: Bus 内部通過 Spring 的事件機制接收本地的

    RemoteApplicationEvent

    具體的實作事件再做下一步的動作(比如

    EnvironmentChangeListener

    接收了 EnvironmentChangeRemoteApplicationEvent事件,

    RefreshListener

    接收了

    RefreshRemoteApplicationEvent

    事件)。

總結

Spring Cloud Bus 自身内容還是比較少的,不過還是需要提前了解 Spring Cloud Stream 體系以及 Spring 自身的事件機制,在此基礎上,才能更好地了解 Spring Cloud Bus 對本地事件和遠端事件的處理邏輯。

目前 Bus 内置的遠端事件較少,大多數為配置相關的事件,我們可以繼承

RemoteApplicationEvent

并配合

@RemoteApplicationEventScan

注解建構自身的微服務消息體系。

歡迎加入Spring Cloud Alibaba 開源社群,和我們一起交流與探索 Spring Cloud Alibaba 的實踐以及發展路徑:

👪 Spring Cloud Alibaba 開源釘釘群

方法三:Spring Cloud Bus 消息總線介紹

本文作者:

方劍(花名:洛夜)

GitHub ID @fangjian0423,開源愛好者,阿裡巴巴進階開發工程師,阿裡雲産品 EDAS 開發,Spring Cloud Alibaba 開源項目負責人之一。