天天看點

SpringCloud微服務知識整理九:消息總線:Spring Cloud Bus什麼是Spring Cloud Bus一、消息代理二、RabbitMQ實作消息總線三、Kafka實作消息總線

什麼是Spring Cloud Bus

在微服務架構的系統中, 我們通常會使用輕量級的消息代理來建構一個共用的消息主題讓系統中所有微服務執行個體都連接配接上來, 由于該主題中産生的消息會被所有執行個體監聽和消費, 是以我們稱它為消息總線。

在總線上的各個執行個體都可以友善地廣播一些需要讓其他連接配接在該主題上的執行個體都知道的消息, 例如配置資訊的變更或者其他一些管理操作等。

通過使用 Spring Cloud Bus 可以非常容易地搭建起消息總線, 同時實作了一些消息總線中的常用功能。

比如,配合Spring Cloud Config 實作微服務應用配置資訊的動态更新等。

一、消息代理

消息代理 (Message Broker) 是一種消息驗證、 傳輸、 路由的架構模式。 它在應用程式之間起到通信排程并最小化應用之間的依賴的作用, 使得應用程式可以高效地解耦通信過程。

消息代理是一個中間件産品, 它的核心是一個消息的路由程式, 用來實作接收和分發消息, 并根據設定好的消息處理流來轉發給正确的應用。

它包括獨立的通信和消息傳遞協定, 能夠實作組織内部群組織間的網絡通信。 設計代理的目的就是為了能夠從應用程式中傳入消息, 并執行一些特别的操作, 下面這些是在企業應用中, 我們經常需要使用消息代

理的場景:

将消息路由到一個或多個目的地

消息轉化為其他的表現方式

執行消息的聚集、 消息的分解, 并将結果發送到它們的目的地, 然後重新組合響應傳回給消息使用者

調用Web服務來檢索資料

響應事件或錯誤

使用釋出-訂閱模式來提供内容或基于主題的消息路由

目前版本的Spring Cloud Bus僅支待兩款中間件産品: RabbitMQ和Kafka。

二、RabbitMQ實作消息總線

AMQP是Advanced Message Queuing Protocol的簡稱,它是一個面向消息中間件的開放式标準應用層協定。AMQP定義了這些特性:

消息方向

消息隊列

消息路由(包括:點到點和釋出-訂閱模式)

可靠性

安全性

RabbitMQ就是以AMQP協定實作的一種中間件産品,它可以支援多種作業系統,多種程式設計語言,幾乎可以覆寫所有主流的企業級技術平台。

安裝略

快速入門

建立一個Spring Boot工程,命名為:“rabbitmq-hello”。

在pom.xml中引入如下依賴内容,其中spring-boot-starter-amqp用于支援RabbitMQ。

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

在application.properties中配置關于RabbitMQ的連接配接和使用者資訊

spring.application.name=rabbitmq-hello

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=spring
spring.rabbitmq.password=123456
           

建立消息生産者Sender。

通過注入AmqpTemplate接口的執行個體來實作消息的發送,AmqpTemplate接口定義了一套針對AMQP協定的基礎操作。

在Spring Boot中會根據配置來注入其具體實作。在該生産者,我們會産生一個字元串,并發送到名為hello的隊列中。

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

}
           

建立消息消費者Receiver。

通過@RabbitListener注解定義該類對hello隊列的監聽,并用@RabbitHandler注解來指定對消息的處理方法。

是以,該消費者實作了對hello隊列的消費,消費操作為輸出消息的字元串内容。

@Component
@RabbitListener(queues = "hello")
public class Receiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }

}
           

建立RabbitMQ的配置類RabbitConfig,用來配置隊列、交換器、路由等進階資訊。

@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

}
           

建立應用主類:

@SpringBootApplication
public class HelloApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloApplication.class, args);
    }

}
           

建立單元測試類,用來調用消息生産:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = HelloApplication.class)
public class HelloApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    public void hello() throws Exception {
        sender.send();
    }

}
           

嘗試運作

整合Spirng Cloud Bus

準備工作:不做新的應用,用到上一章中已經實作的關于Spring Cloud Config的幾個工程。

config-repo:定義在Git倉庫中的一個目錄,其中存儲了多環境配置檔案,配置檔案中有一個from參數。

config-server-eureka:配置了Git倉庫,并注冊到了Eureka的服務端。

config-client-eureka:通過Eureka發現Config Server的用戶端,用來通路配置伺服器以擷取配置資訊。該應用中提供了一個/from接口,它會擷取properties中的from屬性傳回。

擴充config-client-eureka應用

修改pom.xml增加spring-cloud-starter-bus-amqp子產品(注意spring-boot-starter-actuator子產品也是必須的)。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
           

在配置檔案中增加關于RabbitMQ的連接配接和使用者資訊

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springcloud
spring.rabbitmq.password=123456

           

啟動config-server-eureka,再啟動兩個config-client-eureka(分别在不同的端口上,比如7002、7003),我們可以在config-client-eureka中的控制台中看到如下内容,在啟動時候,用戶端程式多了一個/bus/refresh請求。

先通路兩個config-client-eureka的/from請求,會傳回目前properties中的from屬性。

接着,我們修改properties中的from屬性值,并發送POST請求到其中的一個/bus/refresh。

最後,再分别通路啟動的兩個config-client-eureka的/from請求,此時這兩個請求都會傳回最新的properties中的from屬性。

