安裝EPEL源
<code>[root@anshengme ~]</code><code># yum -y install epel-release</code>
安裝erlang
<code>[root@anshengme ~]</code><code># yum -y install erlang</code>
安裝RabbitMQ
<code>[root@anshengme ~]</code><code># yum -y install rabbitmq-server</code>
啟動并設定開機器啟動
在啟動<code>RabbitMQ</code>之前需要hostname的解析,要不然啟動不起來
<code>[root@anshengme ~]</code><code># cat /etc/hosts</code>
<code>127.0</code><code>.</code><code>0.1</code> <code>localhost localhost.localdomain localhost4 localhost4.localdomain4 anshengme</code>
<code>::</code><code>1</code> <code>localhost localhost.localdomain localhost6 localhost6.localdomain6</code>
<code>[root@anshengme ~]</code><code># systemctl start rabbitmq-server</code>
<code>[root@anshengme ~]</code><code># systemctl enable rabbitmq-server</code>
<code>Created symlink </code><code>from</code> <code>/</code><code>etc</code><code>/</code><code>systemd</code><code>/</code><code>system</code><code>/</code><code>multi</code><code>-</code><code>user.target.wants</code><code>/</code><code>rabbitmq</code><code>-</code><code>server.service to </code><code>/</code><code>usr</code><code>/</code><code>lib</code><code>/</code><code>systemd</code><code>/</code><code>system</code><code>/</code><code>rabbitmq</code><code>-</code><code>server.service.</code>
檢視啟動狀态
<code>[root@anshengme ~]</code><code># netstat -tulnp |grep 5672</code>
<code>tcp </code><code>0</code> <code>0</code> <code>0.0</code><code>.</code><code>0.0</code><code>:</code><code>25672</code> <code>0.0</code><code>.</code><code>0.0</code><code>:</code><code>*</code> <code>LISTEN </code><code>37507</code><code>/</code><code>beam.smp </code>
<code>tcp6 </code><code>0</code> <code>0</code> <code>:::</code><code>5672</code> <code>:::</code><code>*</code> <code>LISTEN </code><code>37507</code><code>/</code><code>beam.smp</code>
<code>pika</code>子產品是官方認可的操作<code>RabbitMQ</code>的API接口。
安裝pika
測試
<code>>>> </code><code>import</code> <code>pika</code>
如果你啟動了多個消費者,那麼生産者生産的任務會根據順序的依次讓消費者來執行,這就是<code>Work Queues</code>模式
<a href="https://s1.51cto.com/wyfs02/M02/95/6D/wKiom1kVKWmS6h7vAABZtJSB-Ow435.png" target="_blank"></a>
生産者代碼
<code>#!/usr/bin/env python</code>
<code># _*_ codin:utf-8 _*_</code>
<code>import</code> <code>pika</code>
<code># 連接配接到RabbitMQ 這是一個阻塞的連接配接</code>
<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code><code>'192.168.56.100'</code><code>))</code>
<code># 生成一個管道</code>
<code>channel </code><code>=</code> <code>connection.channel()</code>
<code># 通過管道建立一個隊列</code>
<code>channel.queue_declare(queue</code><code>=</code><code>'hello'</code><code>)</code>
<code># 在隊列内發送資料,body内容,routing_key隊列,exchange交換器,通過交換器往hello隊列内發送Hello World!資料</code>
<code>channel.basic_publish(exchange</code><code>=</code><code>'</code><code>', routing_key='</code><code>hello</code><code>', body='</code><code>Hello World!')</code>
<code># 關閉連接配接</code>
<code>connection.close()</code>
消費者代碼
<code>#!/usr/bin/env python</code>
<code># 如果消費者連接配接到這個隊列的時候,隊列沒有生成,那麼消費者就生成這個隊列,如果這個隊列已經生成了,那麼就忽略它</code>
<code># 回調函數</code>
<code>def</code> <code>callback(ch, method, properties, body):</code>
<code> </code><code>print</code><code>(</code><code>" [x] Received %r"</code> <code>%</code> <code>body)</code>
<code> </code>
<code># 消費,當收到hello隊列的消息的時候就,就調用callback函數,no_ack消費者在處理任務的時候要不需要确認任務已經處理完成,改為False則要确認</code>
<code>channel.basic_consume(callback, queue</code><code>=</code><code>'hello'</code><code>, no_ack</code><code>=</code><code>True</code><code>)</code>
<code># 開始接受任務,阻塞</code>
<code>channel.start_consuming()</code>
隊列持久化
試想,如果我們的消費者在執行任務執行到一半時,突然down掉了,我們可以更改<code>no_ack=False</code>來讓消費者每次執行完成完成之後确認執行完畢了再把這個任務在隊列中移除移除掉,但是如果RabbitMQ的伺服器停止我們的任務仍然會丢失。
首先,我們需要確定的<code>RabbitMQ</code>永遠不會在我們的隊列中失去,為了做到這一點,我們需要把<code>durable=True</code>,聲明一個新名稱的隊列,為<code>task_queue</code>:
<code>channel.queue_declare(queue</code><code>=</code><code>'task_queue'</code><code>, durable</code><code>=</code><code>True</code><code>)</code>
<code>durable</code>需要在生産者和消費者上面都需要寫上,且<code>durable</code>隻會讓我們的隊列持久化,并不能夠讓消息持久化。
消息持久化
消息持久化隻需要在添加消息的時候添加一個<code>delivery_mode=2</code>
<code>channel.basic_publish(exchange</code><code>=</code><code>'',</code>
<code> </code><code>routing_key</code><code>=</code><code>'world'</code><code>,</code>
<code> </code><code>body</code><code>=</code><code>'Hello World!'</code><code>,</code>
<code> </code><code>properties</code><code>=</code><code>pika.BasicProperties(</code>
<code> </code><code># 2=消息持久化</code>
<code> </code><code>delivery_mode</code><code>=</code><code>2</code><code>,</code>
<code> </code><code>))</code>
在消費者的callback函數内添加以下代碼:
<code>ch.basic_ack(delivery_tag </code><code>=</code> <code>method.delivery_tag)</code>
每一個消費者同時隻處理一個任務,比如說現在有三個消費者,剛開始來了三個任務,平均配置設定給了三個消費者,那麼這三個消費者目前都在同時執行任務,當第四個任務到來的時候依舊會配置設定給第一個消費者,第五個任務到來的時候會配置設定給第二個消費者,以此類推。
那麼以上的狀況有什麼不妥呢?譬如說不同的消費者執行任務的時間不同,我們現在需要的時候,當三個消費者都在執行任務的時候,比如說第二個消費者任務執行完了,其他消費者都還在執行任務,當第四個任務到來的時候希望交給第二個消費者,若要實作此功能,隻需要在消費者加上一下代碼即可:
<code>channel.basic_qos(prefetch_count</code><code>=</code><code>1</code><code>)</code>
完整的代碼如下
<code>import</code> <code>time</code>
<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code>
<code> </code><code>host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>
<code>print</code><code>(</code><code>' [*] Waiting for messages. To exit press CTRL+C'</code><code>)</code>
<code> </code><code>time.sleep(</code><code>10</code><code>)</code>
<code> </code><code>print</code><code>(</code><code>" [x] Done"</code><code>)</code>
<code> </code><code>ch.basic_ack(delivery_tag</code><code>=</code><code>method.delivery_tag)</code>
<code>channel.basic_consume(callback,</code>
<code> </code><code>queue</code><code>=</code><code>'task_queue'</code><code>)</code>
<code> </code>
<code>import</code> <code>sys</code>
<code>for</code> <code>n </code><code>in</code> <code>range</code><code>(</code><code>10</code><code>):</code>
<code> </code><code>message </code><code>=</code> <code>"Hello World! %s"</code> <code>%</code> <code>(n </code><code>+</code> <code>1</code><code>)</code>
<code> </code><code>channel.basic_publish(exchange</code><code>=</code><code>'',</code>
<code> </code><code>routing_key</code><code>=</code><code>'task_queue'</code><code>,</code>
<code> </code><code>body</code><code>=</code><code>message,</code>
<code> </code><code>properties</code><code>=</code><code>pika.BasicProperties(</code>
<code> </code><code>delivery_mode</code><code>=</code><code>2</code><code>, </code><code># make message persistent</code>
<code> </code><code>))</code>
<code> </code><code>print</code><code>(</code><code>" [x] Sent %r"</code> <code>%</code> <code>message)</code>
之前的例子都基本都是1對1的消息發送和接收,即消息隻能發送到指定的queue裡,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
屬性
描述
<code>fanout</code>
所有bind到此exchange的queue都可以接收消息
<code>direct</code>
通過routingKey和exchange決定的那個唯一的queue可以接收消息
<code>topic</code>
所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
fanout(釋出訂閱)
隻要有消費者,那麼我生産者釋出一條消息的時候所有的消費者都會被收到
<a href="https://s1.51cto.com/wyfs02/M01/95/6E/wKioL1kVKmbR6t5hAAA2MyN-t_g055.png" target="_blank"></a>
<code># 消費者</code>
<code>connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>
<code>channel.exchange_declare(exchange</code><code>=</code><code>'logs'</code><code>, </code><code>type</code><code>=</code><code>'fanout'</code><code>)</code>
<code># 不指定queue名字,rabbit會随機配置設定一個名字,exclusive=True會在使用此queue的消費者斷開後,自動将queue删除</code>
<code>result </code><code>=</code> <code>channel.queue_declare(exclusive</code><code>=</code><code>True</code><code>)</code>
<code># 擷取queue的name</code>
<code>queue_name </code><code>=</code> <code>result.method.queue</code>
<code># 把queue綁定到exchange</code>
<code>channel.queue_bind(exchange</code><code>=</code><code>'logs'</code><code>, queue</code><code>=</code><code>queue_name)</code>
<code> </code><code>print</code><code>(</code><code>" [x] %r"</code> <code>%</code> <code>body)</code>
<code>channel.basic_consume(callback,queue</code><code>=</code><code>queue_name,no_ack</code><code>=</code><code>True</code><code>)</code>
<code># 生産者</code>
<code># fanout發送給所有人</code>
<code>channel.basic_publish(exchange</code><code>=</code><code>'logs'</code><code>, routing_key</code><code>=</code><code>'', body</code><code>=</code><code>"Hello World!"</code><code>)</code>
RabbitMQ還支援根據關鍵字發送,即:隊列綁定關鍵字,發送者将資料根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該将資料發送至指定隊列。
<a href="https://s4.51cto.com/wyfs02/M00/95/6E/wKiom1kVKprTftxpAABNvwKE9II350.png" target="_blank"></a>
<code>channel.exchange_declare(exchange</code><code>=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>type</code><code>=</code><code>'direct'</code><code>)</code>
<code> </code>
<code>severity </code><code>=</code> <code>sys.argv[</code><code>1</code><code>] </code><code>if</code> <code>len</code><code>(sys.argv) > </code><code>1</code> <code>else</code> <code>'info'</code>
<code>message </code><code>=</code> <code>' '</code><code>.join(sys.argv[</code><code>2</code><code>:]) </code><code>or</code> <code>'Hello World!'</code>
<code>channel.basic_publish(exchange</code><code>=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>routing_key</code><code>=</code><code>severity,</code>
<code> </code><code>body</code><code>=</code><code>message)</code>
<code>print</code><code>(</code><code>" [x] Sent %r:%r"</code> <code>%</code> <code>(severity, message))</code>
<code> </code><code>host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>
<code>severities </code><code>=</code> <code>sys.argv[</code><code>1</code><code>:]</code>
<code>if</code> <code>not</code> <code>severities:</code>
<code> </code><code>sys.stderr.write(</code><code>"Usage: %s [info] [warning] [error]\n"</code> <code>%</code> <code>sys.argv[</code><code>0</code><code>])</code>
<code> </code><code>sys.exit(</code><code>1</code><code>)</code>
<code>for</code> <code>severity </code><code>in</code> <code>severities:</code>
<code> </code><code>channel.queue_bind(exchange</code><code>=</code><code>'direct_logs'</code><code>,</code>
<code> </code><code>queue</code><code>=</code><code>queue_name,</code>
<code> </code><code>routing_key</code><code>=</code><code>severity)</code>
<code> </code>
<code>print</code><code>(</code><code>' [*] Waiting for logs. To exit press CTRL+C'</code><code>)</code>
<code> </code><code>print</code><code>(</code><code>" [x] %r:%r"</code> <code>%</code> <code>(method.routing_key, body))</code>
<code> </code><code>queue</code><code>=</code><code>queue_name,</code>
<code> </code><code>no_ack</code><code>=</code><code>True</code><code>)</code>
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之後發送者将資料發送到exchange,exchange将傳入”路由值“和 ”關鍵字“進行比對,比對成功,則将資料發送到指定隊列。
表達式符号說明:
符号
<code>#</code>
表示可以比對<code>0個</code>或<code>多個</code>單詞
<code>*</code>
表示隻能比對<code>一個</code>單詞
發送者路由值
隊列中
是否比對
ansheng.me
ansheng.*
不比對
ansheng.#
比對
<code>channel.exchange_declare(exchange</code><code>=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>type</code><code>=</code><code>'topic'</code><code>)</code>
<code>binding_keys </code><code>=</code> <code>sys.argv[</code><code>1</code><code>:]</code>
<code>if</code> <code>not</code> <code>binding_keys:</code>
<code> </code><code>sys.stderr.write(</code><code>"Usage: %s [binding_key]...\n"</code> <code>%</code> <code>sys.argv[</code><code>0</code><code>])</code>
<code>for</code> <code>binding_key </code><code>in</code> <code>binding_keys:</code>
<code> </code><code>channel.queue_bind(exchange</code><code>=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>routing_key</code><code>=</code><code>binding_key)</code>
<code>routing_key </code><code>=</code> <code>sys.argv[</code><code>1</code><code>] </code><code>if</code> <code>len</code><code>(sys.argv) > </code><code>1</code> <code>else</code> <code>'anonymous.info'</code>
<code>channel.basic_publish(exchange</code><code>=</code><code>'topic_logs'</code><code>,</code>
<code> </code><code>routing_key</code><code>=</code><code>routing_key,</code>
<code>print</code><code>(</code><code>" [x] Sent %r:%r"</code> <code>%</code> <code>(routing_key, message))</code>
用戶端發送一個任務到服務端,服務端把任務的執行結果再傳回給用戶端
<a href="https://s4.51cto.com/wyfs02/M00/95/6E/wKiom1kVK2rB1RUhAACEQPOtmes668.png" target="_blank"></a>
RPC Server
<code># _*_coding:utf-8_*_</code>
<code># 聲明一個RPC QUEUE</code>
<code>channel.queue_declare(queue</code><code>=</code><code>'rpc_queue'</code><code>)</code>
<code>def</code> <code>fib(n):</code>
<code> </code><code>if</code> <code>n </code><code>=</code><code>=</code> <code>0</code><code>:</code>
<code> </code><code>return</code> <code>0</code>
<code> </code><code>elif</code> <code>n </code><code>=</code><code>=</code> <code>1</code><code>:</code>
<code> </code><code>return</code> <code>1</code>
<code> </code><code>else</code><code>:</code>
<code> </code><code>return</code> <code>fib(n </code><code>-</code> <code>1</code><code>) </code><code>+</code> <code>fib(n </code><code>-</code> <code>2</code><code>)</code>
<code> </code>
<code>def</code> <code>on_request(ch, method, props, body):</code>
<code> </code><code># 接受傳過來的值</code>
<code> </code><code>n </code><code>=</code> <code>int</code><code>(body)</code>
<code> </code><code>print</code><code>(</code><code>" [.] fib(%s)"</code> <code>%</code> <code>n)</code>
<code> </code><code># 交給fib函數進行斐波那契處理</code>
<code> </code><code>response </code><code>=</code> <code>fib(n)</code>
<code> </code><code># 把結果發回去,此時消費者變成生産者</code>
<code> </code><code>ch.basic_publish(exchange</code><code>=</code><code>'',</code>
<code> </code><code>routing_key</code><code>=</code><code>props.reply_to,</code>
<code> </code><code># 用戶端傳過來的UUID順便發回去</code>
<code> </code><code>properties</code><code>=</code><code>pika.BasicProperties(correlation_id</code><code>=</code><code>props.correlation_id),</code>
<code> </code><code>body</code><code>=</code><code>str</code><code>(response))</code>
<code> </code><code># 持久化</code>
<code># 同時隻處理一個任務</code>
<code>channel.basic_consume(on_request, queue</code><code>=</code><code>'rpc_queue'</code><code>)</code>
<code>print</code><code>(</code><code>" [x] Awaiting RPC requests"</code><code>)</code>
RPC Client
<code>import</code> <code>uuid</code>
<code>class</code> <code>FibonacciRpcClient(</code><code>object</code><code>):</code>
<code> </code><code>def</code> <code>__init__(</code><code>self</code><code>):</code>
<code> </code><code>self</code><code>.connection </code><code>=</code> <code>pika.BlockingConnection(pika.ConnectionParameters(</code>
<code> </code><code>host</code><code>=</code><code>'192.168.56.100'</code><code>))</code>
<code> </code>
<code> </code><code>self</code><code>.channel </code><code>=</code> <code>self</code><code>.connection.channel()</code>
<code> </code><code>result </code><code>=</code> <code>self</code><code>.channel.queue_declare(exclusive</code><code>=</code><code>True</code><code>)</code>
<code> </code><code># 服務端傳回處理完畢的資料新Queue名稱</code>
<code> </code><code>self</code><code>.callback_queue </code><code>=</code> <code>result.method.queue</code>
<code> </code><code>self</code><code>.channel.basic_consume(</code><code>self</code><code>.on_response, no_ack</code><code>=</code><code>True</code><code>,</code>
<code> </code><code>queue</code><code>=</code><code>self</code><code>.callback_queue)</code>
<code> </code>
<code> </code><code>def</code> <code>on_response(</code><code>self</code><code>, ch, method, props, body):</code>
<code> </code><code># corr_id等于剛剛發送過去的ID,就代表這條消息是我的</code>
<code> </code><code>if</code> <code>self</code><code>.corr_id </code><code>=</code><code>=</code> <code>props.correlation_id:</code>
<code> </code><code>self</code><code>.response </code><code>=</code> <code>body</code>
<code> </code><code>def</code> <code>call(</code><code>self</code><code>, n):</code>
<code> </code><code>self</code><code>.response </code><code>=</code> <code>None</code>
<code> </code><code># 生成一個唯一ID,相當于每個任務的ID</code>
<code> </code><code>self</code><code>.corr_id </code><code>=</code> <code>str</code><code>(uuid.uuid4())</code>
<code> </code><code>self</code><code>.channel.basic_publish(exchange</code><code>=</code><code>'',</code>
<code> </code><code>routing_key</code><code>=</code><code>'rpc_queue'</code><code>,</code>
<code> </code><code>properties</code><code>=</code><code>pika.BasicProperties(</code>
<code> </code><code># 讓服務端處理完成之後把資料放到這個Queue裡面</code>
<code> </code><code>reply_to</code><code>=</code><code>self</code><code>.callback_queue,</code>
<code> </code><code># 加上一個任務ID</code>
<code> </code><code>correlation_id</code><code>=</code><code>self</code><code>.corr_id,</code>
<code> </code><code>),</code>
<code> </code><code>body</code><code>=</code><code>str</code><code>(n))</code>
<code> </code><code>while</code> <code>self</code><code>.response </code><code>is</code> <code>None</code><code>:</code>
<code> </code><code># 不斷地去Queue接受消息,但不是阻塞的,而是一直循環的去取</code>
<code> </code><code>self</code><code>.connection.process_data_events()</code>
<code> </code><code>return</code> <code>int</code><code>(</code><code>self</code><code>.response)</code>
<code>fibonacci_rpc </code><code>=</code> <code>FibonacciRpcClient()</code>
<code>print</code><code>(</code><code>" [x] Requesting fib(30)"</code><code>)</code>
<code>response </code><code>=</code> <code>fibonacci_rpc.call(</code><code>30</code><code>)</code>
<code>print</code><code>(</code><code>" [.] Got %r"</code> <code>%</code> <code>response)</code>
本文轉自 Edenwy 51CTO部落格,原文連結:http://blog.51cto.com/edeny/1924932,如需轉載請自行聯系原作者