天天看點

Python3 與 C# 并發程式設計之~ 程序篇

上次說了很多Linux下程序相關知識,這邊不再複述,下面來說說Python的并發程式設計,如有錯誤歡迎提出~

如果遇到聽不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.html

官方文檔:https://docs.python.org/3/library/concurrency.html

線上預覽:http://github.lesschina.com/python/base/concurrency/2.并發程式設計-程序篇.html

官方文檔:https://docs.python.org/3/library/multiprocessing.html

Code:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess

Python的程序建立非常友善,看個案例:(這種方法通用,fork隻适用于Linux系)

運作結果:

建立子程序時,傳入一個執行函數和參數,用start()方法來啟動程序即可

<code>join()</code>方法是父程序回收子程序的封裝(主要是回收僵屍子程序(點我))

其他參數可以參考源碼 or 文檔,貼一下源碼的<code>init</code>方法:

<code>def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)</code>

擴充:<code>name:為目前程序執行個體的别名</code>

<code>p.is_alive()</code> 判斷程序執行個體p是否還在執行

<code>p.terminate()</code> 終止程序(發<code>SIGTERM</code>信号)

上面的案例如果用OOP來實作就是這樣:(如果不指定方法,預設調Run方法)

PS:<code>multiprocessing.Process</code>自行處理僵死程序,不用像<code>os.fork</code>那樣自己建立信号處理程式、安裝信号處理程式

現在說說裡面的一些門道(隻想用的可以忽略)

新版本的封裝可能多層,這時候可以看看Python3.3.X系列(這個算是Python3早期版本了,很多代碼都暴露出來,比較明了直覺)

multiprocessing.process.py

multiprocessing.popen_fork.py

關于斷言的簡單說明:(别泛濫)

如果條件為真,它什麼都不做,反之它觸發一個帶可選錯誤資訊的AssertionError

結果:

運作的時候可以指定<code>-O參數</code>來忽略<code>assert</code>,eg:

<code>python3 -O 0.assert.py</code>

擴充:

https://docs.python.org/3/library/unittest.html

https://www.cnblogs.com/shangren/p/8038935.html

多個程序就不需要自己手動去管理了,有Pool來幫你完成,先看個案例:

圖示:(join可以指定逾時時間,eg:<code>p.join(1)</code>)

Python3 與 C# 并發程式設計之~ 程式篇

調用<code>join()</code>之前必須先調用<code>close()</code>,調用<code>close()</code>之後就不能繼續添加新的<code>Process</code>了(下面會說為什麼)

驗證一下Pool的預設大小是CPU的核數,看源碼:

multiprocessing.pool.py

源碼裡面<code>apply_async</code>方法,是有回調函數(callback)的

來看個例子:(和JQ很像)

輸出:

接着上面繼續拓展,補充說說擷取函數傳回值。<code>上面是通過成功後的回調函數來擷取傳回值</code>,這次說說自帶的方法:

輸出:(<code>apply_async</code>傳回一個<code>ApplyResult</code>類,裡面有個get方法可以擷取傳回值)

再舉個例子,順便把<code>Pool</code>裡面的<code>map</code>和<code>imap</code>方法搞個案例(類比jq)

微微看一眼源碼:(基礎忘了可以檢視==&gt; 點我 )

擴充:優雅殺死子程序的探讨 https://segmentfault.com/q/1010000005077517

官方文檔:https://docs.python.org/3/library/subprocess.html

還記得之前李代桃僵的<code>execlxxx</code>系列嗎?

這不,<code>subprocess</code>就是它的一層封裝,當然了要強大的多,先看個例子:(以<code>os.execlp</code>的例子為引)

輸出

現在看下官方的文檔描述來了解一下:

其實看看源碼很有意思:(内部其實就是調用的<code>os.popen</code>【程序先導篇講程序守護的時候用過】)

傳回值類型:<code>CompletedProcess</code>

再來個案例體會一下友善之處:

圖示:

Python3 與 C# 并發程式設計之~ 程式篇

