天天看點

rabbitmq叢集削峰解耦

作者:一起玩程式設計

MQ的作用

1)解耦:在項目啟動之初是很難預測未來會遇到什麼困難的,消息中間件在處理過程中插入了一個隐含的,基于資料的接口層,兩邊都實作這個接口,這樣就允許獨立的修改或者擴充兩邊的處理過程,隻要兩邊遵守相同的接口限制即可。

2)備援(存儲):在某些情況下處理資料的過程中會失敗,消息中間件允許把資料持久化直到他們完全被處理

3)擴充性:消息中間件解耦了應用的過程,是以提供消息入隊和處理的效率是很容易的,隻需要增加處理流程就可以了。

4)削峰:在通路量劇增的情況下,但是應用仍然需要發揮作用,但是這樣的突發流量并不常見。而使用消息中間件采用隊列的形式可以減少突發通路壓力,不會因為突發的逾時負荷要求而崩潰

5)可恢複性:當系統一部分元件失效時,不會影響到整個系統。消息中間件降低了程序間的耦合性,當一個處理消息的程序挂掉後,加入消息中間件的消息仍然可以在系統恢複後重新處理

6)順序保證:在大多數場景下,處理資料的順序也很重要,大部分消息中間件支援一定的順序性

7)緩沖:消息中間件通過一個緩沖層來幫助任務最高效率的執行

8)異步通信:通過把把消息發送給消息中間件,消息中間件并不立即處。

下面從rabbitmq叢集環境搭建、使用、削峰等過程了解rabbitmq的使用

搭建Rabbitmq叢集

節點規劃

2台虛拟機做叢集,IP分别為192.168.56.101,192.168.56.102

修改每台虛拟機的hosts檔案,設定各個節點主機名

vim /etc/hosts

#添加内容

192.168.56.101 rabbit-101

192.168.56.102 rabbit-102

重新開機網卡讓hosts生效

systemctl restart network

Rabbitmq安裝

先各個節點安裝好Rabbitmq

因為RabbitMQ是由Erlang語言開發的是以需要安裝Erlang的開發環境,再安裝RabbitMQ

安裝Erlang

  • 增加Erlang的yum源

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

  • yum安裝Erlang

yum install erlang -y

  • 安裝完erlang後檢驗版本

erl -version

rabbitmq叢集削峰解耦

安裝rabbitmq

  • 增加rabbitmq的yum源

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

  • yum安裝rabbitmq

yum install rabbitmq-server -y

配置rabbitmq

#開機啟動
systemctl enable rabbitmq-server.service
#啟動rabbitmq
systemctl start rabbitmq-server.service
# 開啟背景管理
rabbitmq-plugins enable rabbitmq_management
# 設定背景管理者
rabbitmqctl add_user admin 'admin'
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
# 重新開機rabbitmq
systemctl restart rabbitmq-server.service           

防火牆開放5672服務端口和15672 web界面端口

#檢視防火牆狀态
systemctl status firewalld
#開放端口
firewall-cmd --permanent --zone=public --add-port=5672/tcp
firewall-cmd --permanent --zone=public --add-port=15672/tcp
#重新開機防火牆
firewall-cmd --reload
#檢視開放的端口
firewall-cmd --permanent --zone=public --list-port           

叢集搭建

同步.erlang.cookie檔案

選擇rabbit-101主機,将 /var/lib/rabbitmq/.erlang.cookie檔案同步到另外1台機器上。

scp /var/lib/rabbitmq/.erlang.cookie [email protected]:/var/lib/rabbitmq/

同步完成後,重新開機rabbit-102機器的rabbitmq

systemctl restart rabbitmq-server.service

開放叢集端口

每台機器的防火牆開放叢集端口

#開放端口
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
#重新開機防火牆
firewall-cmd --reload           

建構叢集

将rabbitmq加入叢集,調用rabbitmqctl join_cluster 指令即可。在rabbit-102上執行叢集加入指令。

  • 先關閉rabbit-102的rabbitmq app,(rabbitmq在啟動時,會啟動Erlang節點 和 rabbitmq app2個程序)
  • rabbitmqctl stop_app
  • 如果不關閉,直接調用rabbitmqctl join_cluster會提示如下錯誤:
Error: this command requires the 'rabbit' app to be stopped on the target node. Stop it with 'rabbitmqctl stop_app'.
Arguments given:
join_cluster rabbit@rabbit-101           
  • 確定啟動rabbit-101的rabbitmq app已啟動,在rabbit-102上執行 rabbitmqctl join_cluster rabbit@rabbit-101 指令。
  • 如果rabbit-101的rabbitmq app未啟動,會提示如下錯誤:
[root@rabbit-102 ~]# rabbitmqctl join_cluster rabbit@rabbit-101
Clustering node rabbit@rabbit-102 with rabbit@rabbit-101
Error:
mnesia_not_running           
  • 在rabbit-102上啟動rabbitmq app rabbitmqctl start_app

