代碼說明一切:
<code>#encoding=utf-8</code>
<code>#author: walker</code>
<code>#date: 2014-05-21</code>
<code>#summary: 自定義程序池周遊目錄下檔案</code>
<code>from</code> <code>multiprocessing </code><code>import</code> <code>Process, Queue, Lock</code>
<code>import</code> <code>time, os</code>
<code>#消費者</code>
<code>class</code> <code>Consumer(Process):</code>
<code> </code><code>def</code> <code>__init__(</code><code>self</code><code>, queue, ioLock):</code>
<code> </code><code>super</code><code>(Consumer, </code><code>self</code><code>).__init__()</code>
<code> </code><code>self</code><code>.queue </code><code>=</code> <code>queue</code>
<code> </code><code>self</code><code>.ioLock </code><code>=</code> <code>ioLock</code>
<code> </code>
<code> </code><code>def</code> <code>run(</code><code>self</code><code>):</code>
<code> </code><code>while</code> <code>True</code><code>:</code>
<code> </code><code>task </code><code>=</code> <code>self</code><code>.queue.get() </code><code>#隊列中無任務時,會阻塞程序</code>
<code> </code><code>if</code> <code>isinstance</code><code>(task, </code><code>str</code><code>) </code><code>and</code> <code>task </code><code>=</code><code>=</code> <code>'quit'</code><code>:</code>
<code> </code><code>break</code><code>;</code>
<code> </code><code>time.sleep(</code><code>1</code><code>) </code><code>#假定任務處理需要1秒鐘</code>
<code> </code><code>self</code><code>.ioLock.acquire()</code>
<code> </code><code>print</code><code>( </code><code>str</code><code>(os.getpid()) </code><code>+</code> <code>' '</code> <code>+</code> <code>task)</code>
<code> </code><code>self</code><code>.ioLock.release()</code>
<code> </code><code>self</code><code>.ioLock.acquire()</code>
<code> </code><code>print</code> <code>'Bye-bye'</code>
<code> </code><code>self</code><code>.ioLock.release()</code>
<code>#生産者 </code>
<code>def</code> <code>Producer():</code>
<code> </code><code>queue </code><code>=</code> <code>Queue() </code><code>#這個隊列是程序/線程安全的</code>
<code> </code><code>ioLock </code><code>=</code> <code>Lock()</code>
<code> </code><code>subNum </code><code>=</code> <code>4</code> <code>#子程序數量</code>
<code> </code><code>workers </code><code>=</code> <code>build_worker_pool(queue, ioLock, subNum)</code>
<code> </code><code>start_time </code><code>=</code> <code>time.time()</code>
<code> </code>
<code> </code><code>for</code> <code>parent, dirnames, filenames </code><code>in</code> <code>os.walk(r</code><code>'D:\test'</code><code>):</code>
<code> </code><code>for</code> <code>filename </code><code>in</code> <code>filenames:</code>
<code> </code><code>queue.put(filename)</code>
<code> </code><code>ioLock.acquire() </code>
<code> </code><code>print</code><code>(</code><code>'qsize:'</code> <code>+</code> <code>str</code><code>(queue.qsize()))</code>
<code> </code><code>ioLock.release()</code>
<code> </code><code>while</code> <code>queue.qsize() > subNum </code><code>*</code> <code>10</code><code>: </code><code>#控制隊列中任務數量</code>
<code> </code><code>time.sleep(</code><code>1</code><code>)</code>
<code> </code>
<code> </code><code>for</code> <code>worker </code><code>in</code> <code>workers:</code>
<code> </code><code>queue.put(</code><code>'quit'</code><code>)</code>
<code> </code><code>worker.join()</code>
<code> </code><code>ioLock.acquire() </code>
<code> </code><code>print</code><code>(</code><code>'Done! Time taken: {}'</code><code>.</code><code>format</code><code>(time.time() </code><code>-</code> <code>start_time))</code>
<code> </code><code>ioLock.release()</code>
<code>#建立程序池</code>
<code>def</code> <code>build_worker_pool(queue, ioLock, size):</code>
<code> </code><code>workers </code><code>=</code> <code>[]</code>
<code> </code><code>for</code> <code>_ </code><code>in</code> <code>range</code><code>(size):</code>
<code> </code><code>worker </code><code>=</code> <code>Consumer(queue, ioLock)</code>
<code> </code><code>worker.start()</code>
<code> </code><code>workers.append(worker)</code>
<code> </code><code>return</code> <code>workers</code>
<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>:</code>
<code> </code><code>Producer()</code>
ps:
<code>self</code><code>.ioLock.acquire() </code>
<code>...</code>
<code>self</code><code>.ioLock.release()</code>
可用
<code>with </code><code>self</code><code>.ioLock:</code>
<code> </code><code>...</code>
替代。
<code>#date: 2017-03-14</code>
<code>#summary: 一個子程序生産,一個子程序消費</code>
<code> </code>
<code>import</code> <code>os, sys, time</code>
<code>from</code> <code>multiprocessing </code><code>import</code> <code>Process, Pool, Queue, Manager</code>
<code>#生産</code>
<code>def</code> <code>Produce(q):</code>
<code> </code><code>print</code><code>(</code><code>'Produce %d ...'</code> <code>%</code> <code>os.getpid())</code>
<code> </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>1</code><code>, </code><code>20</code><code>):</code>
<code> </code><code>while</code> <code>q.full():</code>
<code> </code><code>print</code><code>(</code><code>'sleep %d/%d ...'</code> <code>%</code> <code>(i, q.qsize()))</code>
<code> </code><code>time.sleep(</code><code>1</code><code>)</code>
<code> </code><code>q.put(i)</code>
<code> </code><code>q.put(</code><code>0</code><code>) </code><code>#用0通知結束</code>
<code>#消費</code>
<code>def</code> <code>Consume(q):</code>
<code> </code><code>print</code><code>(</code><code>'Consume %d ...'</code> <code>%</code> <code>os.getpid())</code>
<code> </code><code>while</code> <code>True</code><code>:</code>
<code> </code><code>num </code><code>=</code> <code>q.get()</code>
<code> </code><code>if</code> <code>0</code> <code>=</code><code>=</code> <code>num: </code><code>#收到結束信号</code>
<code> </code><code>print</code><code>(</code><code>'receive 0'</code><code>)</code>
<code> </code><code>break</code>
<code> </code><code>print</code><code>(</code><code>'Consumer '</code> <code>+</code> <code>str</code><code>(num))</code>
<code> </code><code>time.sleep(</code><code>2</code><code>)</code>
<code> </code><code>print</code><code>(</code><code>'Consumer end '</code> <code>+</code> <code>str</code><code>(num))</code>
<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>: </code>
<code> </code><code>q </code><code>=</code> <code>Queue(</code><code>10</code><code>) </code><code>#可用</code>
<code> </code><code>q </code><code>=</code> <code>Manager().Queue(</code><code>10</code><code>) </code><code>#可用</code>
<code> </code><code>print</code><code>(os.getpid())</code>
<code> </code><code>producerProcess </code><code>=</code> <code>Process(target</code><code>=</code><code>Produce, args</code><code>=</code><code>(q,)) </code><code>#生産程序</code>
<code> </code><code>consumerProcess </code><code>=</code> <code>Process(target</code><code>=</code><code>Consume, args</code><code>=</code><code>(q,)) </code><code>#消費程序</code>
<code> </code><code>producerProcess.start()</code>
<code> </code><code>consumerProcess.start()</code>
<code> </code><code>producerProcess.join()</code>
<code> </code><code>consumerProcess.join()</code>
<code>#summary: 一個子程序生産,程序池消費</code>
<code>def</code> <code>Produce(q, poolSize):</code>
<code> </code><code>print</code><code>(</code><code>'Produce ...'</code><code>)</code>
<code> </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>1</code><code>, </code><code>100</code><code>):</code>
<code> </code><code>for</code> <code>_ </code><code>in</code> <code>range</code><code>(</code><code>0</code><code>, poolSize):</code>
<code> </code><code>q.put(</code><code>0</code><code>) </code><code>#用0通知結束</code>
<code> </code><code>print</code><code>(</code><code>'Consume ...'</code><code>)</code>
<code> </code><code>#q = Queue(10) #不可用</code>
<code> </code><code>poolSize </code><code>=</code> <code>4</code>
<code> </code><code>producerProcess </code><code>=</code> <code>Process(target</code><code>=</code><code>Produce, args</code><code>=</code><code>(q, poolSize)) </code><code>#生産程序</code>
<code> </code><code>consumerPool </code><code>=</code> <code>Pool(processes</code><code>=</code><code>poolSize) </code><code>#消費程序池,預設子程序個數為os.cpu_count()</code>
<code> </code><code>consumerPool.apply_async(func</code><code>=</code><code>Consume, args</code><code>=</code><code>(q,))</code>
<code> </code><code>consumerPool.close()</code>
<code> </code><code>consumerPool.join()</code>
<code> </code>
<code>#summary: 主程序生産,程序池消費</code>
<code> </code><code>num </code><code>=</code> <code>q.get()</code>
<code> </code><code>print</code><code>(</code><code>'Consume %d ...'</code> <code>%</code> <code>num)</code>
<code> </code><code>time.sleep(</code><code>2</code><code>)</code>
<code> </code><code>print</code><code>(</code><code>'Consumer %d over'</code> <code>%</code> <code>num)</code>
<code> </code>
<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>: </code>
<code> </code><code>pool </code><code>=</code> <code>Pool(processes </code><code>=</code> <code>4</code><code>)</code>
<code> </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>1</code><code>, </code><code>100</code><code>): </code><code>#生産</code>
<code> </code><code>print</code><code>(</code><code>'sleep %d ...'</code> <code>%</code> <code>q.qsize())</code>
<code> </code><code>print</code><code>(i)</code>
<code> </code><code>pool.apply_async(Consume, (q,))</code>
<code> </code>
<code> </code><code>pool.close()</code>
<code> </code><code>pool.join()</code>
*** Updated 2016-01-06 ***
一個好玩的例子:
<code>#date: 2016-01-06</code>
<code>#summary: 一個多程序的好玩例子</code>
<code>from</code> <code>multiprocessing </code><code>import</code> <code>Pool</code>
<code>cur_dir_fullpath </code><code>=</code> <code>os.path.dirname(os.path.abspath(__file__))</code>
<code>g_List </code><code>=</code> <code>[</code><code>'a'</code><code>]</code>
<code>#修改全局變量g_List</code>
<code>def</code> <code>ModifyDict_1():</code>
<code> </code><code>global</code> <code>g_List</code>
<code> </code><code>g_List.append(</code><code>'b'</code><code>)</code>
<code>#修改全局變量g_List </code>
<code>def</code> <code>ModifyDict_2():</code>
<code> </code><code>g_List.append(</code><code>'c'</code><code>)</code>
<code>#處理一個</code>
<code>def</code> <code>ProcOne(num):</code>
<code> </code><code>print</code><code>(</code><code>'ProcOne '</code> <code>+</code> <code>str</code><code>(num) </code><code>+</code> <code>', g_List:'</code> <code>+</code> <code>repr</code><code>(g_List))</code>
<code>#處理所有</code>
<code>def</code> <code>ProcAll(): </code>
<code> </code><code>#ProcOne(i)</code>
<code> </code><code>#pool.apply(ProcOne, (i,))</code>
<code> </code><code>pool.apply_async(ProcOne, (i,))</code>
<code> </code><code>pool.join() </code>
<code>ModifyDict_1() </code><code>#修改全局變量g_List</code>
<code> </code><code>ModifyDict_2() </code><code>#修改全局變量g_List</code>
<code> </code><code>print</code><code>(</code><code>'In main g_List :'</code> <code>+</code> <code>repr</code><code>(g_List))</code>
<code> </code><code>ProcAll()</code>
Windows7 下運作的結果:
<code>λ python3 demo.py</code>
<code>In main g_List :['a', 'b', 'c']</code>
<code>ProcOne 1, g_List:['a', 'b']</code>
<code>ProcOne 2, g_List:['a', 'b']</code>
<code>ProcOne 3, g_List:['a', 'b']</code>
<code>ProcOne 4, g_List:['a', 'b']</code>
<code>ProcOne 5, g_List:['a', 'b']</code>
<code>ProcOne 6, g_List:['a', 'b']</code>
<code>ProcOne 7, g_List:['a', 'b']</code>
<code>ProcOne 8, g_List:['a', 'b']</code>
<code>ProcOne 9, g_List:['a', 'b']</code>
<code>ProcOne 10, g_List:['a', 'b']</code>
<code>ProcOne 11, g_List:['a', 'b']</code>
<code>ProcOne 12, g_List:['a', 'b']</code>
<code>ProcOne 13, g_List:['a', 'b']</code>
<code>ProcOne 14, g_List:['a', 'b']</code>
<code>ProcOne 15, g_List:['a', 'b']</code>
<code>ProcOne 16, g_List:['a', 'b']</code>
<code>ProcOne 17, g_List:['a', 'b']</code>
<code>ProcOne 18, g_List:['a', 'b']</code>
<code>ProcOne 19, g_List:['a', 'b']</code>
Ubuntu 14.04下運作的結果:
<code>ProcOne 1, g_List:['a', 'b', 'c']</code>
<code>ProcOne 2, g_List:['a', 'b', 'c']</code>
<code>ProcOne 3, g_List:['a', 'b', 'c']</code>
<code>ProcOne 5, g_List:['a', 'b', 'c']</code>
<code>ProcOne 4, g_List:['a', 'b', 'c']</code>
<code>ProcOne 8, g_List:['a', 'b', 'c']</code>
<code>ProcOne 9, g_List:['a', 'b', 'c']</code>
<code>ProcOne 7, g_List:['a', 'b', 'c']</code>
<code>ProcOne 11, g_List:['a', 'b', 'c']</code>
<code>ProcOne 6, g_List:['a', 'b', 'c']</code>
<code>ProcOne 12, g_List:['a', 'b', 'c']</code>
<code>ProcOne 13, g_List:['a', 'b', 'c']</code>
<code>ProcOne 10, g_List:['a', 'b', 'c']</code>
<code>ProcOne 14, g_List:['a', 'b', 'c']</code>
<code>ProcOne 15, g_List:['a', 'b', 'c']</code>
<code>ProcOne 16, g_List:['a', 'b', 'c']</code>
<code>ProcOne 17, g_List:['a', 'b', 'c']</code>
<code>ProcOne 18, g_List:['a', 'b', 'c']</code>
<code>ProcOne 19, g_List:['a', 'b', 'c']</code>
相關閱讀:
本文轉自walker snapshot部落格51CTO部落格,原文連結http://blog.51cto.com/walkerqt/1414703如需轉載請自行聯系原作者
RQSLT