天天看點

Python進階知識點學習(八)

線程同步 - condition介紹

多線程中的另外一個重要點就是condition:條件變量。

condition是python多線程程式設計中用于複雜線程間通信的一個鎖 叫做條件變量。

cond = threading.Condition()

with self.cond:
     cond.notify()
     cond.wait()
           

condition有兩層鎖, 一把底層鎖會線上程調用了wait方法的時候釋放, 上面的鎖會在每次調用wait的時候配置設定一把并放入到cond的等待隊列中,等到notify方法的喚醒。

有關condition的詳情請查閱資料。(這裡作者自己暫時還沒理清楚原理,見諒)

線程同步 - Semaphore 介紹

信号量,Semaphore。

Semaphore 是用于控制進入數量的鎖,控制進入某段代碼的線程數。

檔案, 讀、寫, 寫一般隻是用于一個線程寫,讀可以允許有多個。

ThreadPoolExecutor線程池

多線程和多程序對比

  • 運算,耗cpu的操作,用多程序程式設計
  • 對于io操作來說, 使用多線程程式設計

由于程序切換代價要高于線程,是以能使用線程就不用程序。

耗費cpu的操作:

def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)


if __name__ == "__main__":
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, (num)) for num in range(1, 10)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))
           

模拟IO操作:

def random_sleep(n):
    time.sleep(n)
    return n


if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))
           

multiprocessing 多程序

使用

os.fork

建立子程序,fork隻能用于linux/unix中。

import os
import time
# fork建立子程序 fork隻能用于linux/unix中
pid = os.fork()
print("a")
if pid == 0:
  print('子程序id:{} ,父程序id是: {}.' .format(os.getpid(), os.getppid()))
else:
  print('我是父程序, 我fork出的子程序id是:{}.'.format(pid))

time.sleep(2)

運作結果:
a
我是父程序, 我fork出的子程序id是:3093.
a
子程序id:3093 ,父程序id是: 3092.
           

運作結果中,可以看到列印了兩次a,因為在執行完

pid = os.fork()

這行代碼後,就建立了一個子程序,且子程序把父程序中的資料原樣拷貝了一份到自己的程序中,是以父程序中列印一次,子程序中又列印一次。

多程序程式設計:

import multiprocessing

# 多程序程式設計
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    progress = multiprocessing.Process(target=get_html, args=(2,))
    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()
    print("main progress end")
           

使用程序池:

import multiprocessing

# 多程序程式設計
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    # 使用程序池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    result = pool.apply_async(get_html, args=(3,))

    # 等待所有任務完成
    pool.close()
    pool.join()

    print(result.get())
           

程序池另一種方法:

import multiprocessing

# 多程序程式設計
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
 
    # 使用程序池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))
           

程序間通信 Queue、Pipe,Manager

共享全局變量不能适用于多程序程式設計,可以适用于多線程。

程序間通信和線程間通信有相同也有不同,不同點是之前在多線程中用的線程間通信的類和線程間同步的鎖在多程序中是不能用的。

使用multiprocessing中的Queue實作程序通信
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe


def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
           

一定要使用

multiprocessing中的Queue

,如果使用

import queue

這個queue是不行的。

pool中的程序間通信需要使用manager中的queue

multiprocessing中的queue不能用于pool程序池。

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Manager().Queue(10)
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()
           
使用Manager,多程序修改同一變量:
def add_data(p_dict, key, value):
    p_dict[key] = value


if __name__ == "__main__":
    progress_dict = Manager().dict()
    from queue import PriorityQueue

    first_progress = Process(target=add_data, args=(progress_dict, "a", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "b", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)
           

可以看到兩個程序對一個dict變量做值得填充,最終主程序中列印出了最終的dict。

通過pipe實作程序間通信:

pipe的性能高于queue。

pipe隻能适用于兩個程序。

def producer(pipe):
    pipe.send("a")

def consumer(pipe):
    print(pipe.recv())


if __name__ == "__main__":
    recevie_pipe, send_pipe = Pipe()
    #pipe隻能适用于兩個程序
    my_producer = Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(recevie_pipe,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
           

my_producer程序給my_consumer程序發送的a變量可以正常列印。