檢視叢集資訊

檢視叢集資訊:rabbitmqctl cluster_status

rabbitmq叢集削峰解耦

連接配接RabbitMQ叢集

  • 添加rabbitmq依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>           
  • application.yml配置叢集位址address(非host、port)和使用者密碼
spring:  
  rabbitmq:  
    #本地叢集  
    addresses: 192.168.56.101:5672,192.168.56.102:5672  
    username: admin  
    password: admin             

使用RabbitMQ叢集

關鍵概念

Publisher消息的生成者

向交換機釋出消息的用戶端程式。

交換機Exchange

用來接受消息生成者發送的消息并将這些消息路由(即轉發)至伺服器中的隊列。

隊列Queue

消息隊列,用來儲存消息直到發送到消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或者多個隊列。消息一直在消息隊列裡,等待消費者連接配接到這個隊列來擷取消息。

綁定Binding

用于 消息隊列 和 交換機 之間的關聯。一個綁定就是基于 路由鍵 (Routing-Key)将交換器和消息隊列連接配接起來的路由規則,是以可以将交換機了解成一個由綁定構成的路由表。

Virtual Host虛拟主機

表示一批交換機、消息隊列和相關對象

交換機類型

direct

消息中的路由鍵(Routing Key)如果和Binding中的Binding Key一緻,交換機就将消息發到對應的隊列中。路由鍵和隊列名完全比對。它是完全比對、單點傳播模式。

測試代碼

發送端:

  • 項目結構
  • 添加rabbitmq依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>           
  • 編寫配置,聲明隊列和交換機并用路由鍵綁定。
package com.s.t.mq.rabbit.conf;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
@Bean
public Queue testDirectQueue() {
return new Queue("testDirectQueue");
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
//交換機持久化
return new DirectExchange("TestDirectExchange",true,false);
}
/**
* 綁定 将隊列和交換機綁定, 并設定用于比對鍵:testDirectRouting。綁定後消息發送時會直接建立隊列,不綁定則不會建立隊列
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(testDirectQueue()).to(TestDirectExchange()).with("testDirectRouting");
}
}           
  • 編寫控制器執行寫入mq,執行發送時,自動建立交換機和隊列
package com.s.t.mq.rabbit.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class SendMessageController {

@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
/**
* 向TestDirectExchange交換機,testDirectRouting 路由鍵發送消息
* @return
*/
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageData = "test direct message.";
Map<String, Object> map = buildMsg(messageData);
//将消息攜帶綁定鍵值:notDurableKey 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "testDirectRouting", map);
return "ok";
}
private Map<String, Object> buildMsg(String messageData) {
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
return map;
}
}           
  • 編寫啟動類
package com.s.t.mq.rabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitProviderApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitProviderApplication.class, args);
System.out.println("RabbitProviderApplication 啟動成功。");
}
}           
  • 配置rabbitmq位址
server:  
  port: 8121  
spring:  
  #給項目來個名字  
  application:  
    name: rabbitmq-provider  
  #配置rabbitMq 伺服器  
  rabbitmq:  
  #本地叢集  
    addresses: 192.168.56.101:5672,192.168.56.102:5672  
    username: admin  
    password: admin  
           

消費端:

  • 項目結構
  • 監聽消息服務,@Queue 指定隊列,key 指定路由鍵。(同一個key可以綁定到多個隊列,如果隊列未建立,消費端應用啟動時會自動建立隊列)
package com.s.t.mq.rabbit.receiver;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class DirectBindReceiver {
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "testDirectQueue"), key = {"testDirectRouting"},exchange = @Exchange(name = "TestDirectExchange",type = "direct"))})
public void bindReceiveTestDirectQueue(Map message){
System.out.println("key: testDirectRouting,接收到的direct消息: "+message);
}
}           
  • 測試過程啟動發送端執行發送端的 sendDirectMessage 方法,自動建立 TestDirectExchange 交換機和 testDirectQueue 隊列,并向隊列發送消息
  • 啟動消費端,檢視消費端控制台輸出,消費端啟動則收到發送端傳遞的消息

fanout

Fanout交換機原理有點像廣播的原理一樣,每個發到Fanout類型的交換機的消息都會分到所有綁定的隊列中。Fanout交換機不處理路由鍵,隻是簡單的将隊列綁定到交換機上。

每個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。

測試代碼

發送端:

@GetMapping("/sendFanoutBindMessage")
public String sendFanoutMessage() {
String messageData = "fanout message, durable default.";
Map<String, Object> map = buildMsg(messageData);
//将消息發送到交換機NormalFanoutExchange,不要綁定鍵值
rabbitTemplate.convertAndSend("NormalFanoutExchange", "", map);
return "ok";
}           

