好了現在我們接着上一篇的随筆,繼續來講。上一篇我們講到,我們如果要去更新所有微服務的配置,在不重新開機的情況下去更新配置,隻能依靠spring cloud config了,但是,是我們要一個服務一個服務的發送post請求,
我們能受的了嗎?這比之前的沒配置中心好多了,那麼我們如何繼續避免挨個挨個的向服務發送Post請求來告知服務,你的配置資訊改變了,需要及時修改記憶體中的配置資訊。
這時候我們就不要忘記消息隊列的釋出訂閱模型。讓所有為服務來訂閱這個事件,當這個事件發生改變了,就可以通知所有微服務去更新它們的記憶體中的配置資訊。這時Bus消息總線就能解決,你隻需要在springcloud Config Server端發出refresh,就可以觸發所有微服務更新了。
如下架構圖所示:
Spring Cloud Bus除了支援RabbitMQ的自動化配置之外,還支援現在被廣泛應用的Kafka。在本文中,我們将搭建一個Kafka的本地環境,并通過它來嘗試使用Spring Cloud Bus對Kafka的支援,實作消息總線的功能。
Kafka使用Scala實作,被用作LinkedIn的活動流和營運資料處理的管道,現在也被諸多網際網路企業廣泛地用作為資料流管道和消息系統。
Kafak架構圖如下:
Kafka是基于消息釋出/訂閱模式實作的消息系統,其主要設計目标如下:
1.消息持久化:以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的通路性能。
2.高吞吐:在廉價的商用機器上也能支援單機每秒100K條以上的吞吐量
3.分布式:支援消息分區以及分布式消費,并保證分區内的消息順序
4.跨平台:支援不同技術平台的用戶端(如:Java、PHP、Python等)
5.實時性:支援實時資料處理和離線資料處理
6.伸縮性:支援水準擴充
Kafka中涉及的一些基本概念:
1.Broker:Kafka叢集包含一個或多個伺服器,這些伺服器被稱為Broker。
2.Topic:邏輯上同Rabbit的Queue隊列相似,每條釋出到Kafka叢集的消息都必須有一個Topic。(實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存于一個或多個Broker上,但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處)
3.Partition:Partition是實體概念上的分區,為了提供系統吞吐率,在實體上每個Topic會分成一個或多個Partition,每個Partition對應一個檔案夾(存儲對應分區的消息内容和索引檔案)。
4.Producer:消息生産者,負責生産消息并發送到Kafka Broker。
5.Consumer:消息消費者,向Kafka Broker讀取消息并處理的用戶端。
6.Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組,若不指定則屬于預設組),組可以用來實作一條消息被組内多個成員消費等功能。
可以從kafka的架構圖看到Kafka是需要Zookeeper支援的,你需要在你的Kafka配置裡面指定Zookeeper在哪裡,它是通過Zookeeper做一些可靠性的保證,做broker的主從,我們還要知道Kafka的消息是以topic形式作為組織的,Producers發送topic形式的消息,
Consumer是按照組來分的,是以,一組Consumers都會都要同樣的topic形式的消息。在服務端,它還做了一些分片,那麼一個Topic可能分布在不同的分片上面,友善我們拓展部署多個機器,Kafka是天生分布式的。
這裡為了示範,我們隻需要用它的預設配置,在windows上做個小Demo即可。
我們這裡主要針對Spring Cloud Bus對Kafka的支援,實作消息總線的功能,具體的Kafka,RabbitMQ消息隊列希望自己去找資料來學習一下。
有了一些概念的支援後,我們進行一些Demo。如下:
首先建立一個springCloud-config-client1子產品,友善我們進行測試
所引入的依賴如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
接着要注意一下,client1的配置檔案要改為bootstrap.yml,因為這種配置格式,是優先加載的,上一篇随筆有講過,client1的配置如下:
server:
port: 7006
spring:
application:
name: cloud-config
cloud:
config:
#啟動什麼環境下的配置,dev 表示開發環境,這跟你倉庫的檔案的字尾有關,比如,倉庫配置檔案命名格式是cloud-config-dev.properties,是以profile 就要寫dev
profile: dev
discovery:
enabled: true
#這個名字是Config Server端的服務名字,不能瞎寫。
service-id: config-server
#注冊中心
eureka:
client:
service-url:
defaultZone: http://localhost:8888/eureka/,http://localhost:8889/eureka/
#是否需要權限拉去,預設是true,如果不false就不允許你去拉取配置中心Server更新的内容
management:
security:
enabled: false
接着啟動類如下:
@SpringBootApplication
@EnableDiscoveryClient
public class Client1Application {
public static void main(String[] args) {
SpringApplication.run(Client1Application.class, args);
}
}
接着将client中的TestController指派一份到client1中,代碼如下:
@RestController
//這裡面的屬性有可能會更新的,git中的配置中心變化的話就要重新整理,沒有這個注解内,配置就不能及時更新
@RefreshScope
public class TestController {
@Value("${name}")
private String name;
@Value("${age}")
private Integer age;
@RequestMapping("/test")
public String test(){
return this.name+this.age;
}
}
接着還要在先前的随筆中的子產品中的Config Server加入如下配置:
#是否需要權限拉去,預設是true,如果不false就不允許你去拉取配置中心Server更新的内容
management:
security:
enabled: false
接着還要做一點就是,在config-client,config-client1,和config-Server都要引入kafka的依賴,如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
我們工程準備好了,暫時先放在這裡,下面進行Kafka的安裝下載下傳,首先我們去Kafka官網kafka.apache.org/downloads 下來官網推薦的版本,
首先我們進到下載下傳好的Kafka目錄中kafka_2.11-1.1.0\bin\windows 下編輯kafka-run-class.bat如下:
找到這條配置 如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*
可以看到%CLASSPATH%沒有雙引号,
是以用雙引号括起來,不然啟動不起來的,報你JDK沒安裝好,修改後如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
接着,打開config檔案夾中的server.properties配置如下:
可以看到是連接配接到本地的zookeeper就行了。
接着我們進行先啟動zookeeper,再啟動Kafka,如下:
當看到上面的資訊證明啟動Zookeeper啟動成功。、
接下來再開一個CMD啟動Kafka,如下:
看到這些資訊說明Kafka啟動成功了
好了,接下來把前面的工程,兩個注冊中心,一個springcloud-config-server,兩個springcloud-config-client,springcloud-config-client1啟動起來,
可以看到springcloudBus是在0分片上,如果兩個config-client啟動都出現上面資訊,證明啟動成功了。
好了現在我們進行通路一下config-server端,如下:
再通路兩個client,如下:
好了,好戲開始了,現在我們去git倉庫上修改配置中心的檔案,将年齡改為24,如下:
接下來,我們我們用refresh重新整理配置服務端配置,通知兩個client去更新記憶體中的配置資訊。用postman發送localhost:7000/bus/refresh,如下:
可以看到沒有傳回什麼資訊,但是不要擔心,這是成功的通知所有client去更新了記憶體中的資訊了。
接着我們分别重新請求config-server,兩個client,重新整理頁面,結果如下:
兩個client如下:
可以看到所有client自動更新記憶體中的配置資訊了。
到目前為止,上面都是重新整理說有的配置的資訊的,如果我們想重新整理某個特定服務的配置資訊也是可以的。我們可以指定重新整理範圍,如下:
指定重新整理範圍
上面的例子中,我們通過向服務執行個體請求Spring Cloud Bus的
/bus/refresh
接口,進而觸發總線上其他服務執行個體的
/refresh
。但是有些特殊場景下(比如:灰階釋出),我們希望可以重新整理微服務中某個具體執行個體的配置。
Spring Cloud Bus對這種場景也有很好的支援:
/bus/refresh
接口還提供了
destination
參數,用來定位具體要重新整理的應用程式。比如,我們可以請求
/bus/refresh?destination=服務名字:9000
,此時總線上的各應用執行個體會根據
destination
屬性的值來判斷是否為自己的執行個體名,
若符合才進行配置重新整理,若不符合就忽略該消息。
destination
參數除了可以定位具體的執行個體之外,還可以用來定位具體的服務。定位服務的原理是通過使用Spring的PathMatecher(路徑比對)來實作,比如:
/bus/refresh?destination=customers:**
,該請求會觸發
customers
服務的所有執行個體進行重新整理。