天天看點

【轉載】消息隊列RabbitMQ入門介紹

(一)基本概念

      rabbitmq 是流行的開源消息隊列系統,用 erlang 語言開發。我曾經對這門語言挺有興趣,學過一段時間,後來沒堅持。rabbitmq 是 amqp(進階消息隊列協定)的标準實作。如果不熟悉 amqp,直接看 rabbitmq 的文檔會比較困難。不過它也隻有幾個關鍵概念,這裡簡單介紹。

rabbitmq 的結構圖如下:

【轉載】消息隊列RabbitMQ入門介紹

幾個概念說明:

broker:簡單來說就是消息隊列伺服器實體。

exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。

queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。

binding:綁定,它的作用就是把 exchange 和 queue 按照路由規則綁定起來。

routing key:路由關鍵字,exchange 根據這個關鍵字進行消息投遞。

vhost:虛拟主機,一個 broker 裡可以開設多個 vhost,用作不同使用者的權限分離。

producer:消息生産者,就是投遞消息的程式。

consumer:消息消費者,就是接受消息的程式。

channel:消息通道,在用戶端的每個連接配接裡,可建立多個 channel,每個 channel 代表一個會話任務。

消息隊列的使用過程大概如下:

(1)用戶端連接配接到消息隊列伺服器,打開一個 channel 。

(2)用戶端聲明一個 exchange,并設定相關屬性。

(3)用戶端聲明一個 queue,并設定相關屬性。

(4)用戶端使用 routing key(本質上是 binding key,後文已做相應糾正),在 exchange 和 queue 之間建立好綁定關系。

(5)用戶端投遞消息到 exchange 。

      exchange 接收到消息後,就根據消息的 routing key 和已經設定的 binding 關系,進行消息路由,将消息投遞到一個或多個隊列裡。

      exchange 也有幾個類型:

完全根據 routing key 進行投遞的叫做 direct 交換機。例如,綁定時設定了 binding key 為 "abc",那麼用戶端送出的消息,隻有設定了routing key 為 "abc" 的才會被投遞到該隊列。

對 routing key 進行模式比對後進行投遞的叫做 topic 交換機。符号 "#" 比對一個或多個詞,符号 "*" 比對正好一個詞。例如 "abc.#" 可以比對 "abc.def.ghi" ,"abc.*" 隻能比對 "abc.def" 。

還有一種不需要 routing key 的,叫做 fanout 交換機。它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

      rabbitmq 支援消息的持久化,也就是資料寫在磁盤上,為了資料安全考慮,我想大多數使用者都會選擇持久化。消息隊列持久化包括 3 個部分:

(1)exchange 持久化,在聲明時指定 durable => 1

(2)queue 持久化,在聲明時指定 durable => 1

(3)消息持久化,在投遞時指定 delivery_mode => 2(1 是非持久化)

      如果 exchange 和 queue 都是持久化的,那麼它們之間的 binding 也是持久化的。

(二)應用實際

使用 linux 伺服器(ubuntu 9.10 64位),安裝 rabbitmq 非常友善。

先運作如下指令安裝 erlang:

<a href="http://my.oschina.net/moooofly/blog/92277#">?</a>

1

<code># apt-get install erlang-nox</code>

再到rabbitmq.com上下載下傳rabbitmq的安裝包,如下安裝:

<code># dpkg -i rabbitmq-server_2.6.1-1_all.deb</code>

安裝完後,使用

<code># /etc/init.d/rabbitmq-server start|stop|restart</code>

來啟動、停止、重新開機 rabbitmq。

在正式應用之前,我們先在 rabbitmq 裡建立一個 vhost ,加一個使用者,并設定該使用者的權限。

使用 rabbitmqctl 用戶端工具,在根目錄下建立 "/pyhtest" 這個 vhost :

<code># rabbitmqctl add_vhost /pyhtest</code>

建立一個使用者名 "pyh" ,設定密碼 "pyh1234" :

<code># rabbitmqctl add_user pyh pyh1234</code>

設定pyh使用者對/pyhtest這個vhost擁有全部權限:

<code># rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”</code>

後面三個”*”代表pyh使用者擁有對/pyhtest的配置、寫、讀全部權限

設定好後,開始程式設計,我用 perl 寫一個消息投遞程式(producer):

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

<code>#!/usr/bin/perl</code>

<code>use</code> <code>strict;</code>

<code>use</code> <code>net::rabbitmq;</code>

<code>use</code> <code>uuid::tiny;</code>