到這裡已經能夠通過Spring Cloud Bus來實時更新總線上的屬性配置了。

原理分析

通過使用Spring Cloud Bus與Spring Cloud Config的整合,并以RabbitMQ作為消息代理,實作了應用配置的動态更新。

SpringCloud微服務知識整理九:消息總線:Spring Cloud Bus什麼是Spring Cloud Bus一、消息代理二、RabbitMQ實作消息總線三、Kafka實作消息總線

指定重新整理範圍

有些特殊場景下(比如:灰階釋出),我們希望可以重新整理微服務中某個具體執行個體的配置。

/bus/refresh接口還提供了destination參數,用來定位具體要重新整理的應用程式。

比如,我們可以請求/bus/refresh?destination=customers:9000,此時總線上的各應用執行個體會根據destination屬性的值來判斷是否為自己的執行個體名,若符合才進行配置重新整理,若不符合就忽略該消息。

destination參數除了可以定位具體的執行個體之外,還可以用來定位具體的服務。定位服務的原理是通過使用Spring的PathMatecher(路徑比對)來實作,

比如:/bus/refresh?destination=customers:**,該請求會觸發customers服務的所有執行個體進行重新整理。

架構優化

Spring Cloud Bus的/bus/refresh接口提供了針對服務和執行個體進行配置更新的參數,那麼我們的架構也相應的可以做出一些調整。

在之前的架構中,服務的配置更新需要通過向具體服務中的某個執行個體發送請求,再觸發對整個服務叢集的配置更新。

這樣指定的應用執行個體就會不同于叢集中的其他應用執行個體。

我們要盡可能的讓服務叢集中的各個節點是對等的。

SpringCloud微服務知識整理九:消息總線:Spring Cloud Bus什麼是Spring Cloud Bus一、消息代理二、RabbitMQ實作消息總線三、Kafka實作消息總線

做了這些改動:

1、在Config Server中也引入Spring Cloud Bus,将配置服務端也加入到消息總線中來。

2、/bus/refresh請求不再發送到具體服務執行個體上,而是發送給Config Server,并通過destination參數來指定需要更新配置的服務或執行個體。

通過上面的改動,服務執行個體就不需要再承擔觸發配置更新的職責。同時,對于Git的觸發等配置都隻需要針對Config Server即可,進而簡化了叢集上的一些維護工作。

三、Kafka實作消息總線

Kafka簡介

Kafka是一個由LinkedIn開發的分布式消息系統,它于2011年初開源,現在由著名的Apache基金會維護與開發。Kafka使用Scala實作,被用作LinkedIn的活動流和營運資料處理的管道,現在也被諸多網際網路企業廣泛地用作為資料流管道和消息系統。

Kafka是基于消息釋出/訂閱模式實作的消息系統,其主要設計目标如下:

消息持久化:以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的通路性能。

高吞吐:在廉價的商用機器上也能支援單機每秒100K條以上的吞吐量

分布式:支援消息分區以及分布式消費,并保證分區内的消息順序

跨平台:支援不同技術平台的用戶端(如:Java、PHP、Python等)

實時性:支援實時資料處理和離線資料處理

伸縮性:支援水準擴充

Kafka中涉及的一些基本概念:

Broker:Kafka叢集包含一個或多個伺服器,這些伺服器被稱為Broker。

Topic:邏輯上同Rabbit的Queue隊列相似,每條釋出到Kafka叢集的消息都必須有一個Topic。(實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存于一個或多個Broker上,但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處)

Partition:Partition是實體概念上的分區,為了提供系統吞吐率,在實體上每個Topic會分成一個或多個Partition,每個Partition對應一個檔案夾(存儲對應分區的消息内容和索引檔案)。

Producer:消息生産者,負責生産消息并發送到Kafka Broker。

Consumer:消息消費者,向Kafka Broker讀取消息并處理的用戶端。

Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組,若不指定則屬于預設組),組可以用來實作一條消息被組内多個成員消費等功能。

快速入門

環境安裝

我們需要從官網上下載下傳安裝媒體。

解壓,Kafka的設計中依賴了ZooKeeper,根據實際的系統來設定環境變量。

啟動ZooKeeper和Kafka來進行消息的生産和消費。

啟動測試

啟動ZooKeeper,執行指令:zookeeper-server-start config/zookeeper.properties

啟動Kafka,執行指令:kafka-server-start config/server.properties

建立Topic,執行指令:kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test,通過該指令,建立一個名為“test”的Topic,該Topic包含一個分區一個Replica。在建立完成後,可以使用kafka-topics --list --zookeeper localhost:2181指令來檢視目前的Topic。

建立消息生産者,執行指令:kafka-console-producer --broker-list localhost:9092 --topic test。

建立消息消費者,執行指令:kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning。

整合Spring Cloud Bus

要使用Kafka來實作消息總線時,隻需要把spring-cloud-starter-bus-amqp替換成spring-cloud-starter-bus-kafka子產品,在pom.xml的dependenies節點中進行修改

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
           

如果在啟動Kafka時均采用了預設配置,那麼不需要再做任何其他配置就能在本地實作從RabbitMQ到Kafka的切換。

Kafka配置

實際應用中,Kafka和ZooKeeper一般都會獨立部署,是以在應用中都需要來為Kafka和ZooKeeper配置一些連接配接資訊等。

具體見第十章