關于協程
協程 coroutine 微線程,一種使用者态的輕量級線程
好處:
無需線程上下文切換的開銷
無需原子操作鎖定及同步的開銷
友善切換控制流,簡化程式設計模型
高并發+高擴充+低成本,一個cup支援上萬的協程都不是問題
缺點:
無法利用多核資源,協程的本質是單線程,
程序阻塞blocking操作如io時會阻塞整個程式
單線程下實作并發效果:遇到io就切換
伺服器處理模型:
1.一個程序處理一個請求
2.一個線程處理一個請求
3.主程序處理事件隊列的請求
事件驅動模型:
多個事件 -> 消息隊列 -> 處理線程
事件驅動程式設計是一種程式設計範式,這裡程式的執行流由外部事件來決定。
它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。
另外兩種常見的程式設計範式是(單線程)同步以及多線程程式設計。
異步asynchronous
使用者空間application 和 核心空間kernel
程序控制塊(Processing Control Block)
參考:
http://www.cnblogs.com/alex3714/articles/5876749.htmlio操作兩個階段:核心緩沖區資料準備,拷貝到應用程式位址空間
阻塞io blocking 一直等待
非阻塞io nonblocking 準備資料立刻傳回
多路複用 select,poll,epoll
異步io 沒有被阻塞
twisted架構
手動切換協程
from greenlet import greenlet
def foo1():
print("foo11")
gr2.switch()
print("foo12")
gr2.switch()
def foo2():
print("foo21")
gr1.switch()
print("foo22")
gr1 = greenlet(foo1) # 建立協程
gr2 = greenlet(foo2)
print("main")
gr1.switch() # 切換協程
print("done")
"""
main
foo11
foo21
foo12
foo22
done
"""
自動切換協程
import gevent # 第三方庫
def foo1():
print("foo11")
gevent.sleep(2)
print("foo12")
def foo2():
print("foo21")
gevent.sleep(2)
print("foo22")
def foo3():
print("foo31")
gevent.sleep(0)
print("foo32")
gevent.joinall([
gevent.spawn(foo1),
gevent.spawn(foo2),
gevent.spawn(foo3)
])
"""
foo11
foo21
foo31
foo32
foo12
foo22
"""
gevent 多并發socket
import socket
import gevent
from gevent import monkey
monkey.patch_all() # 猴子更新檔
def server(port):
s = socket.socket()
s.bind(("0.0.0.0", port))
s.listen(500)
print("服務已啟動")
while True:
conn, addr = s.accept()
print(addr)
gevent.spawn(handle_request, conn)
def handle_request(conn): # 處理請求的協程
try:
while True:
data = conn.recv(1024)
print(conn, data.decode())
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as e:
print(e)
finally:
print("關閉", conn)
conn.close()
if __name__ == "__main__":
server(6969)
select 多并發socket
# fd 檔案描述符
import select
import socket
import queue
server = socket.socket()
server.setblocking(False) # 不阻塞
server.bind(("localhost", 6969))
server.listen(5)
print("服務已開啟")
inputs = [server, ] # server本身也是一個檔案描述符
outputs = []
message_dict = {}
while True:
# 如果沒有任何 fd 就緒,就會阻塞在這裡
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
print("readable", readable)
print("writeable", writeable)
print("exceptional", exceptional)
for r in readable:
if r is server: # server就緒,新連接配接來了
conn, addr = r.accept()
print("已連接配接", conn)
conn.setblocking(False)
inputs.append(conn) # 為了不阻塞程式,收到連接配接對象後放入清單,如果接收到資訊,fd就緒
message_dict[conn] = queue.Queue() # 建立消息隊列,不立刻傳回,逐個處理
else: # 如果不是伺服器,就是用戶端
data = r.recv(1024)
if not data:
print("連接配接斷開")
else:
print("收到資料", data.decode("utf-8"))
message_dict[r].put(data) # 放入消息隊列
outputs.append(r) # 不影響其他用戶端連接配接,後續處理發送資訊
for s in outputs: # 要傳回給用戶端的連接配接表
data = message_dict[s].get()
s.send(data)
outputs.remove(s) # 處理完就删除
for e in exceptional: # 删除異常連接配接
if e in outputs:
outputs.remove(e)
inputs.remove(e)
del message_dict[e]
selectors 多并發socket
import selectors
import socket
def accept(server, mask): # 接受連接配接
conn, addr = server.accept()
print("conn:", conn, "addr:", addr, "mask:", mask)
conn.setblocking(False)
selector.register(conn, selectors.EVENT_READ, action)
def action(conn, mask): # 接收資料
data = conn.recv(1024)
if data:
print("conn:", conn, "data:", data.decode("utf-8"))
conn.send(data)
else:
print("斷開連接配接")
conn.close()
selector.unregister(conn)
server = socket.socket()
server.setblocking(False)
address = ("localhost", 6969)
server.bind(address)
server.listen(1000)
selector = selectors.DefaultSelector()
selector.register(server, selectors.EVENT_READ, accept) # accept回調函數
print("服務啟動")
while True:
events = selector.select() # 預設阻塞,有活動則傳回活動清單
for key, mask in events:
callback = key.data # accept
callback(key.fileobj, mask) # fileobj 檔案句柄
異步爬蟲(并行)
from urllib import request
from gevent import monkey
import time, gevent
monkey.patch_all() # 把目前程式所有io操作都做上标記
def get_html(url):
response = request.urlopen(url)
html = response.read()
print("receive:", len(html))
urls = [
"https://www.python.org/",
"https://www.yahoo.com/",
"https://github.com/"
]
start_time = time.time()
for url in urls:
html = get_html(url)
end_time = time.time()
print("串行:", end_time - start_time)
async_start_time = time.time()
gevent.joinall([
gevent.spawn(get_html, urls[0]),
gevent.spawn(get_html, urls[1]),
gevent.spawn(get_html, urls[2])
])
async_end_time = time.time()
print("并行:", async_end_time - async_start_time)
"""
receive: 48893
receive: 511654
receive: 52225
串行: 4.339118003845215
receive: 48893
receive: 502566
receive: 52223
并行: 1.4489901065826416
"""