初識RabbitMq
應用場景
郵箱發送:使用者注冊後投遞消息到rabbitmq中,由消息的消費方異步的發送郵件,提升系統響應速度
流量削峰:一般在秒殺活動中應用廣泛,秒殺會因為流量過大,導緻應用挂掉,為了解決這個問題,一般在應用前端加入消息隊列。用于控制活動人數,将超過此一定閥值的訂單直接丢棄。緩解短時間的高流量壓垮應用。
訂單逾時:利用rabbitmq的延遲隊列,可以很簡單的實作訂單逾時的功能,比如使用者在下單後30分鐘未支付取消訂單。.
各中間件比較
.各種消息中間件性能的比較:
TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。
持久化消息比較—zeroMq不支援,activeMq和rabbitMq都支援。持久化消息主要是指:MQ down或者MQ所在的伺服器down了,消息不會丢失的機制。
可靠性、靈活的路由、叢集、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統、社群—RabbitMq最好,ActiveMq次之,ZeroMq最差。
高并發—從實作語言來看,RabbitMQ最高,原因是它的實作語言是天生具備高并發高可用的erlang語言。
綜上所述:RabbitMQ的性能相對來說更好更全面,是消息中間件的首選。
主要元件說明:
Broker:它提供一種傳輸服務,它的角色就是維護一條從生産者到消費者的路線,保證資料能按照指定的方式進行傳輸,
Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
Queue:消息的載體,每個消息都會被投到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛拟主機,一個broker裡可以有多個vhost,用作不同使用者的權限分離。
Producer:消息生産者,就是投遞消息的程式.
Consumer:消息消費者,就是接受消息的程式.
Channel:消息通道,在用戶端的每個連接配接裡,可建立多個channel.
windows 安裝
安裝Erlang
是以在安裝rabbitMQ之前,需要先安裝Erlang 。
配置好環境變量
在這裡插入圖檔描述
指令行輸入指令:erl ,驗證是否安裝成功
下載下傳rabbitMq
下載下傳運作rabbitmq-server-3.6.5 ,需要其他版本或者32位系統的,可以去官網下載下傳。
依舊可以不改變預設進行安裝。
需要注意:預設安裝的RabbitMQ 監聽端口是5672
然後是配置環境變量。
激活 RabbitMQ’s Management Plugin
使用RabbitMQ 管理插件,可以更好的可視化方式檢視Rabbit MQ 伺服器執行個體的狀态。
打開指令視窗:
輸入指令:
“D:\JavaApp\RabbitMq\RabbitMQ Server\rabbitmq_server-3.7.13\sbin\rabbitmq-plugins.bat” enable rabbitmq_management
啟動命名:
net stop RabbitMQ;
net start RabbitMQ
常用指令
rabbitmqctl.bat list_users 檢視使用者和權限
add_user xudong 123456 增加使用者
rabbitmqctl.bat set_user_tags xudong administrator 設定使用者權限
代碼實作
引入pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
配置rabbitConfig類
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by Administrator on 2019/3/22.
* 定義隊列queue
*/
@Configuration
public class RabbitConfig {
public static final String QUEUE_A = "QueueA";
public static final String QUEUE_B = "QueueB";
public static final String QUEUE_C = "QueueC";
public static final String EXCHANGE_A = "exchangeA";
public static final String EXCHANGE_B = "exchangeB";
public static final String EXCHANGE_C = "exchangeC";
public static final String ROUTINGKEY_A = "Routingkey_A";
public static final String ROUTINGKEY_B = "Routingkey_B";
public static final String ROUTINGKEY_C = "Routingkey_C";
/**
* 擷取交換機: EXCHANGE_A
* 針對消費者配置交換機
* 1. 設定交換機類型
* 2. 将隊列綁定到交換機
* FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念
* HeadersExchange :通過添加屬性key-value比對
* DirectExchange:按照routingkey分發到指定隊列
* TopicExchange:多關鍵字比對
*/
@Bean
public DirectExchange getExchangeA() {
return new DirectExchange(EXCHANGE_A);
}
/**
* 擷取隊列 :QUEUE_A
* @return
*/
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); //第一個參數隊列名稱,第二個隊列持久化,遇到當機不會丢失資料
}
/**
* binding:将交換機和隊列通過路由關鍵字綁定,不綁定也可以,用預設的交換機
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queueC()).to(getExchangeC()).with(RabbitConfig.ROUTINGKEY_C);
}
/**
* 擷取隊列 :QUEUE_B
* @return
*/
@Bean
public Queue queueB() {
return new Queue(QUEUE_B, true); //第一個參數隊列名稱,第二個隊列持久化,遇到當機不會丢失資料
}
/**
* 擷取隊列 :QUEUE_C
* @return
*/
@Bean
public Queue queueC() {
return new Queue(QUEUE_C, true); //第一個參數隊列名稱,第二個隊列持久化,遇到當機不會丢失資料
}
@Bean
public DirectExchange getExchangeC() {
return new DirectExchange(EXCHANGE_C);
}
}
application.properties
##rabbitMq
spring.rabbitmq.username=xudong
spring.rabbitmq.password=123456
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 手動ACK 不開啟自動ACK模式,目的是防止報錯後未正确處理消息丢失 預設 為 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消費者
package com.example.listener;
import com.example.config.RabbitConfig;
import com.example.model.Student;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Created by Administrator on 2019/3/22.
*/
@Component
public class BookHandler {
private static final Logger log = LoggerFactory.getLogger(BookHandler.class);
/**
* <p>TODO 該方案是 spring-boot-data-amqp 預設的方式,不太推薦。具體推薦使用 listenerManualAck()</p>
* 預設情況下,如果沒有配置手動ACK, 那麼Spring Data AMQP 會在消息消費完畢後自動幫我們去ACK
* 存在問題:如果報錯了,消息不會丢失,但是會無限循環消費,一直報錯,如果開啟了錯誤日志很容易就吧磁盤空間耗完
* 解決方案:手動ACK,或者try-catch 然後在 catch 裡面講錯誤的消息轉移到其它的系列中去
* spring.rabbitmq.listener.simple.acknowledge-mode=manual
* <p>
*
* @param stu 監聽的内容
*/
@RabbitListener(queues = {RabbitConfig.QUEUE_B})
public void listenerAutoAck(Student stu, Message message, Channel channel) {
// TODO 如果手動ACK,消息會被監聽消費,但是消息在隊列中依舊存在,如果 未配置 acknowledge-mode 預設是會在消費完畢後自動ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("[listenerAutoAck 監聽的消息] - [{}]", stu.toString());
// TODO 通知 MQ 消息已被成功消費,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// TODO 處理失敗,重新壓入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
@RabbitListener(queues = {RabbitConfig.QUEUE_C})
public void listenerManualAck(Student stu, Message message, Channel channel) {
log.info("[listenerManualAck 監聽的消息] - [{}]", stu.toString());
try {
// TODO 通知 MQ 消息已被成功消費,可以ACK了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// TODO 如果報錯了,那麼我們可以進行容錯處理,比如轉移目前消息進入其它隊列
try {
// TODO 處理失敗,重新壓入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
controller端添加生産消息
package com.example.controller;
import com.example.config.RabbitConfig;
import com.example.listener.BookHandler;
import com.example.model.Student;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* Created by Administrator on 2019/3/22.
*/
@Controller
@RequestMapping("/rabbigMq")
@Api(value="StudentRabbitMqController",description = "rabbitMq測試學生")
public class StudentRabbitMqController {
private final RabbitTemplate rabbitTemplate;
private static final Logger log = LoggerFactory.getLogger(StudentRabbitMqController.class);
@Autowired
public StudentRabbitMqController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 對應 {@link BookHandler#listenerAutoAck}
* this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 對應 {@link BookHandler#listenerManualAck}
*/
@ApiOperation(value = "rabbitmq消息隊列")
@RequestMapping(value = "/stu",method = {RequestMethod.GET})
@ResponseBody
public String proMsg() {
Student stu = new Student();
stu.setId((short)1);
stu.setName("學習mq的旭東");
log.info("生産者生産消息:"+stu);
this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_B, stu);
this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_C, stu);
return "success";
}
}
啟動springboot,用swagger調用