上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~
如果遇到听不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.html
官方文档:https://docs.python.org/3/library/concurrency.html
在线预览:http://github.lesschina.com/python/base/concurrency/2.并发编程-进程篇.html
官方文档:https://docs.python.org/3/library/multiprocessing.html
Code:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess
Python的进程创建非常方便,看个案例:(这种方法通用,fork只适用于Linux系)
运行结果:
创建子进程时,传入一个执行函数和参数,用start()方法来启动进程即可
<code>join()</code>方法是父进程回收子进程的封装(主要是回收僵尸子进程(点我))
其他参数可以参考源码 or 文档,贴一下源码的<code>init</code>方法:
<code>def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)</code>
扩展:<code>name:为当前进程实例的别名</code>
<code>p.is_alive()</code> 判断进程实例p是否还在执行
<code>p.terminate()</code> 终止进程(发<code>SIGTERM</code>信号)
上面的案例如果用OOP来实现就是这样:(如果不指定方法,默认调Run方法)
PS:<code>multiprocessing.Process</code>自行处理僵死进程,不用像<code>os.fork</code>那样自己建立信号处理程序、安装信号处理程序
现在说说里面的一些门道(只想用的可以忽略)
新版本的封装可能多层,这时候可以看看Python3.3.X系列(这个算是Python3早期版本了,很多代码都暴露出来,比较明了直观)
multiprocessing.process.py
multiprocessing.popen_fork.py
关于断言的简单说明:(别泛滥)
如果条件为真,它什么都不做,反之它触发一个带可选错误信息的AssertionError
结果:
运行的时候可以指定<code>-O参数</code>来忽略<code>assert</code>,eg:
<code>python3 -O 0.assert.py</code>
扩展:
https://docs.python.org/3/library/unittest.html
https://www.cnblogs.com/shangren/p/8038935.html
多个进程就不需要自己手动去管理了,有Pool来帮你完成,先看个案例:
图示:(join可以指定超时时间,eg:<code>p.join(1)</code>)

