天天看點

rabbitMQ的工作原理及簡單使用

一、RabbitMQ簡介

     在介紹RabbitMQ之前實作要介紹一下MQ,MQ是什麼?MQ全稱是Message Queue,可以了解為消息隊列的意思,簡單來說就是消息以管道的方式進行傳遞。

RabbitMQ是一個實作了AMQP(Advanced Message Queuing Protocol)進階消息隊列協定的消息隊列服務,用Erlang語言的。

二、使用場景

        在我們秒殺搶購商品的時候,系統會提醒我們稍等排隊中,而不是像幾年前一樣頁面卡死或報錯給使用者。像這種排隊結算就用到了消息隊列機制,放入通道裡面一個一個結算處理,而不是某個時間斷突然湧入大批量的查詢新增把資料庫給搞當機,是以RabbitMQ本質上起到的作用就是削峰填谷,為業務保駕護航。

三、為什麼選擇RabbitMQ

      現在的市面上有很多MQ可以選擇,比如ActiveMQ、ZeroMQ、Appche Qpid,那問題來了為什麼要選擇RabbitMQ?

     (1)除了Qpid,RabbitMQ是唯一一個實作了AMQP标準的消息伺服器;

     (2)可靠性,RabbitMQ的持久化支援,保證了消息的穩定性;

     (3)高并發,RabbitMQ使用了Erlang開發語言,Erlang是為電話交換機開發的語言,天生自帶高并發光環,和高可用特性;

     (4)叢集部署簡單,正是應為Erlang使得RabbitMQ叢集部署變的超級簡單;

     (5)社群活躍度高,根據網上資料來看,RabbitMQ也是首選;

三、工作機制

1.   生産者、消費者和代理

  在了解消息通訊之前首先要了解3個概念:生産者、消費者和代理。

    (1)生産者:消息的建立者,負責建立和推送資料到消息伺服器;

    (2)消費者:消息的接收方,用于處理資料和确認消息;

    (3)代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生産消息,隻是扮演“快遞”的角色。

2.消息發送原理

      首先你必須連接配接到Rabbit才能釋出和消費消息,那怎麼連接配接和發送消息的呢?

      你的應用程式和Rabbit Server之間會建立一個TCP連接配接,一旦TCP打開,并通過了認證,認證就是你試圖連接配接Rabbit之前發送的Rabbit伺服器連接配接資訊和使用者名和密碼,有點像程式連接配接資料庫,使用Java有兩種連接配接認證的方式,後面代碼會詳細介紹,一旦認證通過你的應用程式和Rabbit就建立了一條AMQP信道(Channel)。  

      信道是建立在“真實”TCP上的虛拟連接配接,AMQP指令都是通過信道發送出去的,每個信道都會有一個唯一的ID,不論是釋出消息,訂閱隊列或者介紹消息都是通過信道完成的。

3.為什麼不通過TCP直接發送指令?

      對于作業系統來說建立和銷毀TCP會話是非常昂貴的開銷,假設高峰期每秒有成千上萬條連接配接,每個連接配接都要建立一條TCP會話,這就造成了TCP連接配接的巨大浪費,而且作業系統每秒能建立的TCP也是有限的,是以很快就會遇到系統瓶頸。

      如果我們每個請求都使用一條TCP連接配接,既滿足了性能的需要,又能確定每個連接配接的私密性,這就是引入信道概念的原因。

