天天看點

Python程式設計:協程coroutine手動切換協程自動切換協程gevent 多并發socketselect 多并發socketselectors 多并發socket異步爬蟲(并行)

關于協程

協程 coroutine 微線程,一種使用者态的輕量級線程

好處:

無需線程上下文切換的開銷

無需原子操作鎖定及同步的開銷

友善切換控制流,簡化程式設計模型

高并發+高擴充+低成本,一個cup支援上萬的協程都不是問題

缺點:

無法利用多核資源,協程的本質是單線程,

程序阻塞blocking操作如io時會阻塞整個程式

單線程下實作并發效果:遇到io就切換

伺服器處理模型:

1.一個程序處理一個請求

2.一個線程處理一個請求

3.主程序處理事件隊列的請求

事件驅動模型:

多個事件 -> 消息隊列 -> 處理線程

事件驅動程式設計是一種程式設計範式,這裡程式的執行流由外部事件來決定。

它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。

另外兩種常見的程式設計範式是(單線程)同步以及多線程程式設計。

異步asynchronous

使用者空間application 和 核心空間kernel

程序控制塊(Processing Control Block)

參考:

http://www.cnblogs.com/alex3714/articles/5876749.html

io操作兩個階段:核心緩沖區資料準備,拷貝到應用程式位址空間

阻塞io blocking 一直等待

非阻塞io nonblocking 準備資料立刻傳回

多路複用 select,poll,epoll

異步io 沒有被阻塞

twisted架構

手動切換協程

from greenlet import greenlet

def foo1():
    print("foo11")
    gr2.switch()
    print("foo12")
    gr2.switch()

def foo2():
    print("foo21")
    gr1.switch()
    print("foo22")

gr1 = greenlet(foo1)  # 建立協程
gr2 = greenlet(foo2)

print("main")

gr1.switch()  # 切換協程

print("done")
"""
main
foo11
foo21
foo12
foo22
done
"""      

自動切換協程

import gevent  # 第三方庫

def foo1():
    print("foo11")
    gevent.sleep(2)
    print("foo12")

def foo2():
    print("foo21")
    gevent.sleep(2)
    print("foo22")

def foo3():
    print("foo31")
    gevent.sleep(0)
    print("foo32")

gevent.joinall([
    gevent.spawn(foo1),
    gevent.spawn(foo2),
    gevent.spawn(foo3)
])

"""
foo11
foo21
foo31
foo32
foo12
foo22
"""      

gevent 多并發socket

import socket
import gevent
from gevent import monkey

monkey.patch_all()  # 猴子更新檔

def server(port):
    s = socket.socket()
    s.bind(("0.0.0.0", port))
    s.listen(500)
    print("服務已啟動")
    while True:
        conn, addr = s.accept()
        print(addr)
        gevent.spawn(handle_request, conn)

def handle_request(conn): # 處理請求的協程
    try:
        while True:
            data = conn.recv(1024)
            print(conn, data.decode())
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as e:
        print(e)

    finally:
        print("關閉", conn)
        conn.close()

if __name__ == "__main__":
    server(6969)      

select 多并發socket

# fd  檔案描述符

import select
import socket
import queue

server = socket.socket()

server.setblocking(False)  # 不阻塞

server.bind(("localhost", 6969))
server.listen(5)
print("服務已開啟")

inputs = [server, ]  # server本身也是一個檔案描述符
outputs = []

message_dict = {}

while True:
    # 如果沒有任何 fd 就緒,就會阻塞在這裡
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    print("readable", readable)
    print("writeable", writeable)
    print("exceptional", exceptional)

    for r in readable:
        if r is server:  # server就緒,新連接配接來了
            conn, addr = r.accept()
            print("已連接配接", conn)
            conn.setblocking(False)
            inputs.append(conn)  # 為了不阻塞程式,收到連接配接對象後放入清單,如果接收到資訊,fd就緒
            message_dict[conn] = queue.Queue()  # 建立消息隊列,不立刻傳回,逐個處理

        else:  # 如果不是伺服器,就是用戶端
            data = r.recv(1024)
            if not data:
                print("連接配接斷開")
            else:
                print("收到資料", data.decode("utf-8"))
                message_dict[r].put(data)  # 放入消息隊列
                outputs.append(r)  # 不影響其他用戶端連接配接,後續處理發送資訊

    for s in outputs:  # 要傳回給用戶端的連接配接表
        data = message_dict[s].get()
        s.send(data)
        outputs.remove(s)  # 處理完就删除

    for e in exceptional:  # 删除異常連接配接
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del message_dict[e]      

selectors 多并發socket

import selectors
import socket

def accept(server, mask):  # 接受連接配接
    conn, addr = server.accept()
    print("conn:", conn, "addr:", addr, "mask:", mask)
    conn.setblocking(False)
    selector.register(conn, selectors.EVENT_READ, action)

def action(conn, mask):  # 接收資料
    data = conn.recv(1024)
    if data:
        print("conn:", conn, "data:", data.decode("utf-8"))
        conn.send(data)
    else:
        print("斷開連接配接")
        conn.close()
        selector.unregister(conn)

server = socket.socket()
server.setblocking(False)
address = ("localhost", 6969)
server.bind(address)
server.listen(1000)

selector = selectors.DefaultSelector()
selector.register(server, selectors.EVENT_READ, accept)  # accept回調函數
print("服務啟動")

while True:
    events = selector.select()  # 預設阻塞,有活動則傳回活動清單
    for key, mask in events:
        callback = key.data  # accept
        callback(key.fileobj, mask)  # fileobj 檔案句柄      

異步爬蟲(并行)

from urllib import request
from gevent import monkey
import time, gevent

monkey.patch_all()  # 把目前程式所有io操作都做上标記

def get_html(url):
    response = request.urlopen(url)
    html = response.read()
    print("receive:", len(html))

urls = [
    "https://www.python.org/",
    "https://www.yahoo.com/",
    "https://github.com/"
    ]

start_time = time.time()

for url in urls:
    html = get_html(url)

end_time = time.time()

print("串行:", end_time - start_time)

async_start_time = time.time()

gevent.joinall([
    gevent.spawn(get_html, urls[0]),
    gevent.spawn(get_html, urls[1]),
    gevent.spawn(get_html, urls[2])
])

async_end_time = time.time()

print("并行:", async_end_time - async_start_time)

"""
receive: 48893
receive: 511654
receive: 52225
串行: 4.339118003845215
receive: 48893
receive: 502566
receive: 52223
并行: 1.4489901065826416
"""