1.Message Queue(MQ)
消息隊列Message Queue
是一種跨程序的通信機制,用于在系統之間進行傳遞消息。
MQ作為消息中間件,可以進行異步處理請求,進而減少請求響應時間和解耦。
2.消息隊列的使用場景
-
應用解耦:
多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導緻整個過程失敗;
系統間消息傳遞通過MQ進行解耦,“消息隊列”是在消息的傳輸過程中儲存消息的容器
RabbitMq簡介及使用 -
異步處理:
應用間并發處理消息,相比串行處理,減少處理時間;
比如使用者為了使用某個應用,進行注冊,系統需要發送注冊郵件,驗證短信
RabbitMq簡介及使用 -
限流削峰:
廣泛應用于秒殺或搶購活動中,避免流量過大導緻處理請求阻塞的情況
RabbitMq簡介及使用
3.RabbitMQ簡介
RabbitMQ是支援多種消息協定,易于部署和使用的開源消息代理伺服器,用于在分布式系統中存儲轉發消息
由以高性能、健壯以及可伸縮性出名的Erlang語言編寫;提供了成熟的高并發,高可用的解決方案
- 可以根據實際業務情況動态地擴充叢集節點
- 在叢集中的機器上設定鏡像,使得在部分節點出現問題的情況下仍然可用
- 支援多種用戶端語言,如:Python、Ruby、.NET、Java等
- RabbitMQ提供了一個易用的使用者界面,使得使用者可以監控和管理消息、叢集中的節點等。
4.Window環境下安裝與使用RabbitMQ
-
安裝Erlang程式運作環境
下載下傳位址http://www.erlang.org/downloads
-
安裝RabbitMQ伺服器
下載下傳位址http://www.rabbitmq.com/
啟動RabbitMQ服務
-
激活RabbitMQ管理控制台
cd sbin
rabbitmq-plugins.bat enable rabbitmq_management
-
通過浏覽器進行通路
http://localhost:15672/
使用者名:guest 密碼:guest
5.Linux環境下安裝
-
下載下傳Erlang運作環境RPM包
https://www.erlang-solutions.com/resources/download.html
-
下載下傳RabbitMQ伺服器安裝包
http://www.rabbitmq.com/install-rpm.html#downloads
-
安裝rpm包
rpm -ivh --nodeps esl-erlang_21.2.6-1_centos_7_amd64.rpm
rpm -ivh --nodeps rabbitmq-server-3.7.13-1.el7.noarch.rpm
-
啟用服務
rabbitmq-server
-
啟用控制台
rabbitmq-plugins enable rabbitmq_management
6.RabbitMQ相關概念
-
Producer
生産者,即消息的提供者
-
Consumer
消費者,即消息的使用者
-
Message
消息,即程序之間進行通信的資料
-
Queue
隊列,即消息存放的容器,消息以先進先出的方式進行存儲
-
Vhost
虛拟主機,用于存儲消息隊列
7. Java 用戶端通路RabbitMQ
- maven工程的pom檔案中添加依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
- 建立消息生産者
- 建立消息消費者
8.RabbitMQ消息的狀态
-
Ready
消息已經被送到隊列,等待被消費
-
Unacked
消息已經被消費者認領,但是還沒有被确認“已經被消費”
-
Unacked狀态下消費者斷開連接配接則消息回到“Ready”
沒有确認,但是用戶端也沒有斷開連接配接則一直處于“Unacked”
-
Finished
調用basicAsk()方法後,表示消息已經被消費,從隊列中移除
9. RabbitMQ的工作模式
-
Simple簡單模式
一個生産者,一個消費者
RabbitMq簡介及使用 - Work工作模式
一個生産者,多個消費者,每個消費者擷取到的消息唯一
在多個消息的情況下,WorkQueue會将消息分派給不同的消費者,每個消費者會接收到不同
的消息,并且可以根據消息的處理速度來接受不同數量的消息,進而讓消費者程式發揮最大
的性能
适合在叢集的環境中做異步處理,最大發揮每一台伺服器的性能
- publish/subscribe釋出訂閱模式:
一個生産者發送的消息會被多個消費者擷取
釋出訂閱模式中生産者不在直接與隊列進行綁定,而是将資料發送給交換機(Exchan{ge)
交換機Exchage負責将資料按照某種規則送入與之綁定的隊列,進而供消費者使用
該模式下交換機的類型為扇形交換中心(Fanout exchange)
- Routing 路由模式
釋出訂閱模式是交換機無條件将所有消息分發給綁定的隊列;而路由模式則是根據routing
key有條件的将資料篩選後分發給綁定的隊列。
發送消息到交換機時要指定路由key ,消費者将隊列綁定到交換機時也需要指定路由key
路由模式下交換機的類型是直接交換中心 (Direct exchange)
- Topic主題模式
主題模式是在Routing模式比對基礎上,提供了對RouteKey模糊比對的功能,可以簡化編
程,主題模式下交換機的類型為主題交換中心(Topic exchange)
主題模式下,模糊比對表達式規則為
*比對單個關鍵字
#比對所有關鍵字
10.RabbitMQ消息确認機制
- RabbitMQ提供了監聽器來接收消息投遞的狀态
- 消息确認涉及兩種狀态:
Confirm代表生産者将消息送達了Broker時産生的狀态,後續會出現兩種情況:
ack代表Broker已經将資料接收
nack代表Broker未接收到消息
Return代表消息被正常接收,但Broker沒有對應的隊列進行投遞,消息被退回給生産者的狀
态
上述狀态表示生産者與Broker之間的消息投遞情況,與消費者是否接收/确認消息無關
11.SpringBoot整合RabbitMQ
11.1生産者端
- 導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置檔案
spring:
rabbitmq:
username: root
password: root
host: 192.168.8.105
port: 5672
virtual-host: /myvhost
publisher-returns: true #開啟消息傳回機制
publisher-confirm-type: correlated #消息确認
template:
mandatory: true #将消息退回給生産者
- 生産者端,配置MQ
@Configuration
public class RabbitMQConfig {
//開啟确認、傳回
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息的ID:"+correlationData.getId());
if(!b){
System.out.println("消息拒收的原因:"+s);
}
}
};
RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("==================");
System.out.println("傳回的錯誤編碼:"+i); System.out.println("傳回的錯誤描述:"+s);
System.out.println("交換機:"+s1);
System.out.println("路由key:"+s2);
System.out.println("消息主體:"+new String(message.getBody()));
System.out.println("==================");
}
};
//配置交換機
@Bean
public TopicExchange orderExchange(){
return new TopicExchange("order_exchange");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory){
RabbitTemplate template = new RabbitTemplate(factory);
template.setMessageConverter(messageConverter());
template.setMandatory(true);//強制退回給生成者
template.setConfirmCallback(confirmCallback);
template.setReturnCallback(returnCallback);
return template;
}
}
- 注入RabbitTemplate
@Component
public class OrderProducer {
@Autowired
private RabbitTemplate template;
public void sendOrder(String exchange, String routingKey, OrderDTO orderDTO){
CorrelationData data = new CorrelationData(orderDTO.getOrderSn());
//參數data:是發送消息的附加資料,用來自定以消息的ID
template.convertAndSend(exchange,routingKey,orderDTO,data); }
11.2消費者端
- 添加依賴
- 配置檔案
spring:
rabbitmq:
username: root
password: root
virtual-host: /myvhost
port: 5672
host: 192.168.8.105 l
istener:
simple:
acknowledge-mode: manual #手動簽收
- 配置MQ
@Configuration
public class RabbitMQConfig implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
public MessageHandlerMethodFactory messageHandlerMethodFactory(){
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(mappingJackson2MessageConverter());
return factory;
}
//消息反序列化轉換器
@Bean
public MappingJackson2MessageConverter mappingJackson2MessageConverter(){
return new MappingJackson2MessageConverter();
}
}
- 綁定隊列到交換機,監聽隊列處理消息
@Component
public class OrderConsumer {
@Bean
public Queue orderQueue(){
return new Queue("order_queue",true,false,false);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("order_exchange");
}//綁定對列到交換機
@Bean
public Binding binding(){
return BindingBuilder.bind(orderQueue()).to(topicExchange()).with("order.#");
}
@RabbitListener(queues = "order_queue")
@RabbitHandler
public void processOrder(@Payload OrderDTO orderDTO, Channel channel,@Headers Map header){
System.out.println("訂單消費者處理訂單消息:"+orderDTO.getOrderSn()+"-- ->"+orderDTO.getCreateTime());
Long tag = (Long)header.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}