4.RabbitMQ相關名詞

      想要真正的了解Rabbit有些名詞是你必須知道的。包括:ConnectionFactory(連接配接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。

      ConnectionFactory(連接配接管理器):應用程式與Rabbit之間建立連接配接的管理器,程式代碼中使用;

      Channel(信道):消息推送使用的通道;

      Exchange(交換器):用于接受、配置設定消息;

      Queue(隊列):用于存儲生産者的消息;

      RoutingKey(路由鍵):用于把生成者的資料配置設定到交換器上;

      BindingKey(綁定鍵):用于把交換器的消息綁定到隊列上;

四、消息的持久化

1.消息持久化    

     Rabbit隊列和交換器有一個不可告人的秘密,就是預設情況下重新開機伺服器會導緻消息丢失,那麼怎麼保證Rabbit在重新開機的時候不丢失呢?答案就是消息持久化。

當你把消息發送到Rabbit伺服器的時候,你需要選擇你是否要進行持久化,但這并不能保證Rabbit能從崩潰中恢複,想要Rabbit消息能恢複必須滿足3個條件:

(1)投遞消息的時候durable設定為true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數2設定為true持久化;

(2)設定投遞模式deliveryMode設定為2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數3設定為存儲純文字到磁盤;

(3)消息已經到達持久化交換器上;

(4)消息已經到達持久化的隊列;

2.持久化工作原理

      Rabbit會将你的持久化消息寫入磁盤上的持久化日志檔案,等消息被消費之後,Rabbit會把這條消息辨別為等待垃圾回收。

3.持久化的缺點

      消息持久化的優點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬碟要比寫入記憶體性能較低很多,進而降低了伺服器的吞吐量,盡管使用SSD硬碟可以使事情得到緩解,但他仍然吸幹了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。是以使用者要根據自己的情況,選擇适合自己的方式。

五、代碼實作

1.建立maven的java項目,在pom.xml中引入依賴

[XML] 純文字檢視 複制代碼

?

1

2

3

4

5

<code>&lt;</code><code>dependency</code><code>&gt;</code>

<code>  </code><code>&lt;</code><code>groupId</code><code>&gt;com.rabbitmq&lt;/</code><code>groupId</code><code>&gt;</code>

<code>  </code><code>&lt;</code><code>artifactId</code><code>&gt;amqp-client&lt;/</code><code>artifactId</code><code>&gt;</code>

<code>  </code><code>&lt;</code><code>version</code><code>&gt;5.2.0&lt;/</code><code>version</code><code>&gt;</code>

<code>&lt;/</code><code>dependency</code><code>&gt;</code>

2.實作連接配接

java實作代碼分為兩個類,第一個是建立Rabbit連接配接,第二是應用類使用最簡單的方式釋出和消費消息。連接配接分為兩種方式:

方式一:

[Java] 純文字檢視 複制代碼

06

07

08

09

10

11

12

13

14

15

<code>public</code> <code>static</code> <code>Connection GetRabbitConnection() {</code>

<code>    </code><code>ConnectionFactory factory =</code><code>new</code> <code>ConnectionFactory();</code>

<code>    </code><code>factory.setUsername(Config.UserName);</code>

<code>    </code><code>factory.setPassword(Config.Password);</code>

<code>    </code><code>factory.setVirtualHost(Config.VHost);</code>

<code>    </code><code>factory.setHost(Config.Host);</code>

<code>    </code><code>factory.setPort(Config.Port);</code>

<code>    </code><code>Connection conn =</code><code>null</code><code>;</code>

<code>    </code><code>try</code> <code>{</code>

<code>        </code><code>conn = factory.newConnection();</code>

<code>    </code><code>}</code><code>catch</code> <code>(Exception e) {</code>

<code>        </code><code>e.printStackTrace();</code>

<code>    </code><code>}</code>

<code>    </code><code>return</code> <code>conn;</code>

<code>}</code>

方式二:

<code>public</code> <code>static</code> <code>Connection GetRabbitConnection2() {</code>

<code>    </code><code>// 連接配接格式:amqp://userName:password@hostName:portNumber/virtualHost</code>

<code>    </code><code>String uri = String.format(</code><code>"amqp://%s:%s@%s:%d%s"</code><code>, Config.UserName, Config.Password, Config.Host, Config.Port,</code>

<code>            </code><code>Config.VHost);</code>

<code>        </code><code>factory.setUri(uri);</code>

<code>        </code><code>factory.setVirtualHost(Config.VHost);</code>

3.釋出消息和消費消息

使用最簡單的方式釋出和消費消息

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

<code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>

<code>    </code><code>Publisher();</code><code>// 推送消息</code>

<code>    </code><code>Consumer();</code><code>// 消費消息</code>

<code>/**</code>

<code> </code><code>* 推送消息</code>

<code> </code><code>*/</code>

<code>public</code> <code>static</code> <code>void</code> <code>Publisher() {</code>

<code>    </code><code>// 建立一個連接配接</code>

<code>    </code><code>Connection conn = ConnectionFactoryUtil.GetRabbitConnection();</code>

<code>    </code><code>if</code> <code>(conn !=</code><code>null</code><code>) {</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>// 建立通道</code>

<code>            </code><code>Channel channel = conn.createChannel();</code>

<code>            </code><code>// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接配接時是否删除隊列;參數五:消息其他參數】</code>

<code>            </code><code>channel.queueDeclare(Config.QueueName,</code><code>false</code><code>,</code><code>false</code><code>,</code><code>false</code><code>,</code><code>null</code><code>);</code>

<code>            </code><code>String content = String.format(</code><code>"目前時間:%s"</code><code>,</code><code>new</code> <code>Date().getTime());</code>

<code>            </code><code>// 發送内容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其他屬性-routing headers,此屬性為MessageProperties.PERSISTENT_TEXT_PLAIN用于設定純文字消息存儲到硬碟;參數四:消息主體】</code>

<code>            </code><code>channel.basicPublish(</code><code>""</code><code>, Config.QueueName,</code><code>null</code><code>, content.getBytes(</code><code>"UTF-8"</code><code>));</code>

<code>            </code><code>System.out.println(</code><code>"已發送消息:"</code> <code>+ content);</code>

<code>            </code><code>// 關閉連接配接</code>

<code>            </code><code>channel.close();</code>

<code>            </code><code>conn.close();</code>

<code>        </code><code>}</code><code>catch</code> <code>(Exception e) {</code>

<code>            </code><code>e.printStackTrace();</code>

<code>        </code><code>}</code>

<code> </code><code>* 消費消息</code>

<code>public</code> <code>static</code> <code>void</code> <code>Consumer() {</code>

<code>            </code><code>// 建立訂閱器,并接受消息</code>

<code>            </code><code>channel.basicConsume(Config.QueueName,</code><code>false</code><code>,</code><code>""</code><code>,</code><code>new</code> <code>DefaultConsumer(channel) {</code>

<code>                </code><code>@Override</code>

<code>                </code><code>public</code> <code>void</code> <code>handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,</code>

<code>                        </code><code>byte</code><code>[] body)</code><code>throws</code> <code>IOException {</code>

<code>                    </code><code>String routingKey = envelope.getRoutingKey();</code><code>// 隊列名稱</code>

<code>                    </code><code>String contentType = properties.getContentType();</code><code>// 内容類型</code>

<code>                    </code><code>String content =</code><code>new</code> <code>String(body,</code><code>"utf-8"</code><code>);</code><code>// 消息正文</code>

<code>                    </code><code>System.out.println(</code><code>"消息正文:"</code> <code>+ content);</code>

<code>                    </code><code>channel.basicAck(envelope.getDeliveryTag(),</code><code>false</code><code>);</code><code>// 手動确認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量确認小于index的消息】</code>

<code>                </code><code>}</code>

<code>            </code><code>});</code>

更多免費技術資料可關注:annalin1203