天天看点

Python3 与 C# 并发编程之~ 进程篇

上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~

如果遇到听不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.html

官方文档:https://docs.python.org/3/library/concurrency.html

在线预览:http://github.lesschina.com/python/base/concurrency/2.并发编程-进程篇.html

官方文档:https://docs.python.org/3/library/multiprocessing.html

Code:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess

Python的进程创建非常方便,看个案例:(这种方法通用,fork只适用于Linux系)

运行结果:

创建子进程时,传入一个执行函数和参数,用start()方法来启动进程即可

<code>join()</code>方法是父进程回收子进程的封装(主要是回收僵尸子进程(点我))

其他参数可以参考源码 or 文档,贴一下源码的<code>init</code>方法:

<code>def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)</code>

扩展:<code>name:为当前进程实例的别名</code>

<code>p.is_alive()</code> 判断进程实例p是否还在执行

<code>p.terminate()</code> 终止进程(发<code>SIGTERM</code>信号)

上面的案例如果用OOP来实现就是这样:(如果不指定方法,默认调Run方法)

PS:<code>multiprocessing.Process</code>自行处理僵死进程,不用像<code>os.fork</code>那样自己建立信号处理程序、安装信号处理程序

现在说说里面的一些门道(只想用的可以忽略)

新版本的封装可能多层,这时候可以看看Python3.3.X系列(这个算是Python3早期版本了,很多代码都暴露出来,比较明了直观)

multiprocessing.process.py

multiprocessing.popen_fork.py

关于断言的简单说明:(别泛滥)

如果条件为真,它什么都不做,反之它触发一个带可选错误信息的AssertionError

结果:

运行的时候可以指定<code>-O参数</code>来忽略<code>assert</code>,eg:

<code>python3 -O 0.assert.py</code>

扩展:

https://docs.python.org/3/library/unittest.html

https://www.cnblogs.com/shangren/p/8038935.html

多个进程就不需要自己手动去管理了,有Pool来帮你完成,先看个案例:

图示:(join可以指定超时时间,eg:<code>p.join(1)</code>)

Python3 与 C# 并发编程之~ 进程篇

调用<code>join()</code>之前必须先调用<code>close()</code>,调用<code>close()</code>之后就不能继续添加新的<code>Process</code>了(下面会说为什么)

验证一下Pool的默认大小是CPU的核数,看源码:

multiprocessing.pool.py

源码里面<code>apply_async</code>方法,是有回调函数(callback)的

来看个例子:(和JQ很像)

输出:

接着上面继续拓展,补充说说获取函数返回值。<code>上面是通过成功后的回调函数来获取返回值</code>,这次说说自带的方法:

输出:(<code>apply_async</code>返回一个<code>ApplyResult</code>类,里面有个get方法可以获取返回值)

再举个例子,顺便把<code>Pool</code>里面的<code>map</code>和<code>imap</code>方法搞个案例(类比jq)

微微看一眼源码:(基础忘了可以查看==&gt; 点我 )

扩展:优雅杀死子进程的探讨 https://segmentfault.com/q/1010000005077517

官方文档:https://docs.python.org/3/library/subprocess.html

还记得之前李代桃僵的<code>execlxxx</code>系列吗?

这不,<code>subprocess</code>就是它的一层封装,当然了要强大的多,先看个例子:(以<code>os.execlp</code>的例子为引)

输出

现在看下官方的文档描述来理解一下:

其实看看源码很有意思:(内部其实就是调用的<code>os.popen</code>【进程先导篇讲进程守护的时候用过】)

返回值类型:<code>CompletedProcess</code>

再来个案例体会一下方便之处:

图示:

Python3 与 C# 并发编程之~ 进程篇

再来个强大的案例(交互的程序都可以,比如 <code>ftp</code>,<code>nslookup</code> 等等):<code>popen1.communicate</code>

注意点:如果超时到期,则子进程不会被终止,需要自己处理一下(官方提醒)

这个等会说进程间通信还会说,所以简单举个例子,老规矩拿<code>ps aux | grep bash</code>说事:

输出:(以前案例:进程间通信~PIPE匿名管道)

其他扩展可以看看这篇文章:subprocess与Popen()

这个比较有意思,看个案例:

按照道理应该子进程自己写完自己读了,和上次讲得不一样啊?不急,先看看源码:

看看<code>connection.Pipe</code>方法的定义部分,是不是双向通信就看你是否设置<code>duplex=True</code>

通过源码知道了,原来双工是通过socket搞的啊~

再看个和原来一样效果的案例:(不用关来关去的了,方便!)

输出:(可以思考下为什么<code>start换个位置就死锁</code>,提示:<code>阻塞读写</code>)

再举个<code>Pool</code>的例子,咱们就进入今天的重点了:

看看源码就理解了:看看Pool的join是啥情况?看源码:

在pool的<code>__init__</code>的方法中,这几个属性:

将池进程的数量增加到指定的数量,join的时候会使用这个列表

注意:池的方法只能由创建它的进程使用

一步步的设局,从底层的的<code>pipe()</code>-&gt;<code>os.pipe</code>-&gt;<code>PIPE</code>,现在终于到<code>Queue</code>了,心酸啊,明知道上面两个项目

里面基本上不会用,但为了你们能看懂源码,说了这么久<code>%&gt;_&lt;%</code>其实以后当我们从<code>Queue</code>说到<code>MQ</code>和<code>RPC</code>之后,现在

讲得这些进程间通信(<code>IPC</code>)也基本上不会用了,但本质你得清楚,我尽量多分析点源码,这样你们以后看开源项目压力会很小

欢迎批评指正~

输出:(<code>get</code>和<code>put</code>默认是阻塞等待的)

先看看<code>Queue</code>的初始化方法:(不指定大小就是最大队列数)

关于<code>get</code>和<code>put</code>是阻塞的问题,看下源码探探究竟:

<code>q.get()</code>:收消息

<code>queue.put()</code>:发消息

非阻塞<code>get_nowait</code>和<code>put_nowait</code>本质其实也是调用了<code>get</code>和<code>put</code>方法:

说这么多不如来个例子看看:

补充说明一下:

<code>q._maxsize</code> 队列数(尽量不用<code>_</code>开头的属性和方法)

<code>q.qsize()</code>查看当前队列中存在几条消息

<code>q.full()</code>查看是否满了

<code>q.empty()</code>查看是否为空

再看个简单点的子进程间通信:(铺垫demo)

输出:(<code>time python3 5.queue2.py</code>)

多进程基本上都是用<code>pool</code>,可用上面说的<code>Queue</code>方法怎么报错了?

输出:(无法将<code>multiprocessing.Queue</code>对象传递给<code>Pool</code>方法)

下面会详说,先看一下正确方式:(队列换了一下,其他都一样<code>Manager().Queue()</code>)

再抛个思考题:(Linux)

输出:(为啥这样也可以【提示:<code>fork</code>】)

官方参考:https://docs.python.org/3/library/multiprocessing.html

spawn:(Win默认,Linux下也可以用【&gt;=3.4】)

父进程启动一个新的python解释器进程。

子进程只会继承运行进程对象run()方法所需的那些资源。

不会继承父进程中不必要的文件描述符和句柄。

与使用fork或forkserver相比,使用此方法启动进程相当慢。

可在Unix和Windows上使用。Windows上的默认设置。

fork:(Linux下默认)

父进程用于os.fork()分叉Python解释器。

子进程在开始时与父进程相同(这时候内部变量之类的还没有被修改)

父进程的所有资源都由子进程继承(用到多线程的时候可能有些问题)

仅适用于Unix。Unix上的默认值。

forkserver:(常用)

当程序启动并选择forkserver start方法时,将启动服务器进程。

从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。

fork服务器进程是单线程的,因此它可以安全使用os.fork()。没有不必要的资源被继承。

可在Unix平台上使用,支持通过Unix管道传递文件描述符。

这块官方文档很详细,贴下官方的2个案例:

通过<code>multiprocessing.set_start_method(xxx)</code>来设置启动的上下文类型

输出:(<code>set_start_method</code>不要过多使用)

如果你把设置启动上下文注释掉:(消耗的总时间少了很多)

也可以通过<code>multiprocessing.get_context(xxx)</code>获取指定类型的上下文

输出:(<code>get_context</code>在Python源码里用的比较多,so=&gt;也建议大家这么用)

从结果来看,总耗时也少了很多

说下日记相关的事情:

先看下<code>multiprocessing</code>里面的日记记录:

