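Study Notes on 《Python高级编程》 (Advanced Python Programming), Chapter 11: Multithreading and Multiprocessing
Python GIL: The Global Interpreter Lock
Strictly speaking, the GIL (global interpreter lock) is not a feature of the Python language. It is an implementation detail of CPython, the Python interpreter written in C. Other implementations such as Jython (which runs on the JVM) and IronPython (which runs on .NET) have no GIL; PyPy, however, does have one. Because CPython is the mainstream interpreter (it is what you download from the official Python website) and most third-party libraries are developed against it, the GIL is for practical purposes a problem of Python itself.
The Python wiki describes the GIL as follows: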
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe.
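In other words, the GIL is a mutex that prevents multiple threads from executing Python bytecode at the same time. It keeps the interpreter's internal state (reference counts, for example) consistent, although application code that shares data between threads still needs its own locks. The cost is that Python threads cannot use multiple cores to run bytecode in parallel: even on a multi-core machine, only one thread in a given interpreter process executes bytecode at any moment. This is the main reason Python multithreading is criticized for poor performance.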
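The effect described above can be observed with a rough timing sketch. The helper names (`count_down`, `run_sequential`, `run_threaded`) and the loop sizes are illustrative assumptions, not taken from the book. On a standard GIL-enabled CPython build, the threaded version is usually no faster than the sequential one for this kind of pure-Python loop, but the exact numbers depend on the machine and the interpreter build.

```python
import threading
import time


def count_down(n):
    """Pure-Python busy loop: CPU-bound, so it needs the GIL the whole time."""
    while n > 0:
        n -= 1


def run_sequential(n, workers):
    """Run the loop `workers` times back to back; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(workers):
        count_down(n)
    return time.perf_counter() - start


def run_threaded(n, workers):
    """Run the same loops in `workers` threads; return elapsed seconds."""
    threads = [
        threading.Thread(target=count_down, args=(n,)) for _ in range(workers)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    n, workers = 1_000_000, 4  # arbitrary sizes chosen for a quick run
    print(f"sequential: {run_sequential(n, workers):.2f}s")
    print(f"threaded:   {run_threaded(n, workers):.2f}s")
```

Both functions do the same total amount of work, so any difference in the printed times comes from how the threads are scheduled rather than from the workload.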
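Does this make multithreading in Python pointless? Not at all. As any operating systems course explains, processes and threads fall into two kinds. CPU-bound work is dominated by computation: video decoding, scientific computing, and deep learning training and inference (which is essentially a pile of matrix multiplications and so also counts as scientific computing). IO-bound work is dominated by input and output: network and database applications, for example. In an IO-bound thread, computation takes only a small share of the time; most of it is spent waiting on IO, during which the CPU is free to run other threads. So even though the GIL prevents threads from computing in parallel, when several threads are IO-bound, multithreading still improves throughput by switching to another thread while one waits on IO. Typical server-side and web-crawler programs in Python are mostly IO-bound (servers query databases and handle requests, crawlers issue requests), so they benefit from multithreading, and it remains well worth learning.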
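A minimal sketch of the IO-bound case, assuming `time.sleep` as a stand-in for a network or database call (it releases the GIL while waiting, as blocking IO does). The names `fake_io` and `fetch_all` are hypothetical. With four simulated requests of 0.2 seconds each, the threaded run should take roughly as long as one request rather than the sum of all four.

```python
import threading
import time


def fake_io(delay, results, index):
    """Simulated blocking IO call; sleeping releases the GIL."""
    time.sleep(delay)
    results[index] = f"response {index}"


def fetch_all(delay=0.2, workers=4):
    """Run `workers` simulated requests concurrently.

    Returns (results, elapsed_seconds).
    """
    results = [None] * workers
    threads = [
        threading.Thread(target=fake_io, args=(delay, results, i))
        for i in range(workers)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = fetch_all()
    print(results)
    print(f"elapsed: {elapsed:.2f}s")
```

Each thread writes to its own slot in `results`, so no extra lock is needed in this sketch.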
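Python Source Code Related to Multithreading
This section does not cover every topic in Python multithreaded and multiprocess programming one by one. Instead, it reads three classes, Condition, Semaphore and Queue, from two standard-library modules (threading.py and queue.py, the latter named Queue.py in Python 2) as a way to understand Python multithreading.
threading.Condition and threading.Semaphore
With a background in process scheduling and synchronization from an operating systems course, the Condition and Semaphore classes are easy to follow. They are not explained in detail here, because the docstrings in the source already document each class and its key methods thoroughly.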
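One point worth noting: the Condition class implements the __enter__ and __exit__ methods, so it supports the context management protocol and can be used as a context manager in a with statement. Both methods delegate to the underlying lock, so entering the with block acquires the lock and leaving it releases the lock.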
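As a rough illustration of using Condition as a context manager, the sketch below hands one value from a producer to a consumer thread. The function `handoff` and its inner helpers are hypothetical names introduced for this example. Note that `__enter__` returns whatever the underlying lock's `__enter__` returns rather than the condition itself, so the `as` target is rarely useful and is omitted here.

```python
import threading


def handoff(value):
    """Pass `value` from a producer to a consumer thread via a Condition."""
    cond = threading.Condition()
    box = []        # shared state, only touched while holding cond's lock
    received = []

    def consumer():
        with cond:  # acquires the underlying lock
            # wait_for releases the lock while blocked and re-acquires it
            # before returning; it returns at once if the box is already full.
            cond.wait_for(lambda: len(box) > 0)
            received.append(box.pop())

    def producer():
        with cond:
            box.append(value)
            cond.notify()  # wake one waiting thread, if any

    t = threading.Thread(target=consumer)
    t.start()
    producer()
    t.join()
    return received[0]


if __name__ == "__main__":
    print(handoff("hello"))
```

Using `wait_for` with a predicate instead of a bare `wait()` keeps the example correct whichever thread gets the lock first.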
class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.
    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock). Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(0):
            self._lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        """Wait until a condition evaluates to True.

        predicate should be a callable which result will be interpreted as a
        boolean value. A timeout may be provided giving the maximum time to
        wait.
        """
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))

    notifyAll = notify_all
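Two behaviours visible in the listing above can be checked with a small sketch: wait() raises RuntimeError when the caller does not hold the lock, and wait_for() returns the last value of the predicate when the timeout expires. The function names `wait_without_lock` and `wait_for_timeout` and the timeout values are assumptions made for this illustration.

```python
import threading


def wait_without_lock():
    """Call wait() without holding the lock; return the error message."""
    cond = threading.Condition()
    try:
        cond.wait(timeout=0.01)
    except RuntimeError as exc:
        return str(exc)
    return None  # not reached if the ownership check fires


def wait_for_timeout():
    """wait_for() with a predicate that never becomes true.

    After about 0.05 seconds the loop gives up and returns the predicate's
    last result.
    """
    cond = threading.Condition()
    with cond:
        return cond.wait_for(lambda: False, timeout=0.05)


if __name__ == "__main__":
    print(wait_without_lock())
    print(wait_for_timeout())
```

The second function shows why callers should check the return value of wait_for(): a falsy result means the wait timed out, not that the condition was met.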
class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return immediately. If it is zero
        on entry, block, waiting until some other thread has called release() to
        make it larger than zero. This is done with proper interlocking so that
        if multiple acquire() calls are blocked, release() will wake exactly one
        of them up. The implementation may pick one at random, so the order in
        which blocked threads are awakened should not be relied on. There is no
        return value in this case.

        When invoked with blocking set to true, do the same thing as when called
        without arguments, and return true.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds. If acquire does not complete successfully in
        that interval, return false. Return true otherwise.
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.
        """
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
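A common use of the Semaphore class listed above is to cap how many threads run a section of code at once. The following is a minimal sketch under assumed parameters: `run_limited`, `try_acquire_empty`, and the worker count, limit and sleep time are all hypothetical. It records the highest number of threads that were inside the guarded section simultaneously, which should never exceed the semaphore's initial value.

```python
import threading
import time


def run_limited(workers=6, limit=2, hold=0.05):
    """Start `workers` threads but let at most `limit` run the guarded part.

    Returns the peak number of threads observed inside the guarded section.
    """
    sem = threading.Semaphore(limit)
    state_lock = threading.Lock()   # protects the two counters below
    active = 0
    peak = 0

    def worker():
        nonlocal active, peak
        with sem:                   # __enter__ is acquire, __exit__ releases
            with state_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(hold)        # simulated work
            with state_lock:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


def try_acquire_empty():
    """Non-blocking acquire on a zero-valued semaphore returns False."""
    sem = threading.Semaphore(0)
    return sem.acquire(blocking=False)


if __name__ == "__main__":
    print("peak concurrency:", run_limited())
    print("non-blocking acquire on empty semaphore:", try_acquire_empty())
```

The second function exercises the `if not blocking: break` branch of acquire(), which leaves `rc` as False without touching the counter.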
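queue.Queue
Queue is Python's thread-safe queue API (queue.Queue in Python 3, Queue.Queue in Python 2). The methods to focus on are get and put. The __init__ method calls _init, which creates a double-ended queue (deque) as the underlying storage. __init__ also creates a mutex, self.mutex, and builds three Condition variables on top of that one lock: self.not_empty, self.not_full and self.all_tasks_done.
In the get method, a thread that finds the queue empty blocks in wait on not_empty until a put notifies it. After removing the item at the head of the queue, get calls not_full.notify() so that a thread blocked in put can proceed. The put method mirrors this logic: it waits on not_full while the queue is full and notifies not_empty after adding an item.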
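The blocking behaviour of put and get is easiest to see in a small producer and consumer sketch. Everything here apart from the `queue.Queue` methods is an assumption for illustration: the function name `process_all`, the squaring "work", the `None` sentinel used to stop workers, and the deliberately small `maxsize` that makes put() block from time to time.

```python
import queue
import threading


def process_all(items, workers=3):
    """Square every item using worker threads fed through a bounded Queue."""
    q = queue.Queue(maxsize=2)      # small bound so producers sometimes wait
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            item = q.get()          # blocks while the queue is empty
            if item is None:        # sentinel: no more work for this worker
                q.task_done()
                break
            with results_lock:
                results.append(item * item)
            q.task_done()           # one task_done() per successful get()

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)                 # blocks while the queue is full
    for _ in threads:
        q.put(None)                 # one sentinel per worker
    q.join()                        # returns once every put() has been matched
    for t in threads:
        t.join()
    return sorted(results)


if __name__ == "__main__":
    print(process_all(range(10)))
```

The results are sorted before returning because the order in which workers finish is not deterministic.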
class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating. All methods
        # that acquire mutex must release it before returning. mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point. Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point. Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
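The comment near the end of the listing says that _init, _put and _get can be overridden to get other queue organizations. As a sketch of that idea, the hypothetical subclass below (`StackQueue` is a name invented here) turns the queue into a thread-safe stack. The standard library already ships `queue.LifoQueue`, which takes the same approach, so this is only meant to show how little needs to change.

```python
import queue


class StackQueue(queue.Queue):
    """Hypothetical LIFO variant of Queue built by overriding three hooks.

    All locking, blocking and task accounting is inherited unchanged from
    queue.Queue; only the storage order differs.
    """

    def _init(self, maxsize):
        # A plain list is enough: both operations work on the same end.
        self.queue = []

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        # Pop from the end, so the most recently added item comes out first.
        return self.queue.pop()


if __name__ == "__main__":
    s = StackQueue()
    for value in (1, 2, 3):
        s.put(value)
    print([s.get(), s.get(), s.get()])
```

The inherited `_qsize` still works because it only calls `len(self.queue)`.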