天天看點

Python自定義程序池(生産者/消費者模型)

代碼說明一切:

<code>#encoding=utf-8</code>

<code>#author: walker</code>

<code>#date: 2014-05-21</code>

<code>#summary: 自定義程序池周遊目錄下檔案</code>

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Process, Queue, Lock</code>

<code>import</code> <code>time, os</code>

<code>#消費者</code>

<code>class</code> <code>Consumer(Process):</code>

<code>    </code><code>def</code> <code>__init__(</code><code>self</code><code>, queue, ioLock):</code>

<code>        </code><code>super</code><code>(Consumer, </code><code>self</code><code>).__init__()</code>

<code>        </code><code>self</code><code>.queue </code><code>=</code> <code>queue</code>

<code>        </code><code>self</code><code>.ioLock </code><code>=</code> <code>ioLock</code>

<code>        </code> 

<code>    </code><code>def</code> <code>run(</code><code>self</code><code>):</code>

<code>        </code><code>while</code> <code>True</code><code>:</code>

<code>            </code><code>task </code><code>=</code> <code>self</code><code>.queue.get()   </code><code>#隊列中無任務時,會阻塞程序</code>

<code>            </code><code>if</code> <code>isinstance</code><code>(task, </code><code>str</code><code>) </code><code>and</code> <code>task </code><code>=</code><code>=</code> <code>'quit'</code><code>:</code>

<code>                </code><code>break</code><code>;</code>

<code>            </code><code>time.sleep(</code><code>1</code><code>)   </code><code>#假定任務處理需要1秒鐘</code>

<code>            </code><code>self</code><code>.ioLock.acquire()</code>

<code>            </code><code>print</code><code>( </code><code>str</code><code>(os.getpid()) </code><code>+</code> <code>'  '</code> <code>+</code> <code>task)</code>

<code>            </code><code>self</code><code>.ioLock.release()</code>

<code>        </code><code>self</code><code>.ioLock.acquire()</code>

<code>        </code><code>print</code> <code>'Bye-bye'</code>

<code>        </code><code>self</code><code>.ioLock.release()</code>

<code>#生産者       </code>

<code>def</code> <code>Producer():</code>

<code>    </code><code>queue </code><code>=</code> <code>Queue()    </code><code>#這個隊列是程序/線程安全的</code>

<code>    </code><code>ioLock </code><code>=</code> <code>Lock()</code>

<code>    </code><code>subNum </code><code>=</code> <code>4</code>    <code>#子程序數量</code>

<code>    </code><code>workers </code><code>=</code> <code>build_worker_pool(queue, ioLock, subNum)</code>

<code>    </code><code>start_time </code><code>=</code> <code>time.time()</code>

<code>    </code> 

<code>    </code><code>for</code> <code>parent, dirnames, filenames </code><code>in</code> <code>os.walk(r</code><code>'D:\test'</code><code>):</code>

<code>        </code><code>for</code> <code>filename </code><code>in</code> <code>filenames:</code>

<code>            </code><code>queue.put(filename)</code>

<code>            </code><code>ioLock.acquire()   </code>

<code>            </code><code>print</code><code>(</code><code>'qsize:'</code> <code>+</code> <code>str</code><code>(queue.qsize()))</code>

<code>            </code><code>ioLock.release()</code>

<code>            </code><code>while</code> <code>queue.qsize() &gt; subNum </code><code>*</code> <code>10</code><code>:  </code><code>#控制隊列中任務數量</code>

<code>                </code><code>time.sleep(</code><code>1</code><code>)</code>

<code>            </code> 

<code>    </code><code>for</code> <code>worker </code><code>in</code> <code>workers:</code>

<code>        </code><code>queue.put(</code><code>'quit'</code><code>)</code>

<code>        </code><code>worker.join()</code>

<code>    </code><code>ioLock.acquire()   </code>

<code>    </code><code>print</code><code>(</code><code>'Done! Time taken: {}'</code><code>.</code><code>format</code><code>(time.time() </code><code>-</code> <code>start_time))</code>

<code>    </code><code>ioLock.release()</code>

<code>#建立程序池</code>

<code>def</code> <code>build_worker_pool(queue, ioLock, size):</code>

<code>    </code><code>workers </code><code>=</code> <code>[]</code>

<code>    </code><code>for</code> <code>_ </code><code>in</code> <code>range</code><code>(size):</code>

<code>        </code><code>worker </code><code>=</code> <code>Consumer(queue, ioLock)</code>

<code>        </code><code>worker.start()</code>

<code>        </code><code>workers.append(worker)</code>

<code>    </code><code>return</code> <code>workers</code>

<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>:</code>

<code>    </code><code>Producer()</code>

ps:

<code>self</code><code>.ioLock.acquire()  </code>

<code>...</code>

<code>self</code><code>.ioLock.release()</code>

可用

<code>with </code><code>self</code><code>.ioLock:</code>

<code>    </code><code>...</code>

替代。

<code>#date: 2017-03-14</code>

<code>#summary: 一個子程序生産,一個子程序消費</code>

<code> </code> 