再來個強大的案例(互動的程式都可以,比如 <code>ftp</code>,<code>nslookup</code> 等等):<code>popen1.communicate</code>

注意點:如果逾時到期,則子程序不會被終止,需要自己處理一下(官方提醒)

這個等會說程序間通信還會說,是以簡單舉個例子,老規矩拿<code>ps aux | grep bash</code>說事:

輸出:(以前案例:程序間通信~PIPE匿名管道)

其他擴充可以看看這篇文章:subprocess與Popen()

這個比較有意思,看個案例:

按照道理應該子程序自己寫完自己讀了,和上次講得不一樣啊?不急,先看看源碼:

看看<code>connection.Pipe</code>方法的定義部分,是不是雙向通信就看你是否設定<code>duplex=True</code>

通過源碼知道了,原來雙工是通過socket搞的啊~

再看個和原來一樣效果的案例:(不用關來關去的了,友善!)

輸出:(可以思考下為什麼<code>start換個位置就死鎖</code>,提示:<code>阻塞讀寫</code>)

再舉個<code>Pool</code>的例子,咱們就進入今天的重點了:

看看源碼就了解了:看看Pool的join是啥情況?看源碼:

在pool的<code>__init__</code>的方法中,這幾個屬性:

将池程序的數量增加到指定的數量,join的時候會使用這個清單

注意:池的方法隻能由建立它的程序使用

一步步的設局,從底層的的<code>pipe()</code>-&gt;<code>os.pipe</code>-&gt;<code>PIPE</code>,現在終于到<code>Queue</code>了,心酸啊,明知道上面兩個項目

裡面基本上不會用,但為了你們能看懂源碼,說了這麼久<code>%&gt;_&lt;%</code>其實以後當我們從<code>Queue</code>說到<code>MQ</code>和<code>RPC</code>之後,現在

講得這些程序間通信(<code>IPC</code>)也基本上不會用了,但本質你得清楚,我盡量多分析點源碼,這樣你們以後看開源項目壓力會很小

歡迎批評指正~

輸出:(<code>get</code>和<code>put</code>預設是阻塞等待的)

先看看<code>Queue</code>的初始化方法:(不指定大小就是最大隊列數)

關于<code>get</code>和<code>put</code>是阻塞的問題,看下源碼探探究竟:

<code>q.get()</code>:收消息

<code>queue.put()</code>:發消息

非阻塞<code>get_nowait</code>和<code>put_nowait</code>本質其實也是調用了<code>get</code>和<code>put</code>方法:

說這麼多不如來個例子看看:

補充說明一下:

<code>q._maxsize</code> 隊列數(盡量不用<code>_</code>開頭的屬性和方法)

<code>q.qsize()</code>檢視目前隊列中存在幾條消息

<code>q.full()</code>檢視是否滿了

<code>q.empty()</code>檢視是否為空

再看個簡單點的子程序間通信:(鋪墊demo)

輸出:(<code>time python3 5.queue2.py</code>)

多程序基本上都是用<code>pool</code>,可用上面說的<code>Queue</code>方法怎麼報錯了?

輸出:(無法将<code>multiprocessing.Queue</code>對象傳遞給<code>Pool</code>方法)

下面會詳說,先看一下正确方式:(隊列換了一下,其他都一樣<code>Manager().Queue()</code>)

再抛個思考題:(Linux)

輸出:(為啥這樣也可以【提示:<code>fork</code>】)

官方參考:https://docs.python.org/3/library/multiprocessing.html

spawn:(Win預設,Linux下也可以用【&gt;=3.4】)

父程序啟動一個新的python解釋器程序。

子程序隻會繼承運作程序對象run()方法所需的那些資源。

不會繼承父程序中不必要的檔案描述符和句柄。

與使用fork或forkserver相比,使用此方法啟動程序相當慢。

可在Unix和Windows上使用。Windows上的預設設定。

fork:(Linux下預設)

父程序用于os.fork()分叉Python解釋器。

子程序在開始時與父程序相同(這時候内部變量之類的還沒有被修改)

