天天看点

Python线程Thread、条件变量Condition、信号量Semaphore、事件Event线程

文章目录

  • 线程
    • 创建线程
    • 线程的方法
    • GIL 全局解释器锁
    • 线程锁
      • 多个线程去操作同一个变量
      • 死锁
    • Condition 条件变量
      • 生产者消费者
    • Semaphore 信号量
    • Event 事件
    • 上下文管理器

线程

创建线程

from _thread import start_new_thread
import threading

def foo():
    print threading.current_thread().name

# 方式1
start_new_thread(foo)

# 方式2
t = threading.Thread(target=foo)
t.start()

# 方式3 继承Thread,重写run方法
class MyThread(threading.Thread):
    def run(self):
        # 系统的Thread模块在里面执行的target方法
        pass
           

线程的方法

t = thraeding.Thread(group=None, target=None, name=None, args=(), kwargs={}) 
# group: 线程组,目前还没有实现,库引用中提示必须是None
# target: 要执行的方法
# name: 线程名
# args/kwargs: 要传入方法的参数

# 实例方法
isAlive() # 返回线程是否在运行。正在运行指启动后、终止前。
get/setName(name) # 获取/设置线程名
'''
get/setDaemon(bool) # 获取/设置是否线程守护(默认False)。(在start之前设置)
如果为False,线程不守护,主线程结束,子线程不会立即结束
如果为True,线程守护,主线程结束,子线程结束
'''
start() # 启动线程
join(timeout) # 线程等待,等待这个线程结束,或者timeout时间到
           

GIL 全局解释器锁

全局解释器锁(Global InterpreterLock,GIL)是一个加在解释器上的锁,即使多个线程直接不会相互影响在同一个进程下也只有一个线程使用CPU

线程锁

多个线程去操作同一个变量

import threading
lock = threading.Lock()
def acquire(self, blocking=True, timeout=-1):
    pass
