一。spring-cloud-bus是什麼?
回答這個問題之前,我們先回顧先前的分布式配置,當配置中心發生變化後,我們需要利用spring-boot-actuator裡的refresh端點進行手動重新整理:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcuUDM1QmYxcDZhR2MwIGNhBjN4kTZxUzYkJTOjZTMiVzYfdWbp9CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
根據上述示例情況:我們每次要擷取最新配置時,要一個一個的通過refresh重新整理服務節點,這種方式是不是非常low而且非常麻煩,那該怎麼辦呢?
大家還記得zookeeper中watch是做什麼用的嗎?當監控的節點資料發生變化了,那麼是不是所有訂閱到該節點的用戶端都會觸發一個訂閱回調呢?這其實也類似于我們的消息總線。在微服務架構的系統中,我們通常會使用輕量級的消息代理來建構一個公有的消息主題讓系統中所有微服務執行個體都連接配接上來,由于該主題中産生的消息會被所有執行個體監聽和消費,是以我們稱它為消息總線。
那麼在分布式配置中,我們的所有服務都訂閱消息總線的話,當配置改變時,配置中心通知消息總線,然後所有的服務節點接收到訂閱消息後,在從配置中心擷取最新的配置,是不是一個一勞永逸的過程?那麼可以得到如下結構圖:
二、實作分布式配置的消息總線
我先貼出:項目結構圖
我們統一把配置放在config目錄下
1、在對應的服務添加對應的依賴
首先我們先配置config-server。gradle配置檔案:
dependencies {
// testCompile group: 'junit', name: 'junit', version: '4.12'
compile('org.springframework.cloud:spring-cloud-config-server')
compile('org.springframework.kafka:spring-kafka')
compile('org.springframework.cloud:spring-cloud-starter-bus-kafka')
compile('org.springframework.cloud:spring-cloud-starter-eureka-server')
}
View Code
注意我們使用kafka作為消息總線的中間件
然後我們依次在所需的服務中添加對應的依賴
dependencies {
// testCompile group: 'junit', name: 'junit', version: '4.12'
compile('org.springframework.cloud:spring-cloud-starter-config')
compile('org.springframework.kafka:spring-kafka')
compile('org.springframework.cloud:spring-cloud-starter-bus-kafka')
compile('org.springframework.cloud:spring-cloud-starter-eureka-server')
}
2、配置對應的application.yml
config-server的yml檔案:
spring:
application:
name: config-server
cloud:
config:
server:
git:
uri: file://${user.home}/IdeaProjects/spring-cloud
repos:
local:
pattern: '**/local'
uri: file://${user.home}/IdeaProjects/spring-cloud
searchPaths: config
search-paths: config
label: master
kafka:
bootstrap-servers: localhost:9092
server:
port: 8888
endpoints:
refresh:
sensitive: false
bus:
sensitive: false
這裡面注意要把端點先開放出來,然後進行kafka的相關配置,其餘服務的配置檔案也進行這樣的操作,使其能與消息總線通訊。
3、依次啟動服務
當服務啟動成功時,SpringBootActuator給我們提供一個/bus/refresh端點,同時我們可以在kafka主題裡面找到相應的topics,其名字為springCloudBus:
4、通路Config-Server的/bus/refesh
我們先使用kafka的消費端來監聽一下消息内容。運作:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic springCloudBus
緊接着我們在通路 http://localhost:8888/bus/refresh 重新整理config-server的配置
然後我們可以發現消費端,訂閱到如下消息:
其中:type為消息的事件類型
timestamp 為消息的時間戳
orginService:消息的來源服務執行個體
destinationService:消息的目标服務執行個體,**代表了總線上的所有服務執行個體
在本例子中,我們可以看到每個服務的ackId都來自于 type為RefreshRemoteApplicationEvent的服務ID
5、運作服務
我們先通過gradle的build任務進行打包會得到如下檔案:xxxx.jar與xxx.jar.orginal
那麼進入到對應目錄下 啟動兩個服務并注冊到注冊中心 指令如下:
java -Dserver.port=8300 -Dspring.profiles.active=local -jar xxxx.jar
java -Dserver.port=8200 -Dspring.profiles.active=local -jar xxxx.jar
注意一定不要在application.yml配置如上參數,否則通過(-Dxxx=xxx)系統變量設定的值将不會生效
此時我們更改config對應的配置并commit後,在運作步驟4,就可以拿到最新的結果了,再次附上相關代碼:
StudentConfig:
package com.hzgj.lyrk.order.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "student")
@Configuration
public class StudentConfig {
private String name;
private String age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
@Override
public String toString() {
return "StudentConfig{" +
"name='" + name + '\'' +
", age='" + age + '\'' +
'}';
}
}
order-server-local:
student:
name: student_local
age: 17