消費端:

package com.s.t.mq.rabbit.receiver;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 發送端隻用發送到交換機不用指定routingKey
*/
@Component
public class FanoutBindReceiver {
@RabbitListener(bindings = {@QueueBinding(value = @Queue("fanout-bind-receiver01"),
exchange = @Exchange(name = "NormalFanoutExchange",type = "fanout"))})
public void bindReceive01(Map message){
System.out.println("01接收到的fanout消息: "+message+", no key");
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("fanout-bind-receiver02"),
exchange = @Exchange(name = "NormalFanoutExchange",type = "fanout"))})
public void bindReceive02(Map message){
System.out.println("02接收到的fanout消息: "+message+", no key");
}
}           

發送端執行發送方法,消費端輸出如下:

topic

Topic交換機通過模式比對配置設定消息路由鍵屬性,将路由鍵和某個模式進行比對,此時隊列需要綁定到一個模式上。它将路由鍵和綁定鍵的字元串切分成單詞,每個單詞之間用點隔開。

該模式隻識别兩個通配符:符号"#“和符号”*"。#比對0個或者多個單詞。"*"比對一個單詞。

測試代碼

發送端:根據輸入參數 model 綁定鍵值

@GetMapping("/sendTopicBindMessage")
public String sendTopicMessage(String model) {
String messageData = model + " this is topic message, durable default.";
Map<String, Object> map = buildMsg(messageData);
//将消息發送到交換機TopicExchange,根據輸入參數綁定鍵值
rabbitTemplate.convertAndSend("TopicExchange", model, map);
return "ok";
}           

消費端:

package com.s.t.mq.rabbit.receiver;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class TopicBindReceiver {
@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-bind-receiver01"),
key={"aa"}, exchange = @Exchange(name = "TopicExchange",type = "topic"))})
public void bindReceive01(Map message){
System.out.println("01:key: aa,接收到的topic消息: "+message);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-bind-receiver02"),
key={"aa.*"}, exchange = @Exchange(name = "TopicExchange",type = "topic"))})
public void bindReceive02(Map message){
System.out.println("02:key: aa.*,接收到的topic消息: "+message);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-bind-receiver03"),
key={"aa.#"}, exchange = @Exchange(name = "TopicExchange",type = "topic"))})
public void bindReceive03(Map message){
System.out.println("03:key: aa.#,接收到的topic消息: "+message);
}
}           

測試

  • 界面傳入model參數為aa
  • 消費端比對到aa和aa.#
  • 界面傳入model參數為aa.bb
  • 消費端比對到aa.#和aa.*

headers

headers交換機和direct交換機完全一緻,但性能差很多目前幾乎用不到,是以這裡不介紹了

屬性

消息持久化

消息預設是持久化的,如果想修改為非持久化,可以在發送端建立一個Config,在建立Exchange時指定durable屬性為false,即該交換機上所有隊列為非持久化隊列;也可以在建立Queue時指定durable屬性為false,即該隊列為非持久化隊列而不影響同一交換機上其他隊列。

如下代碼,發送端将特定隊列 durable 指定為 false

Configuration中指定 NotDurableDirectQueue 隊列 durable 屬性為 false ;指定 DurableDirectQueue 隊列 durable 屬性為 true 并分别綁定到 TestDurableExchange 交換機

@Configuration
public class DirectRabbitConfig {
//-----------------------持久化測試----------------------------
//隊列 起名:NotDurableDirectQueue
@Bean
public Queue NotDurableDirectQueue() {
// durable:是否持久化,預設是false,持久化隊列:會被存儲在磁盤上,當消息代理重新開機時仍然存在,暫存隊列:目前連接配接有效
// exclusive:預設也是false,隻能被目前建立的連接配接使用,而且當連接配接關閉後隊列即被删除。此參考優先級高于durable
// autoDelete:是否自動删除,當沒有生産者或者消費者使用此隊列,該隊列會自動删除。
// return new Queue("NotDurableDirectQueue",true,true,false);
//一般設定一下隊列的持久化就好,其餘兩個就是預設false
return new Queue("NotDurableDirectQueue",false); //交換機持久化,隊列不持久化
}
@Bean
public Queue DurableDirectQueue() {
//一般設定一下隊列的持久化就好,其餘兩個就是預設false
return new Queue("DurableDirectQueue", true); //隊列持久化
}
@Bean
DirectExchange testDurableExchange() {
return new DirectExchange("TestDurableExchange",true,false);
}
@Bean
Binding bindingNotDurableDirect() {
return BindingBuilder.bind(NotDurableDirectQueue()).to(testDurableExchange()).with("notDurableKey");
}
@Bean
Binding bindingDurableDirect() {
return BindingBuilder.bind(DurableDirectQueue()).to(testDurableExchange()).with("durableKey");
}
}           