调用<code>join()</code>之前必须先调用<code>close()</code>,调用<code>close()</code>之后就不能继续添加新的<code>Process</code>了(下面会说为什么)
验证一下Pool的默认大小是CPU的核数,看源码:
multiprocessing.pool.py
源码里面<code>apply_async</code>方法,是有回调函数(callback)的
来看个例子:(和JQ很像)
输出:
接着上面继续拓展,补充说说获取函数返回值。<code>上面是通过成功后的回调函数来获取返回值</code>,这次说说自带的方法:
输出:(<code>apply_async</code>返回一个<code>ApplyResult</code>类,里面有个get方法可以获取返回值)
再举个例子,顺便把<code>Pool</code>里面的<code>map</code>和<code>imap</code>方法搞个案例(类比jq)
微微看一眼源码:(基础忘了可以查看==> 点我 )
扩展:优雅杀死子进程的探讨 https://segmentfault.com/q/1010000005077517
官方文档:https://docs.python.org/3/library/subprocess.html
还记得之前李代桃僵的<code>execlxxx</code>系列吗?
这不,<code>subprocess</code>就是它的一层封装,当然了要强大的多,先看个例子:(以<code>os.execlp</code>的例子为引)
输出
现在看下官方的文档描述来理解一下:
其实看看源码很有意思:(内部其实就是调用的<code>os.popen</code>【进程先导篇讲进程守护的时候用过】)
返回值类型:<code>CompletedProcess</code>
再来个案例体会一下方便之处:
图示:
再来个强大的案例(交互的程序都可以,比如 <code>ftp</code>,<code>nslookup</code> 等等):<code>popen1.communicate</code>
注意点:如果超时到期,则子进程不会被终止,需要自己处理一下(官方提醒)
这个等会说进程间通信还会说,所以简单举个例子,老规矩拿<code>ps aux | grep bash</code>说事:
输出:(以前案例:进程间通信~PIPE匿名管道)
其他扩展可以看看这篇文章:subprocess与Popen()
这个比较有意思,看个案例:
按照道理应该子进程自己写完自己读了,和上次讲得不一样啊?不急,先看看源码:
看看<code>connection.Pipe</code>方法的定义部分,是不是双向通信就看你是否设置<code>duplex=True</code>
通过源码知道了,原来双工是通过socket搞的啊~
再看个和原来一样效果的案例:(不用关来关去的了,方便!)
输出:(可以思考下为什么<code>start换个位置就死锁</code>,提示:<code>阻塞读写</code>)
再举个<code>Pool</code>的例子,咱们就进入今天的重点了:
看看源码就理解了:看看Pool的join是啥情况?看源码:
在pool的<code>__init__</code>的方法中,这几个属性:
将池进程的数量增加到指定的数量,join的时候会使用这个列表
注意:池的方法只能由创建它的进程使用
一步步的设局,从底层的的<code>pipe()</code>-><code>os.pipe</code>-><code>PIPE</code>,现在终于到<code>Queue</code>了,心酸啊,明知道上面两个项目
里面基本上不会用,但为了你们能看懂源码,说了这么久<code>%>_<%</code>其实以后当我们从<code>Queue</code>说到<code>MQ</code>和<code>RPC</code>之后,现在
讲得这些进程间通信(<code>IPC</code>)也基本上不会用了,但本质你得清楚,我尽量多分析点源码,这样你们以后看开源项目压力会很小
欢迎批评指正~
输出:(<code>get</code>和<code>put</code>默认是阻塞等待的)
先看看<code>Queue</code>的初始化方法:(不指定大小就是最大队列数)
关于<code>get</code>和<code>put</code>是阻塞的问题,看下源码探探究竟:
<code>q.get()</code>:收消息
<code>queue.put()</code>:发消息
非阻塞<code>get_nowait</code>和<code>put_nowait</code>本质其实也是调用了<code>get</code>和<code>put</code>方法:
说这么多不如来个例子看看:
补充说明一下:
<code>q._maxsize</code> 队列数(尽量不用<code>_</code>开头的属性和方法)
<code>q.qsize()</code>查看当前队列中存在几条消息
<code>q.full()</code>查看是否满了
<code>q.empty()</code>查看是否为空
再看个简单点的子进程间通信:(铺垫demo)
输出:(<code>time python3 5.queue2.py</code>)
多进程基本上都是用<code>pool</code>,可用上面说的<code>Queue</code>方法怎么报错了?
输出:(无法将<code>multiprocessing.Queue</code>对象传递给<code>Pool</code>方法)
下面会详说,先看一下正确方式:(队列换了一下,其他都一样<code>Manager().Queue()</code>)
再抛个思考题:(Linux)
输出:(为啥这样也可以【提示:<code>fork</code>】)
官方参考:https://docs.python.org/3/library/multiprocessing.html
spawn:(Win默认,Linux下也可以用【>=3.4】)
父进程启动一个新的python解释器进程。
子进程只会继承运行进程对象run()方法所需的那些资源。
不会继承父进程中不必要的文件描述符和句柄。
与使用fork或forkserver相比,使用此方法启动进程相当慢。
可在Unix和Windows上使用。Windows上的默认设置。
fork:(Linux下默认)
父进程用于os.fork()分叉Python解释器。
子进程在开始时与父进程相同(这时候内部变量之类的还没有被修改)
父进程的所有资源都由子进程继承(用到多线程的时候可能有些问题)
仅适用于Unix。Unix上的默认值。
forkserver:(常用)
当程序启动并选择forkserver start方法时,将启动服务器进程。
从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。
fork服务器进程是单线程的,因此它可以安全使用os.fork()。没有不必要的资源被继承。
可在Unix平台上使用,支持通过Unix管道传递文件描述符。
这块官方文档很详细,贴下官方的2个案例:
通过<code>multiprocessing.set_start_method(xxx)</code>来设置启动的上下文类型
输出:(<code>set_start_method</code>不要过多使用)
如果你把设置启动上下文注释掉:(消耗的总时间少了很多)
也可以通过<code>multiprocessing.get_context(xxx)</code>获取指定类型的上下文
输出:(<code>get_context</code>在Python源码里用的比较多,so=>也建议大家这么用)
从结果来看,总耗时也少了很多
说下日记相关的事情:
先看下<code>multiprocessing</code>里面的日记记录:
更多<code>Loging</code>模块内容可以看官方文档:https://docs.python.org/3/library/logging.html
这个是内部代码,看看即可:
<code>Logging</code>之前也有提过,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.装饰器传参的扩展(可传可不传)
来个案例:
之前忘记说了~现在快结尾了,补充一下进程5态:(来个草图)
应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:
之前说过<code>Queue</code>:在<code>Process</code>之间使用没问题,用到<code>Pool</code>,就使用<code>Manager().xxx</code>,<code>Value</code>和<code>Array</code>,就不太一样了:
看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)
以<code>Process</code>为例看看怎么用:
输出:(<code>Value</code>和<code>Array</code>是<code>进程|线程</code>安全的)
类型方面的对应关系:
这两个类型其实是<code>ctypes</code>类型,更多的类型可以去` multiprocessing.sharedctypes`查看,来张图:
回头解决<code>GIL</code>的时候会用到<code>C</code>系列或者<code>Go</code>系列的共享库(讲线程的时候会说)
关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下<code>i+=1</code>并不是原子性操作:
输出:(理论上应该是:5×1000=5000)
稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了)
输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)
看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了<code>Value</code>)
扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807
官方文档:https://docs.python.org/3/library/multiprocessing.html#managers
有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象
通过返回的经理<code>Manager()</code>将支持类型<code>list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue</code>
举个简单例子(后面还会再说):(本质其实就是<code>多个进程通过代理,共同操作服务端内容</code>)
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了<code>“中介”</code>)
同步问题依然需要注意一下,举个例子体会一下:
扩展补充:
<code>multiprocessing.Lock</code>是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。
大多数可变Python对象(如list,dict,大多数类)不能保证进程中安全,所以它们在进程间共享时需要使用<code>Manager</code>
多进程模式的缺点是创建进程的代价大,在<code>Unix/Linux</code>系统下,用<code>fork</code>调用还行,在<code>Windows</code>下创建进程开销巨大。
Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers
<code>WinServer</code>的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)
还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:
使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。
使用初始化程序在池创建时传递常规<code>multiprocessing.Queue()</code>这将使<code>Queue</code>实例在所有子进程中全局共享
再看一下Pool的<code>__init__</code>方法:
第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)
输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数:<code>alizer=init, initargs=(queue, ))</code>)
Win下亦通用(win下没有<code>os.getgid</code>)
有了<code>1.6</code>的基础,咱们来个例子练练:
<code>BaseManager</code>的缩略图:
服务器端代码:
客户端代码1:
客户端代码2:
输出图示:
服务器运行在Linux的测试:
其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下
参考文章:
进程共享的探讨:python-sharing-a-lock-between-processes
多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error
JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html
Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html
有深度但需要辩证看的两篇文章:
跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue
关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process
Python的线程、并行、协程下次说
示例代码:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency
先简单说下概念(其实之前也有说,所以简说下):
并发:同时做多件事情
多线程:并发的一种形式
并行处理:多线程的一种(线程池产生的一种并发类型,eg:异步编程)
响应式编程:一种编程模式,对事件进行响应(有点类似于JQ的事件)
Net里面很少用进程,在以前基本上都是<code>线程+池+异步+并行+协程</code>
我这边简单引入一下,毕竟主要是写Python的教程,Net只是帮你们回顾一下,如果你发现还没听过这些概念,或者你的项目中还充斥着各种<code>Thread</code>和<code>ThreadPool</code>的话,真的得系统的学习一下了,现在官网的文档已经很完善了,记得早几年啥都没有,也只能挖那些外国开源项目:
https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency
Task的目的其实就是为了简化<code>Thread</code>和<code>ThreadPool</code>的代码,下面一起看看吧:
异步用起来比较简单,一般IO,DB,Net用的比较多,很多时候都会采用重试机制,举个简单的例子:
然后补充说下Task异常的问题,当你await的时候如果有异常会抛出,在第一个await处捕获处理即可
如果<code>async</code>和<code>await</code>就是理解不了的可以这样想:<code>async</code>就是为了让<code>await</code>生效(为了向后兼容)
对了,如果返回的是void,你设置成Task就行了,触发是类似于事件之类的方法才使用void,不然没有返回值都是使用Task
项目里经常有这么一个场景:等待一组任务完成后再执行某个操作,看个引入案例:
再举一个场景:同时调用多个同效果的API,有一个返回就好了,其他的忽略
一个async方法被await调用后,当它恢复运行时就会回到原来的上下文中运行。
如果你的Task不再需要上下文了可以使用:<code>task.ConfigureAwait(false)</code>,eg:写个日记还要啥上下文?
逆天的建议是:在核心代码里面一种使用<code>ConfigureAwait</code>,用户页面相关代码,不需要上下文的加上
其实如果有太多await在上下文里恢复那也是比较卡的,使用<code>ConfigureAwait</code>之后,被暂停后会在线程池里面继续运行
再看一个场景:比如一个耗时操作,我需要指定它的超时时间:
异步这块简单回顾就不说了,留两个扩展,你们自行探讨:
进度方面的可以使用<code>IProgress<T></code>,就当留个作业自己摸索下吧~
使用了异步之后尽量避免使用<code>task.Wait</code> or <code>task.Result</code>,这样可以避免死锁
Task其他新特征去官网看看吧,引入到此为止了。
这个其实出来很久了,现在基本上都是用<code>PLinq</code>比较多点,主要就是:
数据并行:重点在处理数据(eg:聚合)
任务并行:重点在执行任务(每个任务块尽可能独立,越独立效率越高)
以前都是<code>Parallel.ForEach</code>这么用,现在和Linq结合之后非常方便<code>.AsParallel()</code>就OK了
说很抽象看个简单案例:
正常执行的结果应该是:
并行之后就是这样了(不管顺序了):
当然了,如果你就是对顺序有要求可以使用:<code>.AsOrdered()</code>
其实实际项目中,使用并行的时候:任务时间适中,太长不适合,太短也不适合
记得大家在项目里经常会用到如<code>Sum</code>,<code>Count</code>等聚合函数,其实这时候使用并行就很合适
time dotnet PLINQ.dll
不使用并行:(稍微多了点,CPU越密集差距越大)
其实聚合有一个通用方法,可以支持复杂的聚合:(以上面sum为例)
稍微扩展一下,PLinq也是支持取消的,<code>.WithCancellation(CancellationToken)</code>
Token的用法和上面一样,就不复述了,如果需要和异步结合,一个<code>Task.Run</code>就可以把并行任务交给线程池了
也可以使用Task的异步方法,设置超时时间,这样PLinq超时了也就终止了
PLinq这么方便,其实也是有一些小弊端的,比如它会直接最大程度的占用系统资源,可能会影响其他的任务,而传统的Parallel则会动态调整
这个PLinq好像没有对应的方法,有新语法你可以说下,来举个例子:
取消也支持:
其实还有一些比如数据流和响应编程没说,这个之前都是用第三方库,刚才看官网文档,好像已经支持了,所以就不卖弄了,感兴趣的可以去看看,其实项目里面有流数据相关的框架,eg:<code>Spark</code>,都是比较成熟的解决方案了基本上也不太使用这些了。
然后还有一些没说,比如NetCore里面不可变类型(列表、字典、集合、队列、栈、线程安全字典等等)以及限流、任务调度等,这些关键词我提一下,也方便你去搜索自己学习拓展
先到这吧,其他的自己探索一下吧,最后贴一些Nuget库,你可以针对性的使用:
数据流:<code>Microsoft.Tpl.Dataflow</code>
响应编程(Linq的Rx操作):<code>Rx-Main</code>
不可变类型:<code>Microsoft.Bcl.Immutable</code>
不得不感慨一句,微软妈妈真的花了很多功夫,Net的并发编程比Python省心多了(完)
作者:毒逆天
出处:https://www.cnblogs.com/dotnetcrazy
打赏:<b>18i4JpL6g54yAPAefdtgqwRrZ43YJwAV5z</b>
本文版权归作者和博客园共有。欢迎转载,但必须保留此段声明,且在文章页面明显位置给出原文连接!