更多<code>Loging</code>模块内容可以看官方文档:https://docs.python.org/3/library/logging.html

这个是内部代码,看看即可:

<code>Logging</code>之前也有提过,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.装饰器传参的扩展(可传可不传)

来个案例:

之前忘记说了~现在快结尾了,补充一下进程5态:(来个草图)

Python3 与 C# 并发编程之~ 进程篇

应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:

之前说过<code>Queue</code>:在<code>Process</code>之间使用没问题,用到<code>Pool</code>,就使用<code>Manager().xxx</code>,<code>Value</code>和<code>Array</code>,就不太一样了:

看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)

以<code>Process</code>为例看看怎么用:

输出:(<code>Value</code>和<code>Array</code>是<code>进程|线程</code>安全的)

类型方面的对应关系:

这两个类型其实是<code>ctypes</code>类型,更多的类型可以去` multiprocessing.sharedctypes`查看,来张图:

Python3 与 C# 并发编程之~ 进程篇

回头解决<code>GIL</code>的时候会用到<code>C</code>系列或者<code>Go</code>系列的共享库(讲线程的时候会说)

关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下<code>i+=1</code>并不是原子性操作:

输出:(理论上应该是:5×1000=5000)

稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了)

输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)

看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了<code>Value</code>)

扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807

官方文档:https://docs.python.org/3/library/multiprocessing.html#managers

有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象

通过返回的经理<code>Manager()</code>将支持类型<code>list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue</code>

举个简单例子(后面还会再说):(本质其实就是<code>多个进程通过代理,共同操作服务端内容</code>)

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了<code>“中介”</code>)

同步问题依然需要注意一下,举个例子体会一下:

扩展补充:

<code>multiprocessing.Lock</code>是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。

大多数可变Python对象(如list,dict,大多数类)不能保证进程中安全,所以它们在进程间共享时需要使用<code>Manager</code>

多进程模式的缺点是创建进程的代价大,在<code>Unix/Linux</code>系统下,用<code>fork</code>调用还行,在<code>Windows</code>下创建进程开销巨大。

Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers

<code>WinServer</code>的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)

还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:

使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。

使用初始化程序在池创建时传递常规<code>multiprocessing.Queue()</code>这将使<code>Queue</code>实例在所有子进程中全局共享

再看一下Pool的<code>__init__</code>方法:

第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)

输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数:<code>alizer=init, initargs=(queue, ))</code>)

Win下亦通用(win下没有<code>os.getgid</code>)

Python3 与 C# 并发编程之~ 进程篇

有了<code>1.6</code>的基础,咱们来个例子练练:

<code>BaseManager</code>的缩略图:

Python3 与 C# 并发编程之~ 进程篇

服务器端代码:

客户端代码1:

客户端代码2:

输出图示:

Python3 与 C# 并发编程之~ 进程篇

服务器运行在Linux的测试:

Python3 与 C# 并发编程之~ 进程篇

其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下

参考文章:

进程共享的探讨:python-sharing-a-lock-between-processes

多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error

JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html

Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html

有深度但需要辩证看的两篇文章:

跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue

关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process

 Python的线程、并行、协程下次说

示例代码:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency

先简单说下概念(其实之前也有说,所以简说下):

并发:同时做多件事情

多线程:并发的一种形式

并行处理:多线程的一种(线程池产生的一种并发类型,eg:异步编程)

响应式编程:一种编程模式,对事件进行响应(有点类似于JQ的事件)

Net里面很少用进程,在以前基本上都是<code>线程+池+异步+并行+协程</code>

我这边简单引入一下,毕竟主要是写Python的教程,Net只是帮你们回顾一下,如果你发现还没听过这些概念,或者你的项目中还充斥着各种<code>Thread</code>和<code>ThreadPool</code>的话,真的得系统的学习一下了,现在官网的文档已经很完善了,记得早几年啥都没有,也只能挖那些外国开源项目:

https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency

Task的目的其实就是为了简化<code>Thread</code>和<code>ThreadPool</code>的代码,下面一起看看吧:

异步用起来比较简单,一般IO,DB,Net用的比较多,很多时候都会采用重试机制,举个简单的例子:

然后补充说下Task异常的问题,当你await的时候如果有异常会抛出,在第一个await处捕获处理即可