發送到不同路由鍵

/**
* 向TestDirectExchange交換機,TestDirectRouting路由鍵發送非持久化消息
* @return
*/ 
@GetMapping("/sendNotDurableDirectMessage")
public String sendNotDurableDirectMessage() {
String messageData = "not durable message.";
Map<String, Object> map = buildMsg(messageData);
//将消息攜帶綁定鍵值:notDurableKey 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDurableExchange", "notDurableKey", map);
return "ok";
}
/**
* 向TestDirectExchange交換機發送持久化消息
* @param speaker
* @return
*/
@GetMapping("/sendDurableDirectMessage")
public String sendDurableDirectMessage() {
String messageData = "durable message.";
Map<String, Object> map = buildMsg(messageData);
//将消息攜帶綁定鍵值:durableKey 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDurableExchange", "durableKey", map);
return "ok";
}           

消費端綁定不同隊列

@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "NotDurableDirectQueue", durable = "false"),key = {"notDurableKey"},exchange = @Exchange(name = "TestDurableExchange",type = "direct"))})
public void bindReceiveNotDurableQueue(Map message){
System.out.println("01,key: notDurableKey,接收到的direct消息: "+message);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("DurableDirectQueue"),key = {"durableKey"},exchange = @Exchange(name = "TestDurableExchange",type = "direct"))})
public void bindReceiveDurableQueue(Map message){
System.out.println("02,key: durableKey,接收到的direct消息: "+message);
}           

測試流程如下:

  • 啟動rabbitmq、發送端;保持消費端 關閉
  • 執行發送端兩個發送方法(發送消息并建立 TestDurableExchange 交換機、 NotDurableDirectQueue 隊列、DurableDirectQueue 隊列)。可以在管理界面看到兩個隊列分别有1條消息
  • 重新開機rabbitmq 每個節點都執行 systemctl restart rabbitmq-server.service
  • 重新開機rabbitmq後,再到管理界面看,非持久化隊列已銷毀
  • 啟動消費端,可以看到啟動時隻列印02,key: durableKey 不會列印 01,key: notDurableKey,因為隻有 DurableDirectQueue 隊列消息持久化了

注意:發送端和消費端的 duralble 要都顯示設定為 false,因為不設定預設為 true,若發送端、消費端設定不一緻系統将報錯。

削峰

當大量的客戶通路請求到後端,去通路資料庫等,瞬間會爆炸的。

經過前端或者其他的方案進行限流外,還是有大量的請求,這個時候需要削峰了,讓請求平穩的順序執行。

demo

消費端:

  • 設定消費者數量
spring:  
  #配置rabbitMq 伺服器  
  rabbitmq:  
    #本地叢集  
    addresses: 192.168.56.101:5672,192.168.56.102:5672  
    username: admin  
    password: admin  
    listener:  
      type: simple  
      simple:  
        prefetch: 1 # 消費者每次從隊列擷取的消息數量  
        concurrency: 1 # 消費者數量  
        max-concurrency: 1 # 啟動消費者最大數量  
           
  • 模拟消息處理
/**
* 削峰
* @param message
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue("peak-clipper-queue"),
key = {"peak-clipper-key"},exchange = @Exchange(name = "peak-clipper-exchange",type = "direct"))})
public void bindReceiverPeakClipper(String message){
try {
Thread.sleep(2000L); // 模拟處理需要2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("peak-clipper-key,收到消息: {}", message);
}           

發送端:

循環發送一批消息,模拟瞬間大量請求

@GetMapping("/sendPeakClipperMessage")
public String sendPeakClipperMessage() {
for (int i=0;i<10;i++){
String msg = "message-"+i;
log.info("生産消息:{}", msg);
rabbitTemplate.convertAndSend("peak-clipper-exchange", "peak-clipper-key", msg);
}
return "ok";
}           

通路頁面,執行消息發送。

此時消息會全部放到列隊,但是會一條一條消費。簡單的實作了削峰處理。

rabbitmq叢集削峰解耦
rabbitmq叢集削峰解耦

調整消費者數量

listener:   
  type: simple   
  simple:   
    prefetch: 1 # 消費者每次從隊列擷取的消息數量   
    concurrency: 2 # 消費者數量   
    max-concurrency: 10 # 啟動消費者最大數量             
rabbitmq叢集削峰解耦

此時就會有兩個消費者同時去消費隊列中的消息。是以這個消費者數量需要根據實際的情況去設定所能承受的一個值,也就是峰值。

參考文獻

  1. RabbitMQ如何削峰限流
  2. RabbitMQ叢集搭建
  3. centos7下安裝rabbitmq
  4. RabbitMQ的基本原理
  5. SpringBoot連接配接RabbitMQ叢集
  6. 解決:搭建RabbitMQ叢集時,出現警告:Error:mnesia_not_running