上次說了很多Linux下程序相關知識,這邊不再複述,下面來說說Python的并發程式設計,如有錯誤歡迎提出~
如果遇到聽不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.html
官方文檔:https://docs.python.org/3/library/concurrency.html
線上預覽:http://github.lesschina.com/python/base/concurrency/2.并發程式設計-程序篇.html
官方文檔:https://docs.python.org/3/library/multiprocessing.html
Code:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess
Python的程序建立非常友善,看個案例:(這種方法通用,fork隻适用于Linux系)
運作結果:
建立子程序時,傳入一個執行函數和參數,用start()方法來啟動程序即可
<code>join()</code>方法是父程序回收子程序的封裝(主要是回收僵屍子程序(點我))
其他參數可以參考源碼 or 文檔,貼一下源碼的<code>init</code>方法:
<code>def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)</code>
擴充:<code>name:為目前程序執行個體的别名</code>
<code>p.is_alive()</code> 判斷程序執行個體p是否還在執行
<code>p.terminate()</code> 終止程序(發<code>SIGTERM</code>信号)
上面的案例如果用OOP來實作就是這樣:(如果不指定方法,預設調Run方法)
PS:<code>multiprocessing.Process</code>自行處理僵死程序,不用像<code>os.fork</code>那樣自己建立信号處理程式、安裝信号處理程式
現在說說裡面的一些門道(隻想用的可以忽略)
新版本的封裝可能多層,這時候可以看看Python3.3.X系列(這個算是Python3早期版本了,很多代碼都暴露出來,比較明了直覺)
multiprocessing.process.py
multiprocessing.popen_fork.py
關于斷言的簡單說明:(别泛濫)
如果條件為真,它什麼都不做,反之它觸發一個帶可選錯誤資訊的AssertionError
結果:
運作的時候可以指定<code>-O參數</code>來忽略<code>assert</code>,eg:
<code>python3 -O 0.assert.py</code>
擴充:
https://docs.python.org/3/library/unittest.html
https://www.cnblogs.com/shangren/p/8038935.html
多個程序就不需要自己手動去管理了,有Pool來幫你完成,先看個案例:
圖示:(join可以指定逾時時間,eg:<code>p.join(1)</code>)