****************
****************
blocking 参数:True表示阻塞,False表示不阻塞, 成功获得锁返回True,否则False
****************
'''
def release(self):
    pass
           
import threading

# 没有锁可能会出现线程安全的问题,举个栗子
# 两个线程同时去操作一个变量
count = 0
def change_num(n):
    global count
    for i in  range(1000):
        count += n
        count -= n

if __name__ == '__main__':
    thread_list = []
    for i in range(10):
        t = threading.Thread(target=change_num, args=(i, ))
        t.start()
        thread_list.append(t)
    for t in thread_list:
        t.join() # 线程等待
    print count
# 上面会出现count不为0的情况,这时候要引入锁
           
# 当一个线程获取这把锁,另一个线程也想获取这把锁,那么第二个线程会被阻塞,
# 等到上一个线程将所释放掉才会继续执行。

def change_num(n):
    global count
    lock.acquire()
    try:
        for i in range(100):
            count += n
            count -= n
    finally:
        lock.release()

#或者 上下文协议
def change_num(n):
    global count
    with lock:
        for i in range(1000):
            count += n
            count -= n


if __name__ == '__main__':
    thread_list = []
    for i in rnage(5):
        t = threading.Thread(target=foo, args=(i,))
        t.start()
    for t in thread_list:
        t.join()
    # 这时候打印的count为0
    print count
           

死锁

1、情况1
# 当一个锁被别人占用时,你在去拿这把锁,你就会陷入等待状态。
# 你拿了一把锁,别人那个另一把锁,他想拿你的锁,你想拿他的锁,都不让,这时进入死锁状态。
def foo(l1, l2):
    l1.acquire()
    l2.acquire()
    l1.release()
    l2.release()

if __name__ == '__main__':
    l1 = threading.Lock()
    l2 = threading.Lock()
    t1 = threading.Thread(target=foo, args=(l1, l2))
    t2 = threading.Thread(target=foo, args=(l2, l1))
    t1.start()
    t2.start()

2、情况2
# 当在一个线程中同时去acquire同一把锁的时候,也会进入死锁状态
if __name__ == '__main__':
    lock = threading.Lock()
    lock.acquire()
    lock.acquire()

3、情况2的问题可以用可重入锁来解决,RLock
# 可重入锁在一个线程中看可以获取多次,不会出现阻塞的情况,当时如果别人想拿,需要把所有acquire释放掉
import threading

lock = threading.RLock()
lock.acquire()
lock.acquire()
print 'thread end'
           

Condition 条件变量

生产者消费者

# Condition wait 和 notify 源码分析
 
 def wait(self, timeout=None):
     # 如果当前标识不是自己,那么抛出异常
     if not self._is_owned():
         raise RuntimeError("cannot wait on un-acquired lock")
     # 创建一把锁,锁自己
     waiter = _allocate_lock()
     waiter.acquire()
     self._waiters.append(waiter)
     # 释放公共锁,让别人可以拿到这把锁
     saved_state = self._release_save()
     gotit = False
     try:    # restore state no matter what (e.g., KeyboardInterrupt)
         if timeout is None:
             # 自己进入死锁状态,等待别人notify
             waiter.acquire()
             gotit = True
         else:
             if timeout > 0:
                 gotit = waiter.acquire(True, timeout)
             else:
                 gotit = waiter.acquire(False)
         return gotit
     finally:
         # 被人唤醒后拿到公共锁
         self._acquire_restore(saved_state)
         if not gotit:
             try:
                 self._waiters.remove(waiter)
             except ValueError:
                 pass

 def notify(self, n=1):
     # 如果当前标识不是自己,那么抛出异常
     if not self._is_owned():
         raise RuntimeError("cannot notify on un-acquired lock")
     all_waiters = self._waiters
     waiters_to_notify = _deque(_islice(all_waiters, n))
     if not waiters_to_notify:
         return
     # 释放那先陷入死锁的线程
     for waiter in waiters_to_notify:
         waiter.release()
         try:
             all_waiters.remove(waiter)
         except ValueError:
             pass
             
 """
 # Condition 支持上下文协议,acquire 和 release
 cond = threading.Condition()
 with cond:
     cond.notify()
     cond.wait()
     pass

 *********
 *******
 *****
 ***
 notify 方法的作用主要是,去唤醒处于死锁是另一个线程,但是执行权还没有让出。
 wait 方法的作用是让出执行权,同时自己进入死锁状态,等待其他线程唤醒自己。
 ***
 *****
 *******
 *********
 """
           

Semaphore 信号量

  • 信号量通常用于保护数量有限的资源,例如数据库服务器。在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量。
    # 源码分析
    class _Semaphore(_Verbose):
        def __init__(self, value=1, verbose=None):
            if value < 0:
                raise ValueError("semaphore initial value must be >= 0")
            _Verbose.__init__(self, verbose)
            # 创建一个条件变量
            self.__cond = Condition(Lock())
            # 信号数量
            self.__value = value
    
        def acquire(self, blocking=1):
            rc = False
            # 线程安全
            with self.__cond:
                # 如果value为0,那么进入阻塞状态
                while self.__value == 0:
                    if not blocking:
                        break
                    if __debug__:
                        self._note("%s.acquire(%s): blocked waiting, value=%s",
                                self, blocking, self.__value)
                    # 进入等待,等待别人去唤醒
                    self.__cond.wait()
                else:
                    # 如果不为0或者从上面的while循环中跳出,信号量-1
                    self.__value = self.__value - 1
                    if __debug__:
                        self._note("%s.acquire: success, value=%s",
                                self, self.__value)
                    rc = True
            return rc
    
        __enter__ = acquire
    
        def release(self):
            # 线程安全
            with self.__cond:
                self.__value = self.__value + 1
                if __debug__:
                    self._note("%s.release: success, value=%s",
                            self, self.__value)
                self.__cond.notify()
    
        def __exit__(self, t, v, tb):
            self.release()
            
    """
    执行过程:
    --
    创建信号量以后,会设置信号量的数量,在线程中可以使用acquire,信号量-1,
    当信号量减为0是,内部的条件变量会让这个线程进入wait状态,
    等待其他线程release唤醒他,唤醒他以后信号量再-1。
    """
               

Event 事件

这是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。

class threading.Event()
---
方法:
    is_set() 如果标志为True,则返回True
    
    set()  把标志置为True
    
    clear()  把标志清除为False
    
    wait(timeout=None)  如果标志为False,则进入等待,等待其他线程把标志置为True
    wait方法除了超过超时事件的情况返回False,其余情况都返回True
           

上下文管理器

Lock、Condition、Semaphore都支持上下文协议,内部enter和exit方法都是使用的acquire和release方法

with xxx:
    pass