天天看點

SpringCloudStream內建RabbitMQ實作消息收發

作者:java小悠

一、SpringCloudStream

SpringCloudStream 是一個建構高擴充和事件驅動的微服務系統的架構,用于連接配接共有消息系統,官網位址: spring.io/projects/sp… 。整體上是把各種花裡胡哨的MQ産品抽象成了一套非常簡單的統一的程式設計架構,以實作事件驅動的程式設計模型。社群官方實作了RabbitMQ,Apache Kafka,Kafka Stream和Amazon Kinesis這幾種産品,而其他還有很多産品比如RocketMQ,都是由産品方自行提供擴充實作。

SpringCloudStream內建RabbitMQ實作消息收發

是以可以看到,對于RabbitMQ,使用SpringCloudStream架構算是一種比較成熟的內建方案。但是需要主要注意的是,SpringCloudStream架構內建的版本通常是比RabbitMQ落後幾個版本的,使用時需要注意。

​ SpringCloudStream架構封裝出了三個最基礎的概念來對各種消息中間件提供統一的抽象:

  • Destination Binders:負責內建外部消息系統的元件。
  • Destination Binding:由Binders建立的,負責溝通外部消息系統、消息發送者和消息消費者的橋梁。
  • Message:消息發送者與消息消費者溝通的簡單資料結構。

​ 可以看到,這個模型非常簡單,使用時也會非常友善。但是簡單,意味着SCStream中的各種概念模型,與RabbitMQ的基礎概念之間是有比較大的差距的,例如Exchange、Queue這些原生概念,內建到SCStream架構時,都需要注意如何配置,如何轉換。

1-1、引入依賴

RabbitMQ的SpringCloudStream支援是由Spring社群官網提供的,是以這也是相當成熟的一種內建方案。但是要注意,SpringCloudStream架構內建的版本通常是比RabbitMQ産品本身落後幾個版本的,使用時需要注意。

​ 他的核心依賴也就一個:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <!-- artifactId>spring-cloud-starter-stream-rabbit</artifactId -->
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
複制代碼           

這兩個Maven依賴沒有什麼特别大的差別,實際上,他們的github代碼庫是在一起的。倉庫位址:github.com/spring-clou…

依賴的版本通常建議使用SpringCloud的整體版本控制。 org.springframework.cloud#spring-cloud-dependencies#Hoxton.SR6,這樣各個元件之間的版本比較安全。不建議貿然嘗試新版本。

1-2、配置mq相關參數

spring.rabbitmq.addresses=192.168.253.131:5672,192.168.253.132:5672,192.168.253.133:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/mirror
複制代碼           

1-3、配置啟動類

需要在springboot啟動類上加上如下注解

@EnableBinding({Source.class, Sink.class})
複制代碼           

1-4、聲明消息收費者

@Component
@EnableBinding(Sink.class)
public class MessageReceiver {
   private Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

   @EventListener
   @StreamListener(Sink.INPUT)
   public void process(Object message) {
        System.out.println("received message : " + message);
        logger.info("received message : {}", message);
    }
}
複制代碼           

1-5、聲明消息發送者

@RestController
@EnableBinding(Source.class)
public class SendMessageController {

   @Autowired
   private Source source;
   