調用<code>join()</code>之前必須先調用<code>close()</code>,調用<code>close()</code>之後就不能繼續添加新的<code>Process</code>了(下面會說為什麼)
驗證一下Pool的預設大小是CPU的核數,看源碼:
multiprocessing.pool.py
源碼裡面<code>apply_async</code>方法,是有回調函數(callback)的
來看個例子:(和JQ很像)
輸出:
接着上面繼續拓展,補充說說擷取函數傳回值。<code>上面是通過成功後的回調函數來擷取傳回值</code>,這次說說自帶的方法:
輸出:(<code>apply_async</code>傳回一個<code>ApplyResult</code>類,裡面有個get方法可以擷取傳回值)
再舉個例子,順便把<code>Pool</code>裡面的<code>map</code>和<code>imap</code>方法搞個案例(類比jq)
微微看一眼源碼:(基礎忘了可以檢視==> 點我 )
擴充:優雅殺死子程序的探讨 https://segmentfault.com/q/1010000005077517
官方文檔:https://docs.python.org/3/library/subprocess.html
還記得之前李代桃僵的<code>execlxxx</code>系列嗎?
這不,<code>subprocess</code>就是它的一層封裝,當然了要強大的多,先看個例子:(以<code>os.execlp</code>的例子為引)
輸出
現在看下官方的文檔描述來了解一下:
其實看看源碼很有意思:(内部其實就是調用的<code>os.popen</code>【程序先導篇講程序守護的時候用過】)
傳回值類型:<code>CompletedProcess</code>
再來個案例體會一下友善之處:
圖示:
再來個強大的案例(互動的程式都可以,比如 <code>ftp</code>,<code>nslookup</code> 等等):<code>popen1.communicate</code>
注意點:如果逾時到期,則子程序不會被終止,需要自己處理一下(官方提醒)
這個等會說程序間通信還會說,是以簡單舉個例子,老規矩拿<code>ps aux | grep bash</code>說事:
輸出:(以前案例:程序間通信~PIPE匿名管道)
其他擴充可以看看這篇文章:subprocess與Popen()
這個比較有意思,看個案例:
按照道理應該子程序自己寫完自己讀了,和上次講得不一樣啊?不急,先看看源碼:
看看<code>connection.Pipe</code>方法的定義部分,是不是雙向通信就看你是否設定<code>duplex=True</code>
通過源碼知道了,原來雙工是通過socket搞的啊~
再看個和原來一樣效果的案例:(不用關來關去的了,友善!)
輸出:(可以思考下為什麼<code>start換個位置就死鎖</code>,提示:<code>阻塞讀寫</code>)
再舉個<code>Pool</code>的例子,咱們就進入今天的重點了:
看看源碼就了解了:看看Pool的join是啥情況?看源碼:
在pool的<code>__init__</code>的方法中,這幾個屬性:
将池程序的數量增加到指定的數量,join的時候會使用這個清單
注意:池的方法隻能由建立它的程序使用
一步步的設局,從底層的的<code>pipe()</code>-><code>os.pipe</code>-><code>PIPE</code>,現在終于到<code>Queue</code>了,心酸啊,明知道上面兩個項目
裡面基本上不會用,但為了你們能看懂源碼,說了這麼久<code>%>_<%</code>其實以後當我們從<code>Queue</code>說到<code>MQ</code>和<code>RPC</code>之後,現在
講得這些程序間通信(<code>IPC</code>)也基本上不會用了,但本質你得清楚,我盡量多分析點源碼,這樣你們以後看開源項目壓力會很小
歡迎批評指正~
輸出:(<code>get</code>和<code>put</code>預設是阻塞等待的)
先看看<code>Queue</code>的初始化方法:(不指定大小就是最大隊列數)
關于<code>get</code>和<code>put</code>是阻塞的問題,看下源碼探探究竟:
<code>q.get()</code>:收消息
<code>queue.put()</code>:發消息
非阻塞<code>get_nowait</code>和<code>put_nowait</code>本質其實也是調用了<code>get</code>和<code>put</code>方法:
說這麼多不如來個例子看看:
補充說明一下:
<code>q._maxsize</code> 隊列數(盡量不用<code>_</code>開頭的屬性和方法)
<code>q.qsize()</code>檢視目前隊列中存在幾條消息
<code>q.full()</code>檢視是否滿了
<code>q.empty()</code>檢視是否為空
再看個簡單點的子程序間通信:(鋪墊demo)
輸出:(<code>time python3 5.queue2.py</code>)
多程序基本上都是用<code>pool</code>,可用上面說的<code>Queue</code>方法怎麼報錯了?
輸出:(無法将<code>multiprocessing.Queue</code>對象傳遞給<code>Pool</code>方法)
下面會詳說,先看一下正确方式:(隊列換了一下,其他都一樣<code>Manager().Queue()</code>)
再抛個思考題:(Linux)
輸出:(為啥這樣也可以【提示:<code>fork</code>】)
官方參考:https://docs.python.org/3/library/multiprocessing.html
spawn:(Win預設,Linux下也可以用【>=3.4】)
父程序啟動一個新的python解釋器程序。
子程序隻會繼承運作程序對象run()方法所需的那些資源。
不會繼承父程序中不必要的檔案描述符和句柄。
與使用fork或forkserver相比,使用此方法啟動程序相當慢。
可在Unix和Windows上使用。Windows上的預設設定。
fork:(Linux下預設)
父程序用于os.fork()分叉Python解釋器。
子程序在開始時與父程序相同(這時候内部變量之類的還沒有被修改)
父程序的所有資源都由子程序繼承(用到多線程的時候可能有些問題)
僅适用于Unix。Unix上的預設值。
forkserver:(常用)
當程式啟動并選擇forkserver start方法時,将啟動伺服器程序。
從那時起,每當需要一個新程序時,父程序就會連接配接到伺服器并請求它分叉一個新程序。
fork伺服器程序是單線程的,是以它可以安全使用os.fork()。沒有不必要的資源被繼承。
可在Unix平台上使用,支援通過Unix管道傳遞檔案描述符。
這塊官方文檔很詳細,貼下官方的2個案例:
通過<code>multiprocessing.set_start_method(xxx)</code>來設定啟動的上下文類型
輸出:(<code>set_start_method</code>不要過多使用)
如果你把設定啟動上下文注釋掉:(消耗的總時間少了很多)
也可以通過<code>multiprocessing.get_context(xxx)</code>擷取指定類型的上下文
輸出:(<code>get_context</code>在Python源碼裡用的比較多,so=>也建議大家這麼用)
從結果來看,總耗時也少了很多
說下日記相關的事情:
先看下<code>multiprocessing</code>裡面的日記記錄:
更多<code>Loging</code>子產品内容可以看官方文檔:https://docs.python.org/3/library/logging.html
這個是内部代碼,看看即可:
<code>Logging</code>之前也有提過,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.裝飾器傳參的擴充(可傳可不傳)
來個案例:
之前忘記說了~現在快結尾了,補充一下程序5态:(來個草圖)
應該盡量避免程序間狀态共享,但需求在那,是以還是得研究,官方推薦了兩種方式:
之前說過<code>Queue</code>:在<code>Process</code>之間使用沒問題,用到<code>Pool</code>,就使用<code>Manager().xxx</code>,<code>Value</code>和<code>Array</code>,就不太一樣了:
看看源碼:(Manager裡面的Array和Process共享的Array不是一個概念,而且也沒有同步機制)
以<code>Process</code>為例看看怎麼用:
輸出:(<code>Value</code>和<code>Array</code>是<code>程序|線程</code>安全的)
類型方面的對應關系:
這兩個類型其實是<code>ctypes</code>類型,更多的類型可以去` multiprocessing.sharedctypes`檢視,來張圖:
回頭解決<code>GIL</code>的時候會用到<code>C</code>系列或者<code>Go</code>系列的共享庫(講線程的時候會說)
關于程序安全的補充說明:對于原子性操作就不用說,鐵定安全,但注意一下<code>i+=1</code>并不是原子性操作:
輸出:(理論上應該是:5×1000=5000)
稍微改一下才行:(程序安全:隻是提供了安全的方法,并不是什麼都不用你操心了)
輸出:(關于鎖這塊,後面講線程的時候會詳說,看看就好【文法的确比C#麻煩點】)
看看源碼:(之前探讨如何優雅的殺死子程序,其中就有一種方法使用了<code>Value</code>)
擴充部分可以檢視這篇文章:http://blog.51cto.com/11026142/1874807
官方文檔:https://docs.python.org/3/library/multiprocessing.html#managers
有一個伺服器程序負責維護所有的對象,而其他程序連接配接到該程序,通過代理對象操作伺服器程序當中的對象
通過傳回的經理<code>Manager()</code>将支援類型<code>list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue</code>
舉個簡單例子(後面還會再說):(本質其實就是<code>多個程序通過代理,共同操作服務端内容</code>)
伺服器程序管理器比使用共享記憶體對象更靈活,因為它們可以支援任意對象類型。此外,單個管理器可以通過網絡在不同計算機上的程序共享。但是,它們比使用共享記憶體慢(畢竟有了<code>“中介”</code>)
同步問題依然需要注意一下,舉個例子體會一下:
擴充補充:
<code>multiprocessing.Lock</code>是一個程序安全對象,是以您可以将其直接傳遞給子程序并在所有程序中安全地使用它。
大多數可變Python對象(如list,dict,大多數類)不能保證程序中安全,是以它們在程序間共享時需要使用<code>Manager</code>
多程序模式的缺點是建立程序的代價大,在<code>Unix/Linux</code>系統下,用<code>fork</code>調用還行,在<code>Windows</code>下建立程序開銷巨大。
Manager這塊官方文檔很詳細,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers
<code>WinServer</code>的可以參考這篇 or 這篇埋坑記(Manager一般都是部署在Linux的,Win的用戶端不影響)
還記得之前的:無法将multiprocessing.Queue對象傳遞給Pool方法嗎?其實一般都是這兩種方式解決的:
使用Manager需要生成另一個程序來托管Manager伺服器。 并且所有擷取/釋放鎖的調用都必須通過IPC發送到該伺服器。
使用初始化程式在池建立時傳遞正常<code>multiprocessing.Queue()</code>這将使<code>Queue</code>執行個體在所有子程序中全局共享
再看一下Pool的<code>__init__</code>方法:
第一種方法不夠輕量級,在講案例前,稍微說下第二種方法:(也算把上面留下的懸念解了)
輸出:(就是在初始化Pool的時候,傳了初始化執行的方法并傳了參數:<code>alizer=init, initargs=(queue, ))</code>)
Win下亦通用(win下沒有<code>os.getgid</code>)
有了<code>1.6</code>的基礎,咱們來個例子練練:
<code>BaseManager</code>的縮略圖:
伺服器端代碼:
用戶端代碼1:
用戶端代碼2:
輸出圖示:
伺服器運作在Linux的測試:
其實還有一部分内容沒說,明天得出去辦點事,先到這吧,後面找機會繼續帶一下
參考文章:
程序共享的探讨:python-sharing-a-lock-between-processes
多程序鎖的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error
JoinableQueue擴充:https://www.cnblogs.com/smallmars/p/7093603.html
Python多程序程式設計:https://www.cnblogs.com/kaituorensheng/p/4445418.html
有深度但需要辯證看的兩篇文章:
跨程序對象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue
關于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process
Python的線程、并行、協程下次說
示例代碼:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency
先簡單說下概念(其實之前也有說,是以簡說下):
并發:同時做多件事情
多線程:并發的一種形式
并行處理:多線程的一種(線程池産生的一種并發類型,eg:異步程式設計)
響應式程式設計:一種程式設計模式,對事件進行響應(有點類似于JQ的事件)
Net裡面很少用程序,在以前基本上都是<code>線程+池+異步+并行+協程</code>
我這邊簡單引入一下,畢竟主要是寫Python的教程,Net隻是幫你們回顧一下,如果你發現還沒聽過這些概念,或者你的項目中還充斥着各種<code>Thread</code>和<code>ThreadPool</code>的話,真的得系統的學習一下了,現在官網的文檔已經很完善了,記得早幾年啥都沒有,也隻能挖那些外國開源項目:
https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency
Task的目的其實就是為了簡化<code>Thread</code>和<code>ThreadPool</code>的代碼,下面一起看看吧:
異步用起來比較簡單,一般IO,DB,Net用的比較多,很多時候都會采用重試機制,舉個簡單的例子:
然後補充說下Task異常的問題,當你await的時候如果有異常會抛出,在第一個await處捕獲處理即可
如果<code>async</code>和<code>await</code>就是了解不了的可以這樣想:<code>async</code>就是為了讓<code>await</code>生效(為了向後相容)
對了,如果傳回的是void,你設定成Task就行了,觸發是類似于事件之類的方法才使用void,不然沒有傳回值都是使用Task
項目裡經常有這麼一個場景:等待一組任務完成後再執行某個操作,看個引入案例:
再舉一個場景:同時調用多個同效果的API,有一個傳回就好了,其他的忽略
一個async方法被await調用後,當它恢複運作時就會回到原來的上下文中運作。
如果你的Task不再需要上下文了可以使用:<code>task.ConfigureAwait(false)</code>,eg:寫個日記還要啥上下文?
逆天的建議是:在核心代碼裡面一種使用<code>ConfigureAwait</code>,使用者頁面相關代碼,不需要上下文的加上
其實如果有太多await在上下文裡恢複那也是比較卡的,使用<code>ConfigureAwait</code>之後,被暫停後會線上程池裡面繼續運作
再看一個場景:比如一個耗時操作,我需要指定它的逾時時間:
異步這塊簡單回顧就不說了,留兩個擴充,你們自行探讨:
進度方面的可以使用<code>IProgress<T></code>,就當留個作業自己摸索下吧~
使用了異步之後盡量避免使用<code>task.Wait</code> or <code>task.Result</code>,這樣可以避免死鎖
Task其他新特征去官網看看吧,引入到此為止了。
這個其實出來很久了,現在基本上都是用<code>PLinq</code>比較多點,主要就是:
資料并行:重點在處理資料(eg:聚合)
任務并行:重點在執行任務(每個任務塊盡可能獨立,越獨立效率越高)
以前都是<code>Parallel.ForEach</code>這麼用,現在和Linq結合之後非常友善<code>.AsParallel()</code>就OK了
說很抽象看個簡單案例:
正常執行的結果應該是:
并行之後就是這樣了(不管順序了):
當然了,如果你就是對順序有要求可以使用:<code>.AsOrdered()</code>
其實實際項目中,使用并行的時候:任務時間适中,太長不适合,太短也不适合
記得大家在項目裡經常會用到如<code>Sum</code>,<code>Count</code>等聚合函數,其實這時候使用并行就很合适
time dotnet PLINQ.dll
不使用并行:(稍微多了點,CPU越密集差距越大)
其實聚合有一個通用方法,可以支援複雜的聚合:(以上面sum為例)
稍微擴充一下,PLinq也是支援取消的,<code>.WithCancellation(CancellationToken)</code>
Token的用法和上面一樣,就不複述了,如果需要和異步結合,一個<code>Task.Run</code>就可以把并行任務交給線程池了
也可以使用Task的異步方法,設定逾時時間,這樣PLinq逾時了也就終止了
PLinq這麼友善,其實也是有一些小弊端的,比如它會直接最大程度的占用系統資源,可能會影響其他的任務,而傳統的Parallel則會動态調整
這個PLinq好像沒有對應的方法,有新文法你可以說下,來舉個例子:
取消也支援:
其實還有一些比如資料流和響應程式設計沒說,這個之前都是用第三方庫,剛才看官網文檔,好像已經支援了,是以就不賣弄了,感興趣的可以去看看,其實項目裡面有流資料相關的架構,eg:<code>Spark</code>,都是比較成熟的解決方案了基本上也不太使用這些了。
然後還有一些沒說,比如NetCore裡面不可變類型(清單、字典、集合、隊列、棧、線程安全字典等等)以及限流、任務排程等,這些關鍵詞我提一下,也友善你去搜尋自己學習拓展
先到這吧,其他的自己探索一下吧,最後貼一些Nuget庫,你可以針對性的使用:
資料流:<code>Microsoft.Tpl.Dataflow</code>
響應程式設計(Linq的Rx操作):<code>Rx-Main</code>
不可變類型:<code>Microsoft.Bcl.Immutable</code>
不得不感慨一句,微軟媽媽真的花了很多功夫,Net的并發程式設計比Python省心多了(完)
作者:毒逆天
出處:https://www.cnblogs.com/dotnetcrazy
打賞:<b>18i4JpL6g54yAPAefdtgqwRrZ43YJwAV5z</b>
本文版權歸作者和部落格園共有。歡迎轉載,但必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接!