轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html
0.目錄
RabbitMQ-從基礎到實戰(2)— 防止消息丢失 RabbitMQ-從基礎到實戰(3)— 消息的交換(上) RabbitMQ-從基礎到實戰(4)— 消息的交換(中) RabbitMQ-從基礎到實戰(5)— 消息的交換(下) RabbitMQ-從基礎到實戰(6)— 與Spring內建1.簡介
本篇博文介紹了在windows平台下安裝RabbitMQ Server端,并用JAVA代碼實作收發消息
2.安裝RabbitMQ
- RabbitMQ是用Erlang開發的,是以需要先安裝Erlang環境,在 這裡 下載下傳對應系統的Erlang安裝包進行安裝
- 點選 下載下傳對應平台的RabbitMQ安裝包進行安裝
Windows平台安裝完成後如圖

3.啟用RabbitMQ Web控制台
RabbitMQ提供一個控制台,用于管理和監控RabbitMQ,預設是不啟動的,需要運作以下指令進行啟動
- 點選上圖的Rabbit Command Prompt,打開rabbitMQ控制台
- 在 官方介紹管理控制台的頁面 ,可以看到,輸入以下指令啟動背景控制插件
rabbitmq-plugins enable rabbitmq_management
- 登入背景頁面: http://localhost:15672/ 密碼和使用者名都是 guest ,界面如下
目前可以先不用理會此界面,後面使用到時會詳細介紹,也可以到
檢視官方文檔。
4.編寫MessageSender
Spring對RabbitMQ已經進行了封裝,正常使用中,會使用Spring內建,第一個項目中,我們先不考慮那麼多
在IDE中建立一個Maven項目,并在pom.xml中貼入如下依賴,RabbitMQ的最新版本依賴可以在
找到
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
等待Maven下載下傳完成後,就可以在Maven Dependencies中看到RabbitMQ的JAR
在這裡,我們發現,RabbitMQ的日志依賴了slf4j-api這個包,slf4j-api并不是一個日志實作,這樣子是打不出日志的,是以,我們給pom加上一個日志實作,這裡用了logback
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.1</version>
</dependency>
之後maven依賴如下,可以放心寫代碼了
建立一個MessageSender類,代碼如下
1 import java.io.IOException;
2 import java.util.concurrent.TimeoutException;
3
4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory;
6
7 import com.rabbitmq.client.Channel;
8 import com.rabbitmq.client.Connection;
9 import com.rabbitmq.client.ConnectionFactory;
10
11 public class MessageSender {
12
13 private Logger logger = LoggerFactory.getLogger(MessageSender.class);
14
15 //聲明一個隊列名字
16 private final static String QUEUE_NAME = "hello";
17
18 public boolean sendMessage(String message){
19 //new一個RabbitMQ的連接配接工廠
20 ConnectionFactory factory = new ConnectionFactory();
21 //設定需要連接配接的RabbitMQ位址,這裡指向本機
22 factory.setHost("127.0.0.1");
23 Connection connection = null;
24 Channel channel = null;
25 try {
26 //嘗試擷取一個連接配接
27 connection = factory.newConnection();
28 //嘗試建立一個channel
29 channel = connection.createChannel();
30 //這裡的參數在後面詳解
31 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32 //注意這裡調用了getBytes(),發送的其實是byte數組,接收方收到消息後,需要重新組裝成String
33 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
34 logger.info("Sent '" + message + "'");
35 //關閉channel和連接配接
36 channel.close();
37 connection.close();
38 } catch (IOException | TimeoutException e) {
39 //失敗後記錄日志,傳回false,代表發送失敗
40 logger.error("send message faild!",e);
41 return false;
42 }
43 return true;
44 }
45 }
然後在App類的main方法中調用sendMessage
1 public class App {
2 public static void main( String[] args ){
3 MessageSender sender = new MessageSender();
4 sender.sendMessage("hello RabbitMQ!");
5 }
6 }
列印日志如下
打開RabbitMQ的控制台,可以看到消息已經進到了RabbitMQ中
點進去,用控制台自帶的getMessage功能,可以看到消息已經成功由RabbitMQ管理了
至此,MessageSender已經寫好了,在該類的31和33行,我們分别調用了隊列聲明和消息發送
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
queueDeclare,有很多參數,我們可以看一下他的源碼,注釋上有詳細的解釋,我簡單翻譯了一下
1 /**
2 * Declare a queue 聲明一個隊列
3 * @see com.rabbitmq.client.AMQP.Queue.Declare
4 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
5 * @param queue the name of the queue隊列的名字
6 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,為true則在rabbitMQ重新開機後生存
7 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性隊列(别人看不到),隻對目前連接配接有效,目前連接配接斷開後,隊列删除(設定了持久化也删除)
8 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自動删除,在最後一個連接配接斷開後删除隊列
9 * @param arguments other properties (construction arguments) for the queue 其他參數
10 * @return a declaration-confirm method to indicate the queue was successfully declared
11 * @throws java.io.IOException if an error is encountered
12 */
13 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
14 Map<String, Object> arguments) throws IOException;
前面4個都非常好了解,最後一個“其他參數”,到底是什麼其他參數,這個東西真的很難找,用到再解釋吧,官方文檔如下
- TTL Time To Live 存活時間
- Dead Lettering 遺言,當消息死亡時,做些什麼
- Length Limit 長度限制
- Priority Queues 優先級
basicPublish的翻譯如下
1 /**
2 * Publish a message.發送一條消息
3 *
4 * Publishing to a non-existent exchange will result in a channel-level
5 * protocol exception, which closes the channel.
6 *
7 * Invocations of <code>Channel#basicPublish</code> will eventually block if a
8 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
9 *
10 * @see com.rabbitmq.client.AMQP.Basic.Publish
11 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
12 * @param exchange the exchange to publish the message to 交換模式,會在後面講, 官方文檔在這裡 13 * @param routingKey the routing key 控制消息發送到哪個隊列
14 * @param props other properties for the message - routing headers etc 其他參數
15 * @param body the message body 消息,byte數組
16 * @throws java.io.IOException if an error is encountered
17 */
18 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
這裡又有個其他參數,它的類型是這樣的,設定消息的一些詳細屬性
5.編寫MessageConsumer
為了和Sender區分開,建立一個Maven項目MessageConsumer
1 package com.liyang.ticktock.rabbitmq;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16
17 public class MessageConsumer {
18
19 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
20
21 public boolean consume(String queueName){
22 //連接配接RabbitMQ
23 ConnectionFactory factory = new ConnectionFactory();
24 factory.setHost("127.0.0.1");
25 Connection connection = null;
26 Channel channel = null;
27 try {
28 connection = factory.newConnection();
29 channel = connection.createChannel();
30 //這裡聲明queue是為了取消息的時候,queue肯定會存在
31 //注意,queueDeclare是幂等的,也就是說,消費者和生産者,不論誰先聲明,都隻會有一個queue
32 channel.queueDeclare(queueName, false, false, false, null);
33
34 //這裡重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這裡要重新組裝成String
35 Consumer consumer = new DefaultConsumer(channel){
36 @Override
37 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
38 throws IOException {
39 String message = new String(body, "UTF-8");
40 logger.info("Received '" + message + "'");
41 }
42 };
43 //上面是聲明消費者,這裡用聲明的消費者消費掉隊列中的消息
44 channel.basicConsume(queueName, true, consumer);
45
46 //這裡不能關閉連接配接,調用了消費方法後,消費者會一直連接配接着rabbitMQ等待消費
47
48 } catch (IOException | TimeoutException e) {
49 //失敗後記錄日志,傳回false,代表消費失敗
50 logger.error("send message faild!",e);
51 return false;
52 }
53
54
55 return true;
56 }
57 }
然後在App的main方法中調用Cunsumer進行消費
1 public class App
2 {
3 //這個隊列名字要和生産者中的名字一樣,否則找不到隊列
4 private final static String QUEUE_NAME = "hello";
5
6 public static void main( String[] args )
7 {
8 MessageConsumer consumer = new MessageConsumer();
9 consumer.consume(QUEUE_NAME);
10 }
11 }
結果如下,消費者一直在等待消息,每次有消息進來,就會立刻消費掉
6.多個消費者同時消費一個隊列
改造一下Consumer
在App中new多個消費者
改造Sender,使它不停的往RabbitMQ中發送消息
啟動Sender
啟動Consumer,發現消息很平均的發給四個用戶端,一人一個,誰也不插隊
如果我們把速度加快呢?把Sender的休息時間去掉,發現消費開始變得沒有規律了,其實呢,它還是有規律的,這個是RabbitMQ的特性,稱作“Round-robin dispatching”,消息會平均的發送給每一個消費者,可以看第一第二行,消息分别是56981和56985,相應的82、82、84都被分給了其他線程,隻是在目前線程的時間片内,可以處理這麼多任務,是以就一次列印出來了
7.結束語
這一章介紹了從安裝到用JAVA語言編寫生産者與消費者,在這裡隻是簡單的消費消息并列印日志,如果一個消息需要處理的時間很長,而處理的過程中,這個消費者挂掉了,那消息會不會丢失呢?答案是肯定的,而且已經配置設定給這個消費者,但還沒來得及處理的消息也會一并丢失掉,這個問題,RabbitMQ早就考慮到了,并且提供了解決方案,下一篇博文将進行詳細介紹