天天看點

spring cloud bus的使用及使用bus釋出自定義事件1. spring cloud bus介紹2. 建構項目3. 端點4. 事件追蹤5. 源碼分析6. 通過bus實作自定義事件發送

1. spring cloud bus介紹

Spring cloud bus使用輕量級消息代理将分布式系統的節點連接配接起來,可以使用此代理,廣播狀态更改(例如配置更改)或其他管理指令。它可以用作應用程式之間的通信通道。該項目提供了兩種消息傳輸處理:AMQP broker 和Kafka 2. 如果你想使用activemq或其他的消息中間件作為消息傳輸,那麼需要實作spring cloud stream消息驅動的綁定(spring cloud bus其實也是使用stream實作的rabbitmq和kafka)

2. 建構項目

我們使用rabbitmq作為中間件

引入依賴:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
           

application.yml中添加連接配接rabbitmq的配置:

spring:
  rabbitmq:
    host: mybroker.com
    port: 5672
    username: user
    password: secret
           

3. 端點

spring cloud bus 提供了兩個端點

/actuator/bus-refresh

 and 

/actuator/bus-env,他們分别對應了

Spring Cloud Commons中的

/actuator/refresh和/actuator/env

3.1 /actuator/bus-refresh

可以清除RefreshScope

中的緩存并且重新綁定@ConfigurationProperties,一般是配合config元件使用,當config server 中的配置修改時可動态重新整理config client中加載的yml配置

暴露端口:

management.endpoints.web.exposure.include=bus-refresh
           

3.2 /actuator/bus-env

更新每個執行個體中指定的environment 鍵值對

暴露端口:

management.endpoints.web.exposure.include=bus-env
           

3.3 destination參數

可以為端口後添加destination參數來指定你想廣播的服務執行個體。

假設我們現在使用/bus-refresh端點

當你想指定某個具體服務執行個體更新時:/actuator/bus-refresh?destination=customers:9000

   參數值為(spring.application.name:server.port)

當你想指定服務更新時:/actuator/bus-refresh?destination=customers:**

   **表示customers服務的所有執行個體

4. 事件追蹤

bus中的事件大緻隻有3中:

  • EnvironmentChangeRemoteApplicationEvent:對應/bus-env的端點事件
  • RefreshRemoteApplicationEvent:對應/bus-refresh端點的事件
  • AckRemoteApplicationEvent:這是一個确認事件,沒有什麼含義,隻是說确認事件已發送或已接受
spring.cloud.bus.trace.enabled=true
           

預設是關閉的,因為它是靠本地項目的記憶體來存儲追蹤記錄的,會比較消耗資源。

開啟後會顯示發送的每個事件和來自每個服務執行個體的所有确認事件(ack)

通過通路端點/actuator/trace可檢視追蹤記錄

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}
           

這裡列印的追蹤大緻意思就是

RefreshRemoteApplicationEvent

 事件從customers:9000服務執行個體發出(signal=...sent),并廣播了所有的服務執行個體。

軌迹:我們這裡是檢視的customers:9000該執行個體的追蹤記錄,當請求bus-refresh端口後,該執行個體顯示确認了該事件(第3個json),然後發送了事件到消息中間件中(第2個json),最後因為接受到了來自消息中間件的遠端事件,是以再次列印了ack确認日志(第一個json)

5. 源碼分析

我們分析下/bus-refresh的整個操作流程

5.1 Spring本地事件釋出

需檢視類:RefreshBusEndpoint

因為我們是通過/bus-refresh端點作為入口更新系統的配置資訊的,是以我們可以找到RefreshBusEndpoint類:

@Endpoint(
    id = "bus-refresh"
)
public class RefreshBusEndpoint extends AbstractBusEndpoint {
     ...
    @WriteOperation
    public void busRefresh() {
        this.publish(new RefreshRemoteApplicationEvent(this, this.getInstanceId(), (String)null));
    }
}
           

通過該類,我們發現,其實在我們調用/bus-refresh端口時,它是釋出了一個RefreshRemoteApplicationEvent事件(spring本地事件),注意是本地事件。RefreshRemoteApplicationEvent的父類是RemoteApplicationEvent

5.2 通過Spring本地事件監聽,将消息發送到了消息中間件中