父程序的所有資源都由子程序繼承(用到多線程的時候可能有些問題)

僅适用于Unix。Unix上的預設值。

forkserver:(常用)

當程式啟動并選擇forkserver start方法時,将啟動伺服器程序。

從那時起,每當需要一個新程序時,父程序就會連接配接到伺服器并請求它分叉一個新程序。

fork伺服器程序是單線程的,是以它可以安全使用os.fork()。沒有不必要的資源被繼承。

可在Unix平台上使用,支援通過Unix管道傳遞檔案描述符。

這塊官方文檔很詳細,貼下官方的2個案例:

通過<code>multiprocessing.set_start_method(xxx)</code>來設定啟動的上下文類型

輸出:(<code>set_start_method</code>不要過多使用)

如果你把設定啟動上下文注釋掉:(消耗的總時間少了很多)

也可以通過<code>multiprocessing.get_context(xxx)</code>擷取指定類型的上下文

輸出:(<code>get_context</code>在Python源碼裡用的比較多,so=&gt;也建議大家這麼用)

從結果來看,總耗時也少了很多

說下日記相關的事情:

先看下<code>multiprocessing</code>裡面的日記記錄:

更多<code>Loging</code>子產品内容可以看官方文檔:https://docs.python.org/3/library/logging.html

這個是内部代碼,看看即可:

<code>Logging</code>之前也有提過,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.裝飾器傳參的擴充(可傳可不傳)

來個案例:

之前忘記說了~現在快結尾了,補充一下程序5态:(來個草圖)

Python3 與 C# 并發程式設計之~ 程式篇

應該盡量避免程序間狀态共享,但需求在那,是以還是得研究,官方推薦了兩種方式:

之前說過<code>Queue</code>:在<code>Process</code>之間使用沒問題,用到<code>Pool</code>,就使用<code>Manager().xxx</code>,<code>Value</code>和<code>Array</code>,就不太一樣了:

看看源碼:(Manager裡面的Array和Process共享的Array不是一個概念,而且也沒有同步機制)

以<code>Process</code>為例看看怎麼用:

輸出:(<code>Value</code>和<code>Array</code>是<code>程序|線程</code>安全的)

類型方面的對應關系:

這兩個類型其實是<code>ctypes</code>類型,更多的類型可以去` multiprocessing.sharedctypes`檢視,來張圖:

Python3 與 C# 并發程式設計之~ 程式篇

回頭解決<code>GIL</code>的時候會用到<code>C</code>系列或者<code>Go</code>系列的共享庫(講線程的時候會說)

關于程序安全的補充說明:對于原子性操作就不用說,鐵定安全,但注意一下<code>i+=1</code>并不是原子性操作:

輸出:(理論上應該是:5×1000=5000)

稍微改一下才行:(程序安全:隻是提供了安全的方法,并不是什麼都不用你操心了)