   @GetMapping("/send")
   public Object send(String message) {
      MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
      source.output().send(messageBuilder.build());
      return "message sended : "+message;
   }
複制代碼           

1-6、啟動服務測試收發消息

啟動服務之後,會在RabbitMQ服務中自動建立topic類型的交換機(scstreamExchange)及一個比對所有routingKey(#)的隊列(scstreamExchange.myinput-1),并且會進行綁定,如下:

SpringCloudStream內建RabbitMQ實作消息收發

1-6-1、發送消息

SpringCloudStream內建RabbitMQ實作消息收發

1-6-2、接收消息

這裡可以看到,目前消費者不光收到了MQ消息,還收到了一些系統事件(received message相關資訊)。這些系統事件需要添加@EventListener注解才能接收到。

SpringCloudStream內建RabbitMQ實作消息收發

下面去掉@EventListener再次測試一下

SpringCloudStream內建RabbitMQ實作消息收發

1-7、使用現有路由及隊列發送消息

1-7-1、使用fanout模式

SpringCloudStream在使用的時候預設會建立自己的交換機和隊列,如果要使用我們自己已有的,就需要進行一下配置,如一個fanout類型的exchange,綁定了四個隊列的模式

SpringCloudStream內建RabbitMQ實作消息收發

配置資訊:

#-----設定消息生産者
spring.cloud.stream.bindings.output.destination=fanoutExchange
#隊列類型
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=fanout
#不用自己建立、用現有
spring.cloud.stream.rabbit.bindings.output.producer.bind-queue=false


#-----設定消息消費者
spring.cloud.stream.bindings.input.destination=fanoutExchange
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=fanout

#接收消息的隊列
spring.cloud.stream.bindings.input.group=fanout.q1
#不自動建立隊列
spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
#設定queue的名字隻有group的名字,不包括destination
spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
spring.cloud.stream.bindings.input.content-type=text/plain
複制代碼           

通過以上的設定,發送消息的時候就可以給fanoutExchange交換機發送消息,這樣和fanoutExchange交換機綁定的隊列就都可以收到消息。

需要注意的是,SpringCloudStream中建立交換機和隊列的時候,會将交換機的名稱作為字首如下:

SpringCloudStream內建RabbitMQ實作消息收發

是以使用我們自己建立的交換機和隊列的時候,需要觀察一下是否也是按照如上規則建立的,如果隊列的字首沒有交換機的名稱,則需要加如下配置

spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
複制代碼           

如下,測試項fanoutExchange發送消息之後,隊列fanout.q1就可以收到消息

SpringCloudStream內建RabbitMQ實作消息收發

而和fanoutExchange綁定的其他三個隊列的消息則仍處于待消費狀态,如下

SpringCloudStream內建RabbitMQ實作消息收發

1-7-2、使用topic模式

1-7-2-1、配置檔案

使用topic或者direct模式的時候,都會使用routingkey,但是使用SpringCloudStream的時候是無法直接穿routingKey的,這就需要在消息發送的時候設定header來進行設定

如使用topic的模式來發送,首先需要修改配置資訊,如下:

#--------------使用routingkey------
#-----設定消息生産者
spring.cloud.stream.bindings.output.destination=topicExchange
#隊列類型
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=topic
#是否持久化
spring.cloud.stream.rabbit.bindings.output.producer.exchange-durable=true
#不用自己建立、用現有
spring.cloud.stream.rabbit.bindings.output.producer.bind-queue=false
#設定routingkey
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.routingkey

#-----設定消息消費者
spring.cloud.stream.bindings.input.destination=topicExchange
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=topic
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-durable=true
#接收消息的隊列
spring.cloud.stream.bindings.input.group=hebei.eco
#不自動建立隊列
spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
#設定queue的名字隻有group的名字,不包括destination
spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
#設定接收消息的routingkey
spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=*.eco
spring.cloud.stream.bindings.input.content-type=text/plain
複制代碼           

有routingkey的配置和fanout類型沒有routingkey配置不同的有

1、在發送端需要指定routingkey,headers為固定設定,routingkey為具體的key值,如name=zhangsan,則可以設定headers.name

#設定routingkey 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.routingkey
複制代碼           

2、在消費端配置消費的routingkey,此處配置的routingkey,就可以設定*或者#進行比對,如下

#設定接收消息的routingkey 
spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=*.eco
複制代碼           

1-7-2-2、發送端代碼修改

如下在MessageBuilder中需要設定Header的key和value,也就是routingkey的key和value值,在上配置中設定的key為routing,則在發送端的代碼中header就設定為routingkey

@RestController
@EnableBinding(Source.class)
public class SendMessageController {

   @Autowired
   private Source source;
   
   @GetMapping("/send")
   public Object send(String message,String routingkey) {
//    MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
      MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey",routingkey);
      source.output().send(messageBuilder.build());
      return "message sended : "+message;
   }
複制代碼           

1-7-2-3、測試

首先設定routingkey的值為abcd,這樣是無法收到消息的,因為消費端設定的routingkey的值為:*.eco

SpringCloudStream內建RabbitMQ實作消息收發

設定routingkey為:abcd.eco,消息就可以正常接收了,如果設定abcd.123.eco,消息就無法接收了,除非将routingkey設定為#.eco

SpringCloudStream內建RabbitMQ實作消息收發

二、了解SpringCloudStream都幹了什麼

2-1、配置RabbitMQ服務

在SpringBoot的autoconfigure包當中,有個 RabbitProperties類,這個類就會解析application.properties中以spring.rabbitmq開頭的配置。裡面配置了跟RabbitMQ相關的主要參數,包含伺服器位址等。裡面對每個參數也都提供了預設值。如果不進行配置,預設就是通路本地的RabbitMQ服務。

#這幾個是預設配置。  
spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=guest  
spring.rabbitmq.password=guest  
spring.rabbitmq.virtual-host=/
複制代碼           

2-2、在RabbitMQ中聲明Exchange和Queue

既然是要對接RabbitMQ,那麼最終還是需要與RabbitMQ伺服器進行互動的。從RabbitMQ的管理頁面上來看,SCStream幫我們在RabbitMQ的根虛拟機上建立了一個topic類型的scstreamExchange交換機,然後在這個交換機上綁定了一個scstreamExchange.stream隊列,綁定的RoutingKey是#。 而程式中的消息發送者是将消息發送到scstreamExchange交換機,然後RabbitMQ将消息轉發到scstreamExchange.stream隊列,消息接收者從隊列接收到消息。這個流程,就是Spring Cloud Stream在背後為我們做的事情。 在這裡可以嘗試對應RabbitMQ的基礎概念以及SCStream架構中的基礎概念,整理一下他們之間的對應關系。

​ SCStream架構幫我們屏蔽了與消息中間件的互動細節,開發人員甚至都不需要感覺消息中間件的存在,将更多的關注點放到業務處理的細節裡。實際上,就我們這個簡單的示例,隻需要将maven中的spring-cloud-starter-stream-rabbit依賴,換成spring-cloud-starter-stream-kafka,就可以完成與本地Kafka服務的互動,代碼不需要做任何的改動。

2-3、常用配置

在RabbitMQ的實作中,所有個性化的屬性配置實作都是以spring.cloud.stream.rabbit開頭,支援對binder、producer、consumer進行單獨配置。

#綁定exchange  
spring.cloud.stream.binding.<bindingName>.destination=fanoutExchange  
#綁定queue  
spring.cloud.stream.binding.<bindingName>.group=myQueue  
#不自動建立queue  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindQueue=false  
#不自動聲明exchange(自動聲明的exchange都是topic)  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.declareExchange=false  
#隊列名隻聲明組名(前面不帶destination字首)  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.queueNameGroupOnly=true  
#綁定rouytingKey  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindingRoutingKey=myRoutingKey  
#綁定exchange類型  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.exchangeType=<type>  
#綁定routingKey  
spring.cloud.stream.rabbit.bindings.<bindingName>.producer.routingKeyExpression='myRoutingKey'
複制代碼           

通過這些配置可以按照RabbitMQ原生的方式進行聲明。例如,SCStream自動建立的Exchange都是Topic類型的,如果想要用其他類型的Exchange交換機,就可以手動建立交換機,然後在應用中聲明不自動建立交換機。

​ 所有可配置的屬性,參見github倉庫中的說明。例如,如果需要聲明一個Quorum仲裁隊列,那麼隻要給這個Binding配置quorum.enabled屬性,值為true就可以了。

Stream隊列目前尚不支援。RabbitMQ周邊生态的發展肯定是比産品自身的發展速度要慢的,由此也可見,目前階段,Stream隊列離大規模使用還是有一點距離的。

2-4、分組消費模式

分組可以讓消息實作負載均衡的政策,例如大并發過來之後,生成端會發送大量消息,而消費端消費速度較慢就可以生成多個分組,然後生産端根據政策向不同的分組發送消息,就可以加快消息的消費速度

2-4-1、配置資訊

SCStream中的消費者分組政策,其實整體來看是一種類似于Kafka的分組消費機制。即,不同group的消費者,都會消費到所有的message消息,而在同一個goup中,每個message消息,隻會被消費一次。這種分組消費的政策,嚴格來說,在RabbitMQ中是不存在的,RabbitMQ是通過不同類型的Exchange來實作不同的消費政策。而使用SCStream架構,就可以直接在RabbitMQ中實作這種分組消費的政策

#消息生産者端配置  
#啟動發送者分區  
spring.cloud.stream.bindings.output.producer.partitioned=true  
#指定參與消息分區的消費端節點數量  
spring.cloud.stream.bindings.output.producer.partition-count=2  
#隻有消費端分區ID為1的消費端能接收到消息  
spring.cloud.stream.bindings.output.producer.partition-key-expression=1  
  
#消息消費者端配置  
#啟動消費分區  
spring.cloud.stream.bindings.input.consumer.partitioned=true  
#參與分區的消費端節點個數  
spring.cloud.stream.bindings.input.consumer.instance-count=2  
#設定該執行個體的消費端分區ID  
spring.cloud.stream.bindings.input.consumer.instance-index=1
複制代碼           

通過這樣的分組政策,目前這個消費者執行個體就隻會消費奇數編号的消息,而偶數編号的消息則不會發送到這個消費者中。**注意:**這并不是說偶數編号的消息就不會被消費,隻是不會被目前這個執行個體消費而已。

SCStream架構雖然實作了這種分組政策機制,但是其實是不太嚴謹的,當把分區數量和分區ID不按套路配置設定時,并沒有太多的檢查和日志資訊,但是就是收不到消息。

另外,在@StreamListener注解中還有condition屬性也可以配置消費者的配置設定邏輯,該屬性支援一個SPELl表達式,隻接收滿足條件的消息。

當設定了分組消費的時候,綁定的隊列及routingkey就變成了如下關系

SpringCloudStream內建RabbitMQ實作消息收發

2-4-2、通過header靈活指定消費分組

上面的配置隻設定固定的消費分組,實際場景中顯然是不行的,這樣就可以通過使用header來進行處理

可以配置headers.routingkey來進行動态發送

spring.cloud.stream.bindings.output.destination=scstreamExchange
#指定參與消息分區的消費端節點數量
spring.cloud.stream.bindings.output.producer.partition-count=2
#隻有消費端分區ID為1的消費端能接收到消息
#spring.cloud.stream.bindings.output.producer.partition-key-expression=0
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers.routingkey

#這個input就對應Sink.INPUT strem中預設的消費隊列
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=myinput
#參與分區的消費端節點個數
spring.cloud.stream.bindings.input.consumer.instance-count=2
#設定該執行個體的消費端分區ID
spring.cloud.stream.bindings.input.consumer.instance-index=0
#啟動消費分區
spring.cloud.stream.bindings.input.consumer.partitioned=true
複制代碼           

然後在發送端代碼就可以通過設定header的routingkey來指定發送的分組了

@GetMapping("/send")
   public Object send(String message,String routingkey) {
//    MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
      MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey",routingkey);
      source.output().send(messageBuilder.build());
      return "message sended : "+message;
   }
複制代碼           

測試:

無法收到消息: http://localhost:8080/send?message=fdsfgg&routingkey=1

可以收到消息: http://localhost:8080/send?message=fdsfgg&routingkey=0

原文連結:https://juejin.cn/post/7228198543764848701