天天看點

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html

0.目錄

RabbitMQ-從基礎到實戰(2)— 防止消息丢失 RabbitMQ-從基礎到實戰(3)— 消息的交換(上) RabbitMQ-從基礎到實戰(4)— 消息的交換(中) RabbitMQ-從基礎到實戰(5)— 消息的交換(下) RabbitMQ-從基礎到實戰(6)— 與Spring內建

1.簡介

本篇博文介紹了在windows平台下安裝RabbitMQ Server端,并用JAVA代碼實作收發消息

2.安裝RabbitMQ

  1. RabbitMQ是用Erlang開發的,是以需要先安裝Erlang環境,在 這裡 下載下傳對應系統的Erlang安裝包進行安裝
  2. 點選 下載下傳對應平台的RabbitMQ安裝包進行安裝

Windows平台安裝完成後如圖

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

3.啟用RabbitMQ Web控制台

RabbitMQ提供一個控制台,用于管理和監控RabbitMQ,預設是不啟動的,需要運作以下指令進行啟動

  1. 點選上圖的Rabbit Command Prompt,打開rabbitMQ控制台
  2. 官方介紹管理控制台的頁面 ,可以看到,輸入以下指令啟動背景控制插件
    rabbitmq-plugins enable rabbitmq_management
  3. 登入背景頁面: http://localhost:15672/    密碼和使用者名都是 guest ,界面如下
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

目前可以先不用理會此界面,後面使用到時會詳細介紹,也可以到

檢視官方文檔。

4.編寫MessageSender

Spring對RabbitMQ已經進行了封裝,正常使用中,會使用Spring內建,第一個項目中,我們先不考慮那麼多

在IDE中建立一個Maven項目,并在pom.xml中貼入如下依賴,RabbitMQ的最新版本依賴可以在

找到

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>      

等待Maven下載下傳完成後,就可以在Maven Dependencies中看到RabbitMQ的JAR

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

在這裡,我們發現,RabbitMQ的日志依賴了slf4j-api這個包,slf4j-api并不是一個日志實作,這樣子是打不出日志的,是以,我們給pom加上一個日志實作,這裡用了logback

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.1</version>
</dependency>      

之後maven依賴如下,可以放心寫代碼了

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

建立一個MessageSender類,代碼如下

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語
1 import java.io.IOException;
 2 import java.util.concurrent.TimeoutException;
 3 
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 public class MessageSender {
12     
13     private Logger logger = LoggerFactory.getLogger(MessageSender.class);
14 
15     //聲明一個隊列名字
16     private final static String QUEUE_NAME = "hello";
17     
18     public boolean sendMessage(String message){
19         //new一個RabbitMQ的連接配接工廠
20         ConnectionFactory factory = new ConnectionFactory();
21         //設定需要連接配接的RabbitMQ位址,這裡指向本機
22         factory.setHost("127.0.0.1");
23         Connection connection = null;
24         Channel channel = null;
25         try {
26             //嘗試擷取一個連接配接
27             connection = factory.newConnection();
28             //嘗試建立一個channel
29             channel = connection.createChannel();
30             //這裡的參數在後面詳解
31             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32             //注意這裡調用了getBytes(),發送的其實是byte數組,接收方收到消息後,需要重新組裝成String
33             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
34             logger.info("Sent '" + message + "'");
35             //關閉channel和連接配接
36             channel.close();
37             connection.close();
38         } catch (IOException | TimeoutException e) {
39             //失敗後記錄日志,傳回false,代表發送失敗
40             logger.error("send message faild!",e);
41             return false;
42         }
43         return true;
44     }
45 }      
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

然後在App類的main方法中調用sendMessage

1 public class App {
2     public static void main( String[] args ){
3         MessageSender sender = new MessageSender();
4         sender.sendMessage("hello RabbitMQ!");
5     }
6 }      

列印日志如下

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

打開RabbitMQ的控制台,可以看到消息已經進到了RabbitMQ中

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

點進去,用控制台自帶的getMessage功能,可以看到消息已經成功由RabbitMQ管理了

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

至此,MessageSender已經寫好了,在該類的31和33行,我們分别調用了隊列聲明和消息發送

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());      

queueDeclare,有很多參數,我們可以看一下他的源碼,注釋上有詳細的解釋,我簡單翻譯了一下

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語
1 /**
 2      * Declare a queue 聲明一個隊列
 3      * @see com.rabbitmq.client.AMQP.Queue.Declare
 4      * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 5      * @param queue the name of the queue隊列的名字
 6      * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,為true則在rabbitMQ重新開機後生存
 7      * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性隊列(别人看不到),隻對目前連接配接有效,目前連接配接斷開後,隊列删除(設定了持久化也删除)
 8      * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自動删除,在最後一個連接配接斷開後删除隊列
 9      * @param arguments other properties (construction arguments) for the queue 其他參數
10      * @return a declaration-confirm method to indicate the queue was successfully declared
11      * @throws java.io.IOException if an error is encountered
12      */
13     Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
14                                  Map<String, Object> arguments) throws IOException;      
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

