消息隊列:
消息隊列是在消息傳輸過程中儲存消息的容器。
消息隊列最經典的用法就是消費者和生産者之間通過消息管道來傳遞消息,消費者和生産生是不通的程序。生産者往管道中寫消息,消費者從管道中讀消息。
相當于水管,有一個入口和出口,水從入口流入出口流出,這就是一個消息隊列
線程或程序往隊列裡面添加資料,出口從隊列裡面讀資料
左側多線程往入口處添加完資料,任務就結束了;右側隻要依次從水管裡取資料就行了。
異步完成的任務
比如京東下單,下單後付完錢,相當于把消息堆在了水管裡,背景會有線程去接收這個單的消息,然後去庫房,發貨,走物流,直到接收貨物并簽收完,點選完成,整個流程才走完。
客戶交完錢後,丢了個消息在這個隊列中,會給客戶傳回一個結果,告知你已經買了這個商品;而後面接收訂單消息,發貨,物流都是後面的"程序"或"線程"幹的事情。
是以,一般在異步處理問題時候,都會用到消息隊列處理的這種思想。
使用multiprocessing裡面的Queue來實作消息隊列。
文法:
<code>from</code> <code>mutliprocessing </code><code>import</code> <code>Queue</code>
<code>q </code><code>=</code> <code>Queue</code>
<code>q.put(data)</code>
<code>data </code><code>=</code> <code>q.get(data)</code>
舉例:
<code>from</code> <code>multiprocessing </code><code>import</code> <code>Queue, Process</code>
<code>def</code> <code>write(q):</code>
<code> </code><code>for</code> <code>i </code><code>in</code> <code>[</code><code>'a'</code><code>,</code><code>'b'</code><code>,</code><code>'c'</code><code>,</code><code>'d'</code><code>]:</code>
<code> </code><code>q.put(i)</code>
<code> </code><code>print</code> <code>(</code><code>'put {0} to queue'</code><code>.</code><code>format</code><code>(i))</code>
<code>def</code> <code>read(q):</code>
<code> </code><code>while</code> <code>1</code><code>:</code>
<code> </code><code>result </code><code>=</code> <code>q.get()</code>
<code> </code><code>print</code> <code>(</code><code>"get {0} from queue"</code><code>.</code><code>format</code><code>(result))</code>
<code>def</code> <code>main():</code>
<code> </code><code>q </code><code>=</code> <code>Queue()</code>
<code> </code><code>pw </code><code>=</code> <code>Process(target</code><code>=</code><code>write,args</code><code>=</code><code>(q,))</code>
<code> </code><code>pr </code><code>=</code> <code>Process(target</code><code>=</code><code>read,args</code><code>=</code><code>(q,))</code>
<code> </code><code>pw.start()</code>
<code> </code><code>pr.start()</code>
<code> </code><code>pw.join()</code>
<code> </code><code>pr.terminate() </code><code>#停止</code>
<code> </code><code># 相當于join,等pr完成以後,當whlie沒有任何執行後,結束。</code>
<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>:</code>
<code> </code><code>main()</code>
傳回結果:
<code>put a to queue</code>
<code>get a from queue</code>
<code>put b to queue</code>
<code>get b from queue</code>
<code>put c to queue</code>
<code>get c from queue</code>
<code>put d to queue</code>
<code>get d from queue</code>
PIPE:
多程序裡面有個pipe的方法來實作消息隊列:
1. Pipe 方法傳回(conn1, conn2)代表一個管道的兩端。PIPE方法有個deplex參數,如果deplex參數為True(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1隻負責接收消息,conn2負責發送消息。
2.send 和recv方法分别是發送和接受消息的方法。close方法表示關閉管道,當消息接收結束以後,關閉管道。
<code>import</code> <code>time</code>
<code>from</code> <code>multiprocessing </code><code>import</code> <code>Pipe, Process</code>
<code>def</code> <code>proc1(pipe):</code>
<code> </code><code>for</code> <code>i </code><code>in</code> <code>xrange</code><code>(</code><code>1</code><code>, </code><code>10</code><code>):</code>
<code> </code><code>pipe.send(i)</code>
<code> </code><code>print</code> <code>(</code><code>"send {0} to pipe"</code><code>.</code><code>format</code><code>(i))</code>
<code> </code><code>time.sleep(</code><code>1</code><code>)</code>
<code>def</code> <code>proc2(pipe):</code>
<code> </code><code>n </code><code>=</code> <code>9</code>
<code> </code><code>while</code> <code>n > </code><code>0</code><code>:</code>
<code> </code><code>result </code><code>=</code> <code>pipe.recv()</code>
<code> </code><code>print</code> <code>(</code><code>"recv {0} from pipe"</code><code>.</code><code>format</code><code>(result))</code>
<code> </code><code>n </code><code>-</code><code>=</code> <code>1</code>
<code> </code><code>pipe </code><code>=</code> <code>Pipe(duplex</code><code>=</code><code>False</code><code>)</code>
<code> </code><code>print</code> <code>(</code><code>type</code><code>(pipe))</code>
<code> </code><code>p1 </code><code>=</code> <code>Process(target</code><code>=</code><code>proc1, args</code><code>=</code><code>(pipe[</code><code>1</code><code>],))</code>
<code> </code><code>p2 </code><code>=</code> <code>Process(target</code><code>=</code><code>proc2, args</code><code>=</code><code>(pipe[</code><code>0</code><code>],)) </code><code>#接收寫0</code>
<code> </code><code>p1.start()</code>
<code> </code><code>p2.start()</code>
<code> </code><code>p1.join()</code>
<code> </code><code>p2.join()</code>
<code> </code><code>pipe[</code><code>0</code><code>].close()</code>
<code> </code><code>pipe[</code><code>1</code><code>].close()</code>
傳回結果(逐行列印):
本文轉自 聽丶飛鳥說 51CTO部落格,原文連結:http://blog.51cto.com/286577399/2051155