<code>import</code> <code>os, sys, time</code>

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Process, Pool, Queue, Manager</code>

<code>#生産</code>

<code>def</code> <code>Produce(q):</code>

<code>    </code><code>print</code><code>(</code><code>'Produce %d ...'</code> <code>%</code> <code>os.getpid())</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>1</code><code>, </code><code>20</code><code>):</code>

<code>        </code><code>while</code> <code>q.full():</code>

<code>            </code><code>print</code><code>(</code><code>'sleep %d/%d ...'</code> <code>%</code> <code>(i, q.qsize()))</code>

<code>            </code><code>time.sleep(</code><code>1</code><code>)</code>

<code>        </code><code>q.put(i)</code>

<code>    </code><code>q.put(</code><code>0</code><code>)        </code><code>#用0通知結束</code>

<code>#消費</code>

<code>def</code> <code>Consume(q):</code>

<code>    </code><code>print</code><code>(</code><code>'Consume %d ...'</code> <code>%</code> <code>os.getpid())</code>

<code>    </code><code>while</code> <code>True</code><code>:</code>

<code>        </code><code>num </code><code>=</code> <code>q.get()</code>

<code>        </code><code>if</code> <code>0</code> <code>=</code><code>=</code> <code>num: </code><code>#收到結束信号</code>

<code>            </code><code>print</code><code>(</code><code>'receive 0'</code><code>)</code>

<code>            </code><code>break</code>

<code>        </code><code>print</code><code>(</code><code>'Consumer '</code> <code>+</code> <code>str</code><code>(num))</code>

<code>        </code><code>time.sleep(</code><code>2</code><code>)</code>

<code>        </code><code>print</code><code>(</code><code>'Consumer end '</code> <code>+</code> <code>str</code><code>(num))</code>

<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>: </code>

<code>    </code><code>q </code><code>=</code> <code>Queue(</code><code>10</code><code>)             </code><code>#可用</code>

<code>    </code><code>q </code><code>=</code> <code>Manager().Queue(</code><code>10</code><code>)       </code><code>#可用</code>

<code>    </code><code>print</code><code>(os.getpid())</code>

<code>    </code><code>producerProcess </code><code>=</code> <code>Process(target</code><code>=</code><code>Produce, args</code><code>=</code><code>(q,))     </code><code>#生産程序</code>

<code>    </code><code>consumerProcess </code><code>=</code> <code>Process(target</code><code>=</code><code>Consume, args</code><code>=</code><code>(q,))     </code><code>#消費程序</code>

<code>    </code><code>producerProcess.start()</code>

<code>    </code><code>consumerProcess.start()</code>

<code>    </code><code>producerProcess.join()</code>

<code>    </code><code>consumerProcess.join()</code>

<code>#summary: 一個子程序生産,程序池消費</code>

<code>def</code> <code>Produce(q, poolSize):</code>

<code>    </code><code>print</code><code>(</code><code>'Produce ...'</code><code>)</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>1</code><code>, </code><code>100</code><code>):</code>

<code>    </code><code>for</code> <code>_ </code><code>in</code> <code>range</code><code>(</code><code>0</code><code>, poolSize):</code>

<code>        </code><code>q.put(</code><code>0</code><code>)        </code><code>#用0通知結束</code>

<code>    </code><code>print</code><code>(</code><code>'Consume ...'</code><code>)</code>

<code>    </code><code>#q = Queue(10)                #不可用</code>

<code>    </code><code>poolSize </code><code>=</code> <code>4</code>

<code>    </code><code>producerProcess </code><code>=</code> <code>Process(target</code><code>=</code><code>Produce, args</code><code>=</code><code>(q, poolSize))       </code><code>#生産程序</code>

<code>    </code><code>consumerPool </code><code>=</code> <code>Pool(processes</code><code>=</code><code>poolSize)   </code><code>#消費程序池,預設子程序個數為os.cpu_count()</code>

<code>        </code><code>consumerPool.apply_async(func</code><code>=</code><code>Consume, args</code><code>=</code><code>(q,))</code>

<code>    </code><code>consumerPool.close()</code>

<code>    </code><code>consumerPool.join()</code>

<code>   </code> 

<code>#summary: 主程序生産,程序池消費</code>

<code>    </code><code>num </code><code>=</code> <code>q.get()</code>

<code>    </code><code>print</code><code>(</code><code>'Consume %d ...'</code> <code>%</code> <code>num)</code>

<code>    </code><code>time.sleep(</code><code>2</code><code>)</code>

<code>    </code><code>print</code><code>(</code><code>'Consumer %d over'</code> <code>%</code> <code>num)</code>

<code>     </code> 

<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>:    </code>

<code>    </code><code>pool </code><code>=</code> <code>Pool(processes </code><code>=</code> <code>4</code><code>)</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>1</code><code>, </code><code>100</code><code>):     </code><code>#生産</code>

<code>            </code><code>print</code><code>(</code><code>'sleep %d ...'</code> <code>%</code> <code>q.qsize())</code>

<code>        </code><code>print</code><code>(i)</code>