前面4個都非常好了解,最後一個“其他參數”,到底是什麼其他參數,這個東西真的很難找,用到再解釋吧,官方文檔如下

  • TTL  Time To Live  存活時間

basicPublish的翻譯如下

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語
1  /**
 2      * Publish a message.發送一條消息
 3      *
 4      * Publishing to a non-existent exchange will result in a channel-level
 5      * protocol exception, which closes the channel.
 6      *
 7      * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 8      * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 9      *
10      * @see com.rabbitmq.client.AMQP.Basic.Publish
11      * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
12      * @param exchange the exchange to publish the message to 交換模式,會在後面講,        官方文檔在這裡               13      * @param routingKey the routing key 控制消息發送到哪個隊列
14      * @param props other properties for the message - routing headers etc 其他參數
15      * @param body the message body 消息,byte數組
16      * @throws java.io.IOException if an error is encountered
17      */
18     void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;      
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

這裡又有個其他參數,它的類型是這樣的,設定消息的一些詳細屬性

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

5.編寫MessageConsumer

為了和Sender區分開,建立一個Maven項目MessageConsumer

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語
1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16 
17 public class MessageConsumer {
18     
19     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
20     
21     public boolean consume(String queueName){
22         //連接配接RabbitMQ
23         ConnectionFactory factory = new ConnectionFactory();
24         factory.setHost("127.0.0.1");
25         Connection connection = null;
26         Channel channel = null;
27         try {
28             connection = factory.newConnection();
29             channel = connection.createChannel();
30             //這裡聲明queue是為了取消息的時候,queue肯定會存在
31             //注意,queueDeclare是幂等的,也就是說,消費者和生産者,不論誰先聲明,都隻會有一個queue
32             channel.queueDeclare(queueName, false, false, false, null);
33             
34             //這裡重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這裡要重新組裝成String
35             Consumer consumer = new DefaultConsumer(channel){
36                 @Override
37                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
38                           throws IOException {
39                         String message = new String(body, "UTF-8");
40                         logger.info("Received '" + message + "'");
41                 }
42             };
43             //上面是聲明消費者,這裡用聲明的消費者消費掉隊列中的消息
44             channel.basicConsume(queueName, true, consumer);
45             
46             //這裡不能關閉連接配接,調用了消費方法後,消費者會一直連接配接着rabbitMQ等待消費
47            
48         } catch (IOException | TimeoutException e) {
49             //失敗後記錄日志,傳回false,代表消費失敗
50             logger.error("send message faild!",e);
51             return false;
52         }
53         
54         
55         return true;
56     }
57 }      
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

然後在App的main方法中調用Cunsumer進行消費

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語
1 public class App 
 2 {
 3     //這個隊列名字要和生産者中的名字一樣,否則找不到隊列
 4     private final static String QUEUE_NAME = "hello";
 5     
 6     public static void main( String[] args )
 7     {
 8         MessageConsumer consumer = new MessageConsumer();
 9         consumer.consume(QUEUE_NAME);
10     }
11 }      
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

結果如下,消費者一直在等待消息,每次有消息進來,就會立刻消費掉

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

6.多個消費者同時消費一個隊列

改造一下Consumer

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

在App中new多個消費者

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

改造Sender,使它不停的往RabbitMQ中發送消息

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

啟動Sender

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

啟動Consumer,發現消息很平均的發給四個用戶端,一人一個,誰也不插隊

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

如果我們把速度加快呢?把Sender的休息時間去掉,發現消費開始變得沒有規律了,其實呢,它還是有規律的,這個是RabbitMQ的特性,稱作“Round-robin dispatching”,消息會平均的發送給每一個消費者,可以看第一第二行,消息分别是56981和56985,相應的82、82、84都被分給了其他線程,隻是在目前線程的時間片内,可以處理這麼多任務,是以就一次列印出來了

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ轉載請注明出處:http://www.cnblogs.com/4----/p/6518801.html0.目錄1.簡介2.安裝RabbitMQ3.啟用RabbitMQ Web控制台4.編寫MessageSender5.編寫MessageConsumer6.多個消費者同時消費一個隊列7.結束語

7.結束語

這一章介紹了從安裝到用JAVA語言編寫生産者與消費者,在這裡隻是簡單的消費消息并列印日志,如果一個消息需要處理的時間很長,而處理的過程中,這個消費者挂掉了,那消息會不會丢失呢?答案是肯定的,而且已經配置設定給這個消費者,但還沒來得及處理的消息也會一并丢失掉,這個問題,RabbitMQ早就考慮到了,并且提供了解決方案,下一篇博文将進行詳細介紹