如果<code>async</code>和<code>await</code>就是理解不了的可以这样想:<code>async</code>就是为了让<code>await</code>生效(为了向后兼容)

对了,如果返回的是void,你设置成Task就行了,触发是类似于事件之类的方法才使用void,不然没有返回值都是使用Task

项目里经常有这么一个场景:等待一组任务完成后再执行某个操作,看个引入案例:

再举一个场景:同时调用多个同效果的API,有一个返回就好了,其他的忽略

一个async方法被await调用后,当它恢复运行时就会回到原来的上下文中运行。

如果你的Task不再需要上下文了可以使用:<code>task.ConfigureAwait(false)</code>,eg:写个日记还要啥上下文?

逆天的建议是:在核心代码里面一种使用<code>ConfigureAwait</code>,用户页面相关代码,不需要上下文的加上

其实如果有太多await在上下文里恢复那也是比较卡的,使用<code>ConfigureAwait</code>之后,被暂停后会在线程池里面继续运行

再看一个场景:比如一个耗时操作,我需要指定它的超时时间:

异步这块简单回顾就不说了,留两个扩展,你们自行探讨:

进度方面的可以使用<code>IProgress&lt;T&gt;</code>,就当留个作业自己摸索下吧~

使用了异步之后尽量避免使用<code>task.Wait</code> or <code>task.Result</code>,这样可以避免死锁

Task其他新特征去官网看看吧,引入到此为止了。

这个其实出来很久了,现在基本上都是用<code>PLinq</code>比较多点,主要就是:

数据并行:重点在处理数据(eg:聚合)

任务并行:重点在执行任务(每个任务块尽可能独立,越独立效率越高)

以前都是<code>Parallel.ForEach</code>这么用,现在和Linq结合之后非常方便<code>.AsParallel()</code>就OK了

说很抽象看个简单案例:

正常执行的结果应该是:

并行之后就是这样了(不管顺序了):

当然了,如果你就是对顺序有要求可以使用:<code>.AsOrdered()</code>

其实实际项目中,使用并行的时候:任务时间适中,太长不适合,太短也不适合

记得大家在项目里经常会用到如<code>Sum</code>,<code>Count</code>等聚合函数,其实这时候使用并行就很合适

time dotnet PLINQ.dll

不使用并行:(稍微多了点,CPU越密集差距越大)

其实聚合有一个通用方法,可以支持复杂的聚合:(以上面sum为例)

稍微扩展一下,PLinq也是支持取消的,<code>.WithCancellation(CancellationToken)</code>

Token的用法和上面一样,就不复述了,如果需要和异步结合,一个<code>Task.Run</code>就可以把并行任务交给线程池了

也可以使用Task的异步方法,设置超时时间,这样PLinq超时了也就终止了

PLinq这么方便,其实也是有一些小弊端的,比如它会直接最大程度的占用系统资源,可能会影响其他的任务,而传统的Parallel则会动态调整

这个PLinq好像没有对应的方法,有新语法你可以说下,来举个例子:

取消也支持:

其实还有一些比如数据流和响应编程没说,这个之前都是用第三方库,刚才看官网文档,好像已经支持了,所以就不卖弄了,感兴趣的可以去看看,其实项目里面有流数据相关的框架,eg:<code>Spark</code>,都是比较成熟的解决方案了基本上也不太使用这些了。

然后还有一些没说,比如NetCore里面不可变类型(列表、字典、集合、队列、栈、线程安全字典等等)以及限流、任务调度等,这些关键词我提一下,也方便你去搜索自己学习拓展

先到这吧,其他的自己探索一下吧,最后贴一些Nuget库,你可以针对性的使用:

数据流:<code>Microsoft.Tpl.Dataflow</code>

响应编程(Linq的Rx操作):<code>Rx-Main</code>

不可变类型:<code>Microsoft.Bcl.Immutable</code>

不得不感慨一句,微软妈妈真的花了很多功夫,Net的并发编程比Python省心多了(完)

作者:毒逆天

出处:https://www.cnblogs.com/dotnetcrazy

打赏:<b>18i4JpL6g54yAPAefdtgqwRrZ43YJwAV5z</b>

本文版权归作者和博客园共有。欢迎转载,但必须保留此段声明,且在文章页面明显位置给出原文连接!