檢視類:BusAutoConfiguration

在BusAutoConfiguration配置類中,我們找到了這麼一段代碼:

@EventListener(
        classes = {RemoteApplicationEvent.class}
    )
    public void acceptLocal(RemoteApplicationEvent event) {
        if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) {
            this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
        }

    }
           

發現在這個配置類中,有個本地事件監聽器,且監聽的是RemoteApplicationEvent事件,而在RefreshBusEndpoint中釋出的事件對象的父類正好就是RemoteApplicationEvent事件,是以我們可以斷定,這個監聽器是能夠監聽到剛剛釋出的事件的。

 從代碼中可以看出來它進行了判斷,從方法名上就可以看出來,判斷了這個監聽到的事件是否是自己發的,并且是否是Ack事件,當然我們這裡是滿足的,是以執行了this.cloudBusOutboundChannel.send()方法,這個方法的意思就是向管道中發送消息,這個管道其實就是spring cloud stream 與 消息中間件綁定的管道,通俗來講就是它把這個事件發送到rabbitmq上去了。

補充:this.cloudBusOutboundChannel對象其實是和消息中間件的channel通道相連接配接的,查找BusAutoConfiguration類發現,其實該對象和rabbitmq相連接配接的exchange名稱是springCloudBus

@Autowired
    @Output("springCloudBusOutput")
    public void setCloudBusOutboundChannel(MessageChannel cloudBusOutboundChannel) {
        this.cloudBusOutboundChannel = cloudBusOutboundChannel;
    }
           

spring cloud bus的使用及使用bus釋出自定義事件1. spring cloud bus介紹2. 建構項目3. 端點4. 事件追蹤5. 源碼分析6. 通過bus實作自定義事件發送

5.3 Stream遠端事件監聽

檢視類:BusAutoConfiguration

通過上面的操作,事件已經被發送到rabbitmq上去了,那我們本地就要進行遠端事件的監聽了,監聽的就是this.cloudBusOutboundChannel對象和rabbitmq相連接配接的通道,遠端事件的監聽使用 @StreamListener()注解

@StreamListener("springCloudBusInput")
    public void acceptRemote(RemoteApplicationEvent event) {
        if (event instanceof AckRemoteApplicationEvent) {
            if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(event);
            }

        } else {
            if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) {
                if (!this.serviceMatcher.isFromSelf(event)) {
                    this.applicationEventPublisher.publishEvent(event);
                }

                if (this.bus.getAck().isEnabled()) {
                    AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass());
                    this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(ack).build());
                    this.applicationEventPublisher.publishEvent(ack);
                }
            }

            if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass()));
            }

        }
    }
           