<code>        </code><code>pool.apply_async(Consume, (q,))</code>

<code>         </code> 

<code>    </code><code>pool.close()</code>

<code>    </code><code>pool.join()</code>

*** Updated 2016-01-06 ***

一個好玩的例子:

<code>#date: 2016-01-06</code>

<code>#summary: 一個多程序的好玩例子</code>

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Pool</code>

<code>cur_dir_fullpath </code><code>=</code> <code>os.path.dirname(os.path.abspath(__file__))</code>

<code>g_List </code><code>=</code> <code>[</code><code>'a'</code><code>]</code>

<code>#修改全局變量g_List</code>

<code>def</code> <code>ModifyDict_1():</code>

<code>    </code><code>global</code> <code>g_List</code>

<code>    </code><code>g_List.append(</code><code>'b'</code><code>)</code>

<code>#修改全局變量g_List    </code>

<code>def</code> <code>ModifyDict_2():</code>

<code>    </code><code>g_List.append(</code><code>'c'</code><code>)</code>

<code>#處理一個</code>

<code>def</code> <code>ProcOne(num):</code>

<code>    </code><code>print</code><code>(</code><code>'ProcOne '</code> <code>+</code> <code>str</code><code>(num) </code><code>+</code> <code>', g_List:'</code> <code>+</code> <code>repr</code><code>(g_List))</code>

<code>#處理所有</code>

<code>def</code> <code>ProcAll():    </code>

<code>        </code><code>#ProcOne(i)</code>

<code>        </code><code>#pool.apply(ProcOne, (i,))</code>

<code>        </code><code>pool.apply_async(ProcOne, (i,))</code>

<code>    </code><code>pool.join()  </code>

<code>ModifyDict_1()  </code><code>#修改全局變量g_List</code>

<code>    </code><code>ModifyDict_2()  </code><code>#修改全局變量g_List</code>

<code>    </code><code>print</code><code>(</code><code>'In main g_List :'</code> <code>+</code> <code>repr</code><code>(g_List))</code>

<code>    </code><code>ProcAll()</code>

Windows7 下運作的結果:

<code>λ python3 demo.py</code>

<code>In main g_List :['a', 'b', 'c']</code>

<code>ProcOne 1, g_List:['a', 'b']</code>

<code>ProcOne 2, g_List:['a', 'b']</code>

<code>ProcOne 3, g_List:['a', 'b']</code>

<code>ProcOne 4, g_List:['a', 'b']</code>

<code>ProcOne 5, g_List:['a', 'b']</code>

<code>ProcOne 6, g_List:['a', 'b']</code>

<code>ProcOne 7, g_List:['a', 'b']</code>

<code>ProcOne 8, g_List:['a', 'b']</code>

<code>ProcOne 9, g_List:['a', 'b']</code>

<code>ProcOne 10, g_List:['a', 'b']</code>

<code>ProcOne 11, g_List:['a', 'b']</code>

<code>ProcOne 12, g_List:['a', 'b']</code>

<code>ProcOne 13, g_List:['a', 'b']</code>

<code>ProcOne 14, g_List:['a', 'b']</code>

<code>ProcOne 15, g_List:['a', 'b']</code>

<code>ProcOne 16, g_List:['a', 'b']</code>

<code>ProcOne 17, g_List:['a', 'b']</code>

<code>ProcOne 18, g_List:['a', 'b']</code>

<code>ProcOne 19, g_List:['a', 'b']</code>

Ubuntu 14.04下運作的結果:

<code>ProcOne 1, g_List:['a', 'b', 'c']</code>

<code>ProcOne 2, g_List:['a', 'b', 'c']</code>

<code>ProcOne 3, g_List:['a', 'b', 'c']</code>

<code>ProcOne 5, g_List:['a', 'b', 'c']</code>

<code>ProcOne 4, g_List:['a', 'b', 'c']</code>

<code>ProcOne 8, g_List:['a', 'b', 'c']</code>

<code>ProcOne 9, g_List:['a', 'b', 'c']</code>

<code>ProcOne 7, g_List:['a', 'b', 'c']</code>

<code>ProcOne 11, g_List:['a', 'b', 'c']</code>

<code>ProcOne 6, g_List:['a', 'b', 'c']</code>

<code>ProcOne 12, g_List:['a', 'b', 'c']</code>

<code>ProcOne 13, g_List:['a', 'b', 'c']</code>

<code>ProcOne 10, g_List:['a', 'b', 'c']</code>

<code>ProcOne 14, g_List:['a', 'b', 'c']</code>

<code>ProcOne 15, g_List:['a', 'b', 'c']</code>

<code>ProcOne 16, g_List:['a', 'b', 'c']</code>

<code>ProcOne 17, g_List:['a', 'b', 'c']</code>

<code>ProcOne 18, g_List:['a', 'b', 'c']</code>

<code>ProcOne 19, g_List:['a', 'b', 'c']</code>

相關閱讀:

本文轉自walker snapshot部落格51CTO部落格,原文連結http://blog.51cto.com/walkerqt/1414703如需轉載請自行聯系原作者

RQSLT