<code>my</code> <code>$channel</code> <code>= 1000;</code><code># channel id,可以随意指定,隻要不沖突</code>

<code>my</code> <code>$queuename</code> <code>= “pyh_queue”;</code><code># 隊列名</code>

<code>my</code> <code>$exchange</code> <code>= “pyh_exchange”;</code><code># 交換機名</code>

<code>my</code> <code>$routing_key</code> <code>= “test”;</code><code># routing key</code>

<code>my</code> <code>$mq</code> <code>= net::rabbitmq-&gt;new();</code><code># 建立一個rabbitmq對象</code>

<code>$mq</code><code>-&gt;</code><code>connect</code><code>(“localhost”, {</code><code>vhost</code> <code>=&gt; “/pyhtest”,</code><code>user</code> <code>=&gt; “pyh”,</code><code>password</code> <code>=&gt; “pyh1234″ });</code><code># 建立連接配接</code>

<code>$mq</code><code>-&gt;channel_open(</code><code>$channel</code><code>);</code><code># 打開一個channel</code>

<code>$mq</code><code>-&gt;exchange_declare(</code><code>$channel</code><code>,</code><code>$exchange</code><code>, {</code><code>durable</code> <code>=&gt; 1});</code><code># 聲明一個持久化的交換機</code>

<code>$mq</code><code>-&gt;queue_declare(</code><code>$channel</code><code>,</code><code>$queuename</code><code>, {</code><code>durable</code> <code>=&gt; 1});</code><code># 聲明一個持久化的隊列</code>

<code>$mq</code><code>-&gt;queue_bind(</code><code>$channel</code><code>,</code><code>$queuename</code><code>,</code><code>$exchange</code><code>,</code><code>$routing_key</code><code>);</code><code># 使用routing key在交換機和隊列間建立綁定</code>

<code>for</code> <code>(</code><code>my</code> <code>$i</code><code>=0;</code><code>$i</code><code>&lt;10000000;</code><code>$i</code><code>++) {</code><code># 循環1000萬次</code>

<code>my</code> <code>$string</code> <code>= create_uuid_as_string(uuid_v1);</code><code># 産生一條uuid作為消息主體</code>

<code>$mq</code><code>-&gt;publish(</code><code>$channel</code><code>,</code><code>$routing_key</code><code>,</code><code>$string</code><code>, {</code><code>exchange</code> <code>=&gt;</code><code>$exchange</code> <code>}, {</code><code>delivery_mode</code> <code>=&gt; 2 });</code><code># 将消息結合key以持久化模式投遞到交換機</code>

<code>}</code>

<code>$mq</code><code>-&gt;disconnect();</code><code># 斷開連接配接</code>

消息接受程式(consumer)大概如下:

<code>my</code> <code>$channel</code> <code>= 1001;</code>

<code>my</code> <code>$queuename</code> <code>= “pyh_queue”;</code>

<code>my</code> <code>$mq</code> <code>= net::rabbitmq-&gt;new();</code>

<code>$mq</code><code>-&gt;</code><code>connect</code><code>(“localhost”, {</code><code>vhost</code><code>=&gt;”/pyhtest”,</code><code>user</code> <code>=&gt; “pyh”,</code><code>password</code> <code>=&gt; “pyh1234″ });</code>

<code>$mq</code><code>-&gt;channel_open(</code><code>$channel</code><code>);</code>

<code>while</code> <code>(1) {</code>

<code>my</code> <code>$hashref</code> <code>=</code><code>$mq</code><code>-&gt;get(</code><code>$channel</code><code>,</code><code>$queuename</code><code>);</code>

<code>last</code> <code>unless</code> <code>defined</code> <code>$hashref</code><code>;</code>

<code>print</code> <code>$hashref</code><code>-&gt;{message_count}, “: “,</code><code>$hashref</code><code>-&gt;{body},”\n”;</code>

<code>$mq</code><code>-&gt;disconnect();</code>

consumer 連接配接後隻要指定隊列就可擷取到消息。

      上述程式共投遞 1000 萬條消息,每條消息 36 位元組(uuid),打開持久化,共耗時 17 分多鐘(包括産生 uuid 的時間),每秒投遞消息約 9500 條。測試機器是 8g 記憶體、8 核志強 cpu 。

     投遞完後,在 /var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent 目錄,産生 2g 多的持久化消息資料。在運作 consumer 程式後,這些資料都會消失,因為消息已經被消費了。