檢視代碼我們可以發現接受到事件後會經過很多判斷,判斷是否是ack事件,或該事件是不是自己發送的(this.serviceMatcher.isFromSelf(event),或者該事件是否為我而來,即消息是不是發給我的(his.serviceMatcher.isForSelf(event):上面講過當/bus-refresh時可以指定服務執行個體發送事件,是以雖然能監聽到事件,但不一定這個事件就是針對你的
           

截取下代碼:

if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) {
                if (!this.serviceMatcher.isFromSelf(event)) {
                    this.applicationEventPublisher.publishEvent(event);
                }
}
           
這段代碼意思是:事件就是針對我來的且不是我自己發送的這個事件,那麼就又會在本地發送該事件,發送後會被5.4節中的RefreshListener所監聽到。如果是自己發的,那麼就不做任何處理了,因為他在發送消息到中間件時,就已經被RefreshListener監聽器處理過了。
           

5.4 Spring本地事件監聽

檢視:RefreshListener類

public class RefreshListener implements ApplicationListener<RefreshRemoteApplicationEvent> {
    private static Log log = LogFactory.getLog(RefreshListener.class);
    private ContextRefresher contextRefresher;

    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = this.contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}
           

該類是通過實作ApplicationListener的接口來監聽RefreshRemoteApplicationEvent事件的,然後執行了this.contextRefresher.refresh()方法來重新整理程式中的context上下文。

5.5 流程總結

1. 假設我們通路端口為8080的user服務,當通路端點/bus-refresh時,會被RefreshBusEndpoint執行,該類會發送一個spring本地事件,事件名為RefreshRemoteApplicationEvent(在8080這個執行個體中進行事件傳播)

2. 發送後,會被兩個位置的監聽器監聽:

  • 一個是RefreshListener,該監聽器監聽到後直接refresh上下文
  • 一個是BusAutoConfiguration,該配置類中存在監聽RemoteApplicationEvent事件的監聽器,被這個監聽器監聽後,會向rabbitmq發送事件消息實體

3. 在bus中的應用都會進行strem的遠端事件監聽(@StreamListener("springCloudBusInput"),應用監聽到strem事件後,會判斷事件的發送和發往的情況

  • 如果該事件是發向自己的,并且不是自己發出去的,那麼會再次将這個event通過spring本地事件發出去,讓目前執行個體的RefreshListener監聽器監聽到并執行refresh操作
  • 如果該事件既是自己發出去的,也是發向自己的,那麼就不執行任何操作了,因為在發出去的時候已經被RefreshListener監聽器監聽過了
  • 如果事件不是發向自己的,不執行任何操作

6. 通過bus實作自定義事件發送

6.1 建立事件

事件需要繼承RemoteApplicationEvent類

public class CustomEvent extends RemoteApplicationEvent {

    private CustomEvent() {
        //一定要有,序列化時會用到
    }

    public CustomEvent(String msg, String originService,
                                      String destinationService) {
        super(msg, originService, destinationService);
    }
}
           

6.2 添加配置類

@Configuration
@RemoteApplicationEventScan(basePackageClasses = CustomEvent.class)
public class BusConfiguration {
}
           
@RemoteApplicationEventScan(basePackageClasses = CustomEvent.class)用來告訴bus自己實作的事件在哪個包下,也可直接寫在啟動類上
           

6.3  添加事件監聽器

@Configuration
public class CustomEventListener {

    @Value("${server.port}")
    private String port;

    @EventListener
    public void onCustomRemoteApplicationEvent(CustomEvent event) {
        System.out.printf("CustomRemoteApplicationEvent - " +
                        " port : %s , Source : %s , originService : %s , destinationService : %s \n",
                port,
                event.getSource(),
                event.getOriginService(),
                event.getDestinationService());
    }

}
           

我們這裡監聽器隻是列印了一句話

6.4 事件釋出者

@RestController
public class Controller {
    /**
     * Spring Cloud bus 外部化配置
     */
    @Autowired
    private BusProperties busProperties;

    /**
     * 事件釋出者
     */
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @PostMapping("/bus/event/publish/custom")
    public boolean publishUserEvent(String msg, @RequestParam(value = "destination", required = false) String destination) {
        //這裡由于我沒有定義ID ,這裡Spring Cloud Bus 自己預設實作了ID
        String instanceId = busProperties.getId();
        CustomEvent event = new CustomEvent(msg, instanceId, destination);
        eventPublisher.publishEvent(event);
        return true;
    }
}
           

代碼中的instanceId可以通過spring.cloud.bus.id屬性設定:

下面是官網的原話,我谷歌翻譯了以下

spring cloud bus的使用及使用bus釋出自定義事件1. spring cloud bus介紹2. 建構項目3. 端點4. 事件追蹤5. 源碼分析6. 通過bus實作自定義事件發送
spring cloud bus的使用及使用bus釋出自定義事件1. spring cloud bus介紹2. 建構項目3. 端點4. 事件追蹤5. 源碼分析6. 通過bus實作自定義事件發送

6.5 配置檔案

server:
  port: 8082
management:
  endpoints:
    web:
      exposure:
        include: "*"
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: springcloud
    password: springcloud
  application:
    name: bus-custom-event
           

6.5啟動測試 

分别以8082和8083端口啟動項目(ideal中可以通過修改program arguments參數來對同一個項目并行啟動)

spring cloud bus的使用及使用bus釋出自定義事件1. spring cloud bus介紹2. 建構項目3. 端點4. 事件追蹤5. 源碼分析6. 通過bus實作自定義事件發送

啟動後,通路:http://localhost:8082/bus/event/publish/custom?destination=bus-custom-event:8083

會發現啟動的兩個項目都可以監聽到事件并列印資訊

繼續閱讀