輸出:(關于鎖這塊,後面講線程的時候會詳說,看看就好【文法的确比C#麻煩點】)

看看源碼:(之前探讨如何優雅的殺死子程序,其中就有一種方法使用了<code>Value</code>)

擴充部分可以檢視這篇文章:http://blog.51cto.com/11026142/1874807

官方文檔:https://docs.python.org/3/library/multiprocessing.html#managers

有一個伺服器程序負責維護所有的對象,而其他程序連接配接到該程序,通過代理對象操作伺服器程序當中的對象

通過傳回的經理<code>Manager()</code>将支援類型<code>list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue</code>

舉個簡單例子(後面還會再說):(本質其實就是<code>多個程序通過代理,共同操作服務端内容</code>)

伺服器程序管理器比使用共享記憶體對象更靈活,因為它們可以支援任意對象類型。此外,單個管理器可以通過網絡在不同計算機上的程序共享。但是,它們比使用共享記憶體慢(畢竟有了<code>“中介”</code>)

同步問題依然需要注意一下,舉個例子體會一下:

擴充補充:

<code>multiprocessing.Lock</code>是一個程序安全對象,是以您可以将其直接傳遞給子程序并在所有程序中安全地使用它。

大多數可變Python對象(如list,dict,大多數類)不能保證程序中安全,是以它們在程序間共享時需要使用<code>Manager</code>

多程序模式的缺點是建立程序的代價大,在<code>Unix/Linux</code>系統下,用<code>fork</code>調用還行,在<code>Windows</code>下建立程序開銷巨大。

Manager這塊官方文檔很詳細,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers

<code>WinServer</code>的可以參考這篇 or 這篇埋坑記(Manager一般都是部署在Linux的,Win的用戶端不影響)

還記得之前的:無法将multiprocessing.Queue對象傳遞給Pool方法嗎?其實一般都是這兩種方式解決的:

使用Manager需要生成另一個程序來托管Manager伺服器。 并且所有擷取/釋放鎖的調用都必須通過IPC發送到該伺服器。

使用初始化程式在池建立時傳遞正常<code>multiprocessing.Queue()</code>這将使<code>Queue</code>執行個體在所有子程序中全局共享

再看一下Pool的<code>__init__</code>方法:

第一種方法不夠輕量級,在講案例前,稍微說下第二種方法:(也算把上面留下的懸念解了)

輸出:(就是在初始化Pool的時候,傳了初始化執行的方法并傳了參數:<code>alizer=init, initargs=(queue, ))</code>)

Win下亦通用(win下沒有<code>os.getgid</code>)

Python3 與 C# 并發程式設計之~ 程式篇

有了<code>1.6</code>的基礎,咱們來個例子練練:

<code>BaseManager</code>的縮略圖:

Python3 與 C# 并發程式設計之~ 程式篇

伺服器端代碼:

用戶端代碼1:

用戶端代碼2:

輸出圖示:

Python3 與 C# 并發程式設計之~ 程式篇

伺服器運作在Linux的測試:

Python3 與 C# 并發程式設計之~ 程式篇

其實還有一部分内容沒說,明天得出去辦點事,先到這吧,後面找機會繼續帶一下

參考文章:

程序共享的探讨:python-sharing-a-lock-between-processes

多程序鎖的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error

JoinableQueue擴充:https://www.cnblogs.com/smallmars/p/7093603.html

Python多程序程式設計:https://www.cnblogs.com/kaituorensheng/p/4445418.html

有深度但需要辯證看的兩篇文章:

跨程序對象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue

關于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process

 Python的線程、并行、協程下次說

示例代碼:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency

先簡單說下概念(其實之前也有說,是以簡說下):

并發:同時做多件事情

多線程:并發的一種形式

并行處理:多線程的一種(線程池産生的一種并發類型,eg:異步程式設計)

響應式程式設計:一種程式設計模式,對事件進行響應(有點類似于JQ的事件)

Net裡面很少用程序,在以前基本上都是<code>線程+池+異步+并行+協程</code>

我這邊簡單引入一下,畢竟主要是寫Python的教程,Net隻是幫你們回顧一下,如果你發現還沒聽過這些概念,或者你的項目中還充斥着各種<code>Thread</code>和<code>ThreadPool</code>的話,真的得系統的學習一下了,現在官網的文檔已經很完善了,記得早幾年啥都沒有,也隻能挖那些外國開源項目:

https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency

Task的目的其實就是為了簡化<code>Thread</code>和<code>ThreadPool</code>的代碼,下面一起看看吧:

異步用起來比較簡單,一般IO,DB,Net用的比較多,很多時候都會采用重試機制,舉個簡單的例子:

然後補充說下Task異常的問題,當你await的時候如果有異常會抛出,在第一個await處捕獲處理即可

如果<code>async</code>和<code>await</code>就是了解不了的可以這樣想:<code>async</code>就是為了讓<code>await</code>生效(為了向後相容)

對了,如果傳回的是void,你設定成Task就行了,觸發是類似于事件之類的方法才使用void,不然沒有傳回值都是使用Task

項目裡經常有這麼一個場景:等待一組任務完成後再執行某個操作,看個引入案例:

