一、RabbitMQ概述
1.1 概念
RabbitMQ是一個開源的消息代理和隊列伺服器,用來通過普通協定在不同的應用之間共享資料(跨平台跨語言)。RabbitMQ是使用Erlang語言編寫,并且基于AMQP協定實作。
1.2 優勢
**可靠性(Reliablity):**使用了一些機制來保證可靠性,比如持久化、傳輸确認、釋出确認。
**靈活的路由(Flexible Routing):**在消息進入隊列之前,通過Exchange來路由消息。對于典型的路由功能,Rabbit已經提供了一些内置的Exchange來實作。針對更複雜的路由功能,可以将多個Exchange綁定在一起,也通過插件機制實作自己的Exchange。
**消息叢集(Clustering):**多個RabbitMQ伺服器可以組成一個叢集,形成一個邏輯Broker。
**高可用(Highly Avaliable Queues):**隊列可以在叢集中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
**多種協定(Multi-protocol):**支援多種消息隊列協定,如STOMP、MQTT等。
**多種語言用戶端(Many Clients):**幾乎支援所有常用語言,比如Java、.NET、Ruby等。
**管理界面(Management UI):**提供了易用的使用者界面,使得使用者可以監控和管理消息Broker的許多方面。
**跟蹤機制(Tracing):**如果消息異常,RabbitMQ提供了消息的跟蹤機制,使用者可以找出發生了什麼。
**插件機制(Plugin System):**提供了許多插件,來從多方面進行擴充,也可以編輯自己的插件。
二、RabbitMQ元件
**Broker:**辨別消息隊列伺服器實體.
**Virtual Host:**虛拟主機。辨別一批交換機、消息隊列和相關對象。虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域。每個vhost本質上就是一個mini版的RabbitMQ伺服器,擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在連結時指定,RabbitMQ預設的vhost是 /。
**Exchange:**交換器,用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列。
direct交換機:完全比對,才會路由過去,點對點通信模式。
fanout交換機:廣播類型
topic交換機:釋出-訂閱模式,消息交于所有訂閱的交換機。
**Queue:**消息隊列,用來儲存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接配接到這個隊列将其取走。
**Banding:**綁定,用于消息隊列和交換機之間的關聯。一個綁定就是基于路由鍵将交換機和消息隊列連接配接起來的路由規則,是以可以将交換器了解成一個由綁定構成的路由表。
**Channel:**信道,多路複用連接配接中的一條獨立的雙向資料流通道。信道是建立在真實的TCP連接配接内地虛拟連結,AMQP指令都是通過信道發出去的,不管是釋出消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于作業系統來說,建立和銷毀TCP都是非常昂貴的開銷,是以引入了信道的概念,以複用一條TCP連接配接。
**Connection:**網絡連接配接,比如一個TCP連接配接。
**Publisher:**消息的生産者,也是一個向交換器釋出消息的用戶端應用程式。
**Consumer:**消息的消費者,表示一個從一個消息隊列中取得消息的用戶端應用程式。
**Message:**消息,消息是不具名的,它是由消息頭和消息體組成。消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。
三、安裝RabbitMQ
3.1 docker 安裝
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369 25672 【Erlang發現 叢集端口】
5672 5671 【AMQP端口】
15672 【web管理控制台端口】
61613 61614 【STOMP協定端口】
1883 8883 【mqtt協定端口】
開機自啟動
docker update rabbitmq --restart=always
通路:http://ip:15672
賬号密碼均為 guest
3.2 控制台體驗Direct Exchange交換機
1.系統預設的交換機
2.建立一個交換機
3.為交換機綁定隊列—建立隊列
4.綁定交換機與隊列
綁定交換機與隊列,添加好隊列的名字與 路由鍵的名字
5.發消息,需要現發消息給交換機,交換機才能路由給消息隊列
重新整理隊列,既可以看到指定的隊列中用準備好了一條消息,南無如何消費這條消息呢?
需要點選這條消息隊列中去,擷取該條消息。
3.3 控制台體驗fanout Exchange交換機
1.建立fanout exchange 交換機
這種交換機會發送至所有的隊列-----
2.為其綁定多個消息隊列…
建立幾個消息隊列,綁定
3.向其中一個消息隊列的發送消息,指定其中一個路由鍵【廣播,可以不設定路由鍵】。
4.檢視消息隊列,發現所有的隊列全部收到消息。
3.3 控制台體驗Topic Exchange交換機
1.建立主題交換機
2.綁定消息隊列,路由鍵如何設定???
xxx.# 代表xxx以後比對0個或者多個
*.xxx 代表比對以.xxx結尾的消息隊列消息路由鍵
3.測試【略】
四、Springboot整合RabbitMQ
4.1 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2 配置檔案
spring.rabbitmq.host=192.168.52.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
4.3 啟動類開啟注解
4.4 AmqpAdmin建立Exchange/Queue/Binding
@Autowired
AmqpAdmin amqpAdmin;
/*
* 建立交換機
*/
@Test
public void createExchange(){
// 名字,是否持久化,自動删除
DirectExchage directExchange = new DirectExchage("hello-java-exchage",true,false);
amqp.declareExchange(directExchange);
}
/*
* 建立隊列
*/
@Test
public void createQueue(){
//名字 ,持久化,排他,自動删除
Queue queue = new Queue("hello-java-queue",true,false,false);
amqp.declareQueue();
}
/*
* 綁定交換機與隊列
*/
@Test
public void createBinding(){
//目的地,目的地類型,交換機,路由鍵,參數
Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqp.declareBinding(binding);
}
4.5 RabbitTemplate發送消息
@Autowired
RabbitTemplate rabbitTemplate;
/*
* 發送消息
*/
@Test
public void sendMessage(){
// 交換機,路由鍵,消息内容【Object類型可以發送對象,但是java序列化了,對象必須序列化】
rabbitTemplate.converAndSend("hello-java-exchage","hello.java","消息體");
}
為了能夠用json類型的展示,需要配以一個序列化器放入IOC容器中
@Configuration
public class MyRabbitConfig{
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4.6 @RabbitListener監聽接受消息
// 必須放到組建中 方法中的類
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message,實體類 xxx){
// 1.第一種接受消息 【麻煩】
byte[] body = message.getBody();
JSON.parse(body);
// 2.第二種接收消息 直接傳入實體類
System.out.println(message);
}