由于JMS存在跨語言跨平台的缺陷,是以出現了AMQP(Advanced Message Queuing Protocol),一個提供統一消息服務的應用層标準進階消息隊列協定,代表RabbitMQ
一、下載下傳安裝RabbitMQ
-
安裝
從https://www.rabbitmq.com/download.html下載下傳RabbitMQ
由于RabbitMQ是用Erlang語言編寫的,安裝RabbitMQ需要先安裝Erlang運作平台
http://www.erlang.org/downloads
配置環境變量:
ERLANG_HOME:D:\software\erl10.4 (安裝後可能已自動配置)
path加入%ERLANG_HOME%\bin\erl.exe
重新開機計算機
-
啟動RabbitMQ
cmd視窗需要管理者權限運作,否則啟動時報錯(發生系統錯誤 5)
也可以在服務清單中啟動spring-boot學習:二十、spring-boot內建RabbitMQ -
安裝管理界面management ui
參考:https://www.rabbitmq.com/management.html
營運指令:rabbitmq-plugins enable rabbitmq_management
- 通路http://127.0.0.1:15672/
spring-boot學習:二十、spring-boot內建RabbitMQ -
配置使用者權限
1) 通過指令rabbitmqctl.bat list_users檢視使用者
spring-boot學習:二十、spring-boot內建RabbitMQ 2)添加使用者
rabbitmqctl.bat add_user root root
spring-boot學習:二十、spring-boot內建RabbitMQ 3)設定角色
rabbitmqctl.bat set_user_tags root administrator
4)設定權限
rabbitmqctl.bat set_permissions -p / root “." ".” “.*”
5)使用賬号登入
6)删除使用者
rabbitmqctl delete_user username
7) 修改改密碼
rabbimqctl change_password username newpassword
二、RabbitMQ詳解
參考:
https://www.rabbitmq.com/getstarted.html
https://www.cnblogs.com/dongkuo/p/6001791.html 依托于官方文檔詳細解釋
https://www.jianshu.com/p/80eefec808e5
0. 概念
消息隊列服務一般由生産者、消息隊列和消費者組成,RabbitMQ加入了交換機 (Exchange)的概念,這樣生産者和隊列就沒有直接聯系, 轉而變成生産者把消息給交換器, 交換器根據排程政策再把消息再給隊列。
1)Server(Broker):接收用戶端連接配接,實作AMQP協定的消息隊列和路由功能的程序;
2)Virtual Host:虛拟主機的概念,類似權限控制組,一個Virtual Host裡可以有多個Exchange和Queue;
3)Exchange:交換機,接收生産者發送的消息,并根據Routing Key将消息路由到伺服器中的隊列Queue。
4)ExchangeType:交換機類型決定了路由消息行為,RabbitMQ中有四種類型Exchange,分别是fanout、direct、topic、headers
5)Message Queue:消息隊列,用于存儲還未被消費者消費的消息;
6)Message:由Header和body組成,Header是由生産者添加的各種屬性的集合,包括Message是否被持久化、優先級是多少、由哪個Message Queue接收等;body是真正需要發送的資料内容;
7)BindingKey:綁定關鍵字,将一個特定的Exchange和一個特定的Queue綁定起來。
1. 最簡單的消息發送Hello World
功能:一個生産者P發送消息到隊列Q,一個消費者C接收
生産者:
通過連接配接工廠ConnectionFactory建立連接配接connection,使用連接配接建立通道channel, channel聲明隊列queue,channel使用queue發送消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel());
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
消費者:跟生産者一樣,建立連接配接,建立channel,聲明queue,然後監聽queue
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
2. 工作隊列work queues
功能:一個生産者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務)
特點:
1)一條消息隻會被一個消費者接收;
2)消息是平均配置設定給消費者的;
3)消費者隻有在處理完某條消息後,才會收到下一條消息。
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
預設情況下,RabbitMQ将按順序向消費者發送消息,平均每個消費者收到相同數量的消息,這種分發消息叫做輪詢。可以通過三個以上的worker(消費者)來進行測試。
由于worker執行任務需要一定時間,為了確定消息不丢失,RabbitMQ 支援
1)消息确認機制
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);//可使用sleep模拟
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
2)消息持久化
// 将第二個參數設為true,表示聲明一個需要持久化的隊列。
// 需要注意的是,若你已經定義了一個非持久的,同名字的隊列,要麼将其先删除(不然會報錯),要麼換一個名字。
channel.queueDeclare("hello", true, false, false, null);
// 修改了第三個參數,這是表明消息需要持久化
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
3. 釋出/訂閱Publish/Subscribe
生産者是把消息發送到了交換機(exchange)中,然後交換機負責(決定)将消息發送到(哪一個)消息隊列中。
前面的兩個案例其實是經過了預設交換機(Default Exchange,用空字元串表示),每一個被建立的隊列都會被自動的綁定到預設交換機上,并且路由鍵就是隊列的名字。路由鍵用來指定交換機将消息發到指定的隊列。
交換機有4種不同的類型,分别是direct,fanout,topic,headers:
direct:要求和它綁定的隊列帶有一個路由鍵K,若有一個帶有路由鍵R的消息到達了交換機,交換機會将此消息路由到路由鍵K = R的隊列。預設交換機便是該類型。
fanout:會路由每一條消息到所有和它綁定的隊列,忽略路由鍵。即廣播形式。
4. Routing模式
當有些消息隻能部分消費者消費時,可使用Routing模式;隊列綁定交換機,同時帶上routing key,以多次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不同,這樣可以使一個隊列帶有多個routing key。
5. Topic
通配符比對消息發送隊列,routing key由多個關鍵詞組成,詞與詞之間由點号(.)隔開,規定*表示任意的一個詞,#号表示任意的0個或多個詞。
6. RPC
參考文檔
三、springboot rabbitmq詳解
參考:
https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#sending-messages
https://www.cnblogs.com/ityouknow/p/6120544.html
1.pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
3. 注冊Queue,RabbitConfig.java
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
4. 生産者,RabbitMQProducer.java
@Component
public class RabbitMQProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg(String routingKey, String msg) {
amqpTemplate.convertAndSend(routingKey, msg);
}
}
5. 測試
@Component
@OpenAPI
public class ApiTest {
@Autowired
private RabbitMQProducer rabbitMQProducer;
@OpenAPIMethod(methodName = "testRabbitMQSender")
public Object testRabbitMQSender(final String msg) throws Exception {
rabbitMQProducer.sendMsg("hello",msg);
return null;
}
}
發送消息:http://10.0.0.57:9001/api/testRabbitMQSender?msg=Hello%20World
檢視控制台,建立了一個叫hello的queue,積壓了1條消息
消息内容:
6. 消費者
@Component
public class RabbitMQConsumer {
@RabbitListener(queues = "hello")
public void consumerHelloQueueMessage(String message){
System.out.println("收到hello-queue封包:"+message);
}
}
啟動消費者後,會立即收到積壓的消息
收到hello-queue封包:Hello World
7. 測試多個消費者監聽同一個通道,如1個hello queue,3個消費者
發送9條消息,列印日志日下:
收到hello-queue封包:Hello World
收到hello-queue3封包:Hello World
收到hello-queue2封包:Hello World
收到hello-queue封包:Hello World
收到hello-queue封包:Hello World
收到hello-queue2封包:Hello World
收到hello-queue3封包:Hello World
收到hello-queue3封包:Hello World
收到hello-queue2封包:Hello World
結果表示RabbitMQ會輪詢發送消息給消費者,一條消息隻能被一個消費者接收
8. Direct Exchange
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public DirectExchange helloExchange() {
return new DirectExchange("helloExchange");
}
@Bean
public Binding helloBinding() {
return BindingBuilder.bind(helloQueue()).to(helloExchange()).with("helloDirect");
}
}
注冊了一個叫helloExchange的Direct Exchange
檢視helloExchange 詳情:direct類型,綁定了hello queue
檢視hello queue,除了綁定預設exchange還綁定了helloExchange
-
測試demo1
一個生産者,一個helloExchage,一個叫hello的queue,一個路由helloDirect,四個消費者(三個監聽叫hello的queue,一個監聽路由helloDirect對應hello queue且綁定了helloExchage交換器)
發送消息:
消費消息:public void sendMsg(String exchange, String routingKey, String msg) { amqpTemplate.convertAndSend(exchange, routingKey, msg); } public Object testRabbitMQSender(final String msg) throws Exception { for(int i=0; i<9; i++) { rabbitMQProducer.sendMsg("helloExchange", "helloDirect",msg); } return null; }
測試結果:四個消費者均勻收到消息@RabbitListener(queues = "hello") public void consumerHelloQueueMessage(String message){ System.out.println("收到hello-queue封包:"+message); } @RabbitListener(queues = "hello") public void consumerHelloQueueMessage2(String message){ System.out.println("收到hello-queue2封包:"+message); } @RabbitListener(queues = "hello") public void consumerHelloQueueMessage3(String message){ System.out.println("收到hello-queue3封包:"+message); } // 當RabbitMQ中不存在綁定關系時,自動生成相應的綁定 // 如不存在helloExchange、hello queue或helloDirect任意一個,都會自動生成并綁定關系 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "hello", durable = "true"), exchange = @Exchange(value = "helloExchange", durable = "true"), key = "helloDirect" ) ) public void consumerQueueMessage(String message){ System.out.println("收到helloExchage->hello queue->helloDirect route封包:"+message); }
收到hello-queue2封包:Hello World 收到hello-queue封包:Hello World 收到hello-queue3封包:Hello World 收到helloExchage->hello queue->helloDirect route封包:Hello World 收到hello-queue2封包:Hello World 收到hello-queue封包:Hello World 收到hello-queue3封包:Hello World 收到hello-queue封包:Hello World 收到helloExchage->hello queue->helloDirect route封包:Hello World
結果說明:
關聯示意圖如下
當P通過helloExchange發送消息,經過路由helloDirect到達hello queue,然後均勻下發給所監聽的C,是以四個消費者都收到了消息spring-boot學習:二十、spring-boot內建RabbitMQ -
測試demo2
在測試demo1的基礎上增加一個hello2 queue, 并與helloExchange綁定,路由key為helloDirect2,一個消費者監聽,示意圖如下:
當P通過helloExchange發送消息,經過路由helloDirect2到達hello2 queue,監聽的消費者收到消息spring-boot學習:二十、spring-boot內建RabbitMQ
9. Fanout Exchange
以廣播模式,給 Fanout 交換機發送消息,綁定了這個交換機的所有隊列都收到這個消息。
// 定義exchange、queue、binding
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout2");
}
@Bean
public Queue fanoutQueue3() {
return new Queue("fanout3");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
// 發送消息
public Object testRabbitMQSender(final String msg) throws Exception {
rabbitMQProducer.sendMsg("fanoutExchange", "",msg);
return null;
}
// 消費消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout1", durable = "true"),
exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
)
)
public void consumerFanoutMessage1(String message){
System.out.println("收到fanoutExchange->fanout封包1:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout2", durable = "true"),
exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
)
)
public void consumerFanoutMessage2(String message){
System.out.println("收到fanoutExchange->fanout封包2:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout3", durable = "true"),
exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
)
)
public void consumerFanoutMessage3(String message){
System.out.println("收到fanoutExchange->fanout封包3:"+message);
}
運作結果:
收到fanoutExchange->fanout封包1:Hello World fanout
收到fanoutExchange->fanout封包3:Hello World fanout
收到fanoutExchange->fanout封包2:Hello World fanout
10. Topic Exchange
使用通配符(*和#)路由
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
// 定義exchange、queue、binding
@Bean
public Queue topicQueue1() {
return new Queue("topic.q1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.q2");
}
@Bean
public Queue topicQueue3() {
return new Queue("topic.q3");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.q1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}
@Bean
public Binding topicBinding3() {
return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("topic.*");
}
// 消費者
@RabbitListener(queues = "topic.q1")
public void consumerTopicMessage(String message){
System.out.println("收到topic.q1封包:"+message);
}
@RabbitListener(queues = "topic.q2")
public void consumerTopicMessage2(String message){
System.out.println("收到topic.q2封包:"+message);
}
@RabbitListener(queues = "topic.q3")
public void consumerTopicMessage3(String message){
System.out.println("收到topic.q3封包:"+message);
}
// 發送消息
1)rabbitMQProducer.sendMsg("topicExchange", "topic.q1",msg);
列印日志:(滿足topic.q1、topic.#、topic.*的監聽)
收到topic.q1封包:Hello World topic
收到topic.q2封包:Hello World topic
收到topic.q3封包:Hello World topic
2)rabbitMQProducer.sendMsg("topicExchange", "topic.q2",msg);
列印日志:(滿足topic.#、topic.*的監聽)
收到topic.q2封包:Hello World topic
收到topic.q3封包:Hello World topic
3)rabbitMQProducer.sendMsg("topicExchange", "topic.q2.x",msg);
列印日志:(滿足topic.#的監聽)
收到topic.q2封包:Hello World topic
常見異常
1. 未定義queue,直接監聽queue報錯:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[hello2]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'hello2' in vhost '/', class-id=50, method-id=10)
解決方法:
1) 控制台手動建立queue
2)配置queue
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}