再舉一個場景:同時調用多個同效果的API,有一個傳回就好了,其他的忽略

一個async方法被await調用後,當它恢複運作時就會回到原來的上下文中運作。

如果你的Task不再需要上下文了可以使用:<code>task.ConfigureAwait(false)</code>,eg:寫個日記還要啥上下文?

逆天的建議是:在核心代碼裡面一種使用<code>ConfigureAwait</code>,使用者頁面相關代碼,不需要上下文的加上

其實如果有太多await在上下文裡恢複那也是比較卡的,使用<code>ConfigureAwait</code>之後,被暫停後會線上程池裡面繼續運作

再看一個場景:比如一個耗時操作,我需要指定它的逾時時間:

異步這塊簡單回顧就不說了,留兩個擴充,你們自行探讨:

進度方面的可以使用<code>IProgress&lt;T&gt;</code>,就當留個作業自己摸索下吧~

使用了異步之後盡量避免使用<code>task.Wait</code> or <code>task.Result</code>,這樣可以避免死鎖

Task其他新特征去官網看看吧,引入到此為止了。

這個其實出來很久了,現在基本上都是用<code>PLinq</code>比較多點,主要就是:

資料并行:重點在處理資料(eg:聚合)

任務并行:重點在執行任務(每個任務塊盡可能獨立,越獨立效率越高)

以前都是<code>Parallel.ForEach</code>這麼用,現在和Linq結合之後非常友善<code>.AsParallel()</code>就OK了

說很抽象看個簡單案例:

正常執行的結果應該是:

并行之後就是這樣了(不管順序了):

當然了,如果你就是對順序有要求可以使用:<code>.AsOrdered()</code>

其實實際項目中,使用并行的時候:任務時間适中,太長不适合,太短也不适合

記得大家在項目裡經常會用到如<code>Sum</code>,<code>Count</code>等聚合函數,其實這時候使用并行就很合适

time dotnet PLINQ.dll

不使用并行:(稍微多了點,CPU越密集差距越大)

其實聚合有一個通用方法,可以支援複雜的聚合:(以上面sum為例)

稍微擴充一下,PLinq也是支援取消的,<code>.WithCancellation(CancellationToken)</code>

Token的用法和上面一樣,就不複述了,如果需要和異步結合,一個<code>Task.Run</code>就可以把并行任務交給線程池了

也可以使用Task的異步方法,設定逾時時間,這樣PLinq逾時了也就終止了

PLinq這麼友善,其實也是有一些小弊端的,比如它會直接最大程度的占用系統資源,可能會影響其他的任務,而傳統的Parallel則會動态調整

這個PLinq好像沒有對應的方法,有新文法你可以說下,來舉個例子:

取消也支援:

其實還有一些比如資料流和響應程式設計沒說,這個之前都是用第三方庫,剛才看官網文檔,好像已經支援了,是以就不賣弄了,感興趣的可以去看看,其實項目裡面有流資料相關的架構,eg:<code>Spark</code>,都是比較成熟的解決方案了基本上也不太使用這些了。

然後還有一些沒說,比如NetCore裡面不可變類型(清單、字典、集合、隊列、棧、線程安全字典等等)以及限流、任務排程等,這些關鍵詞我提一下,也友善你去搜尋自己學習拓展

先到這吧,其他的自己探索一下吧,最後貼一些Nuget庫,你可以針對性的使用:

資料流:<code>Microsoft.Tpl.Dataflow</code>

響應程式設計(Linq的Rx操作):<code>Rx-Main</code>

不可變類型:<code>Microsoft.Bcl.Immutable</code>

不得不感慨一句,微軟媽媽真的花了很多功夫,Net的并發程式設計比Python省心多了(完)

作者:毒逆天

出處:https://www.cnblogs.com/dotnetcrazy

打賞:<b>18i4JpL6g54yAPAefdtgqwRrZ43YJwAV5z</b>

本文版權歸作者和部落格園共有。歡迎轉載,但必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接!