天天看點

asyncio協程

asyncio協程

3.4版本加入标準庫。

asyncio底層基于selectors實作,看似庫,其實就是一個架構,包含異步IO、事件循環、協程、任務等内容。

問題引出

  • 基于不同程式交替執行
  1. 多線程版本
import threading
import time

def a():
    for x in range(3):
        time.sleep(0.001)
        print(x)

def b():
    for x in "abc":
        time.sleep(0.001)
        print(x)

threading.Thread(target=a,name="a").start()
threading.Thread(target=b,name="b").start()      
  1. 多程序版本
import multiprocessing
import time

def a():
    for x in range(3):
        time.sleep(0.1)
        print(x)

def b():
    for x in "abc":
        time.sleep(0.1)
        print(x)

if __name__ == '__main__':
    pa = multiprocessing.Process(target=a,name="a")
    pb = multiprocessing.Process(target=b,name="b")
    pa.start()
    pb.start()      
  1. 生成器版本
import multiprocessing
import time

def a():
    for x in range(3):
        print(x)
        yield

def b():
    for x in "abc":
        print(x)
        yield

x = a()
y = b()

for i in range(3):
    next(x)
    next(y)      
  • 上例在一個線程内通過生成器完成了排程,讓兩個函數都有機會執行,這樣的排程不是作業系統的程序、線程完成的,而是使用者自己設計出來的。
  1. 需要使用yield來讓出控制權
  2. 需要使用循環幫助交替執行

事件循環

事件循環是asyncio提供的核心運作機制

常用方法 含義
asyncio.get_event_loop() 傳回一個事件循環對象,是asyncio.BaseEventLoop的執行個體
AbstractEventLoop.stop() 停止運作事件循環
AbstractEventLoop.run_forever() 一直運作,知道stop()
AbstractEventLoop.run_until_complete(future) 運作直至Future對象運作完。傳回Futrue的結果。參數可以是Future類或子類Task的對象。如果是協程對象也會被封裝成Task對象
AbstractEventLoop.close() 關閉事件循環
AbstractEventLoop.is_running() 傳回事件循環的是否運作
AbstractEventLoop.close() 關閉事件循環
AbstractEventLoop.create_task(coro) 使用協程對象建立任務對象

協程

  • 協程不是程序、也不是線程,它是使用者空間排程來完成并發處理的方式
  • 程序、線程由作業系統完成排程,而協程是線程内完成排程。它不需要更多的線程,自然也沒有多線程切換帶來的開銷
  • 協程是非搶占式排程,隻有一個協程主動讓出控制權,另一個協程才會被排程
  • 協程也不需要使用鎖機制,因為是在同一個線程中執行
  • 多CPU下,可以使用程序和協程配合,即能程序并發又能發揮協程在單線程中的優勢
  • Python中協程是基于生成器的
  • asyncio.iscoroutine(obj)判斷是不是協程對象
  • asyncio.iscoroutinefunction(func)判斷是不是協程函數
  • Future和concurrent.futures.Future類似。通過Future對象可以了解任務執行的狀态資料。
  • 事件循環來監聽Future對象是否完成。
  • Task任務:Task類是Future的子類,它的作用就是把協程包裝成一個Future對象。

協程的使用

  • 3.4引入asyncio,使用裝飾器,将生成器函數轉換成協程函數,就可以在事件循環中執行了。
  • 簡單示例
import asyncio

@asyncio.coroutine
def xdd(x): #協程函數
    for i in range(3):
        print("xdd {}".format(i))
        yield from asyncio.sleep(x)
    return "result = {}".format(1000)

#事件循環
loop = asyncio.get_event_loop()

# 本質就是tasks的ensure_future,把協程包裝進一個Future對象中,并使用create_task傳回一個task
future = asyncio.ensure_future(xdd(1))

# 内部會調用ensure_future,内部會執行loop.run_forever()
loop.run_until_complete(future)

print(" -"* 30)
loop.close()
print(future.result()) #拿return值
print("====end=======")      
asyncio協程
  1. ensure_future(coro_or_future):
  • 如果參數已經是future直接傳回
  • 如果是協程,則使用loop.create_task()建立task,并傳回task
  • 可以使用create_task函數得到task對象
import asyncio

@asyncio.coroutine
def xdd(x): #協程函數
    for i in range(3):
        print("xdd {}".format(i))
        yield from asyncio.sleep(x)
    return "result = {}".format(1000)

x = xdd(1)
print(x)

# 事件循環
loop = asyncio.get_event_loop()

# create_task傳回一個task
task = loop.create_task(x)
print(1,task) #pending
#内部會調用ensure_future,内部會執行loop.run_forever()
loop.run_until_complete(task)
print(2,task) #finished

print("- "* 30)
loop.close()
print(task.result()) #拿return值
print("======end=======")      
asyncio協程
  • 如果對函數的傳回值不在乎,上面代碼可以如下實作:
import asyncio

@asyncio.coroutine
def xdd(x): #協程函數
    for i in range(3):
        print("xdd {}".format(i))
        yield from asyncio.sleep(x)
    return "result = {}".format(1000)

#事件循環
loop = asyncio.get_event_loop()

# 本質就是tasks的ensure_future,把協程包裝進一個Future對象中,并傳回一個task
# 内部會調用ensure_future,内部會執行loop.run_forever()
loop.run_until_complete(xdd(0.1)) #包裝協程為task

print(" -"* 30)
loop.close()
print("====end=======")      
asyncio協程

上例中,似乎拿不到xdd(1)這個協程對象的結果future了,run_until_complete函數的傳回值就是其參數future對象的傳回結果。

future對象都可以調用add_done_callback(fn)增加回調函數,回調函數是單參的,參數就是future對象。

代碼如下:

import asyncio

@asyncio.coroutine
def xdd(x): #協程函數
    for i in range(3):
        print("xdd {}".format(i))
        yield from asyncio.sleep(x)
    return "result = {}".format(1000)

# 回調函數,參數必須是future
def callback(future):
    print("in callback. future = {}".format(future))
    print(future.result())

# 事件循環
loop = asyncio.get_event_loop()

# create_task傳回一個task
task = loop.create_task(xdd(1))
# 添加回調函數
task.add_done_callback(callback)
print(1,task) #pending
# 内部會調用ensure_future,内部會執行loop.run_forever()
loop.run_until_complete(task)
print(2,task)

print(" -"* 30)
loop.close()

print("====end=======")      
asyncio協程
  • 多個任務執行
import asyncio

@asyncio.coroutine
def xdd(x): #協程函數
    for i in range(3):
        print("xdd {}".format(i))
        yield from asyncio.sleep(x)
    return "result = {}".format(1000)

@asyncio.coroutine
def b():
    for x in range(10):
        print("b.x",x)
        yield
    return "b.finished"

# 回調函數,參數必須是future
def callback(future):
    print("in callback. future = {}".format(future))
    print(future.result())

print(asyncio.iscoroutinefunction(xdd),asyncio.iscoroutinefunction(b))

t1 = xdd(0.001)
t2 = b()
print(asyncio.iscoroutine(t1),asyncio.iscoroutine(t2))

#事件大循環
loop = asyncio.get_event_loop()

# 先建構t1為task1,為其添加回調函數
task1 = loop.create_task(t1)
task1.add_done_callback(callback)
tasks = [task1,t2]

# asyncio.wait 會疊代清單中的對象并封裝成(f1,f2),傳回一個協程對象f
# 循環執行f,它内部等價于yield from {f1,f2}
rt = loop.run_until_complete(asyncio.wait(tasks))
print(rt) #全部結果,全部完成才能看
loop.close()      
asyncio協程
  • 注意:run_until_complete方法的傳回結果,必須所有任務執行完才能看。

協程的新文法

  • 3.5版本開始,Python提供關鍵字async,await,在語言上原生支援協程。
  • 簡單示範
import asyncio

async def xdd():
    for i in range(3):
        print("xdd {}".format(i))
        await asyncio.sleep(0.5)

loop = asyncio.get_event_loop()
loop.run_until_complete(xdd())
loop.close()      
asyncio協程
  • ​async def​

    ​用來定義協程函數。
  • ​iscoroutinefunction()​

    ​判斷是否是協程函數,True表示是協程函數
  • 協程函數中可以不包含await、async關鍵字,但不能使用yield關鍵字。
  • 如同生成器函數調用傳回生成器對象一樣,協程函數調用也會傳回一個對象稱為協程對象,iscoroutine()傳回True,判斷對象是否是協程對象。
  • await語句之後是awaitable對象,可以是協程或者實作了__await__()方法的對象。await會暫停目前協程執行,使loop排程其他協程。
  • 示例:
import asyncio

@asyncio.coroutine
def w():
    yield

async def a():
    for x in range(3):
        print("a.x",x)
        await w()
    return "a.finished"

async def b():
    for x in range(5):
        print("b.x",x)
        await w()
    return "b.finished"

print(asyncio.iscoroutinefunction(a),asyncio.iscoroutinefunction(b))

t1 = a()
t2 = b()
print(asyncio.iscoroutine(t1),asyncio.iscoroutine(t2))

def cb(future):
    print(future.result())

#事件大循環
loop = asyncio.get_event_loop()
fs = set()
for t in [t1,t2]:
    f = asyncio.ensure_future(t)
    f.add_done_callback(cb)
    fs.add(f)

# asyncio.wait 會疊代清單中的對象并封裝成(f1,f2),傳回一個協程對象f
# 循環執行f,它内部等價于yield from {f1,f2}
results = loop.run_until_complete(asyncio.wait(fs))
loop.close()

print(results) #全部結果,全部完成才能看      

TCP Echo Server舉例

  • 單聊版本
import asyncio
from asyncio.streams import StreamWriter,StreamReader

async def hander(reader:StreamReader,witer:StreamWriter):
    client = None
    try:
        print(server.sockets)
        client = witer.get_extra_info("peername")
        print("新加入了一個連接配接 {}".format(client))
        while True:
            data = await reader.read(1024)
            if data == b'':
                break
            print(data)
            msg = "{}-{}".format(client,data).encode()
            witer.write(msg)
    finally:
        witer.close()
        print("退出了一個連接配接 {}".format(client))

#擷取事件大循環
loop = asyncio.get_event_loop()
#傳回一個協程對象
conn = asyncio.start_server(hander,"127.0.0.1",3999,loop=loop)
#擷取sever
server = loop.run_until_complete(conn)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()      
  • 群聊版本
import asyncio
from asyncio.streams import StreamWriter,StreamReader
import threading

class AsClentServer:
    def __init__(self,ip="127.0.0.1",post=3999):
        self.loop = asyncio.get_event_loop()
        self.ip = ip
        self.post = post
        self.witers = set()
        self.event = threading.Event()

    #啟動服務
    def start(self):
        conn = asyncio.start_server(self.handler, self.ip, self.post, loop=self.loop)
        self.server = self.loop.run_until_complete(conn)
        threading.Thread(target=self.run,name="server",daemon=True).start()

    def run(self):
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self.close()

    async def handler(self,reader:StreamReader,witer:StreamWriter):
        client = None
        self.witers.add(witer)
        try:
            client = witer.get_extra_info("peername")
            print("新加入了一個連接配接 {}".format(client))
            while not self.event.is_set():
                data = await reader.read(1024)
                if data == b'' or data == b'quit':
                    break
                print(data)
                msg = "{}-{}".format(client, data).encode()
                for i in self.witers:
                    i.write(msg)
        finally:
            witer.close()
            self.witers.remove(witer)
            print("退出了一個連接配接 {}".format(client))

    def close(self):
        self.event.set()
        for i in self.witers:
            i.close()
        self.witers.clear()
        if hasattr(self,"server"):
            self.server.close()
        try:
            self.loop.run_until_complete(self.server.wait_closed())
            self.loop.close()
        except:
            pass

    @classmethod
    def main(cls):
        cltserver = cls()
        cltserver.start()
        while True:
            cmd = input(">>>")
            if cmd == "quit":
                cltserver.close()
                break
            print(threading.enumerate())

if __name__ == '__main__':
    AsClentServer.main()      

aiohttp庫

  • HTTP Server簡單示例
from aiohttp import web

async def indexhandle(request:web.Request):
    return web.Response(text=request.path,status=201)

async def handle(request:web.Request):
    print(request.match_info)
    print(request.query_string) # http://127.0.0.1:8888/1?name=12301
    return web.Response(text=request.match_info.get("id","0000"),status=200)

app = web.Application()
app.router.add_get("/",indexhandle) # http://127.0.0.1:8080/
app.router.add_get("/{id}",handle) #http://127.0.0.1:8080/12301
# app.add_routes([
#     web.get("/",indexhandle),
#     web.get("/{id}",handle)
# ])
web.run_app(app,host="0.0.0.0",port=3888)      
  • HTTP Client
import asyncio
from aiohttp import ClientSession

async def get_html(url:str):
    async with ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)
            print(await res.text())

url = "http://127.0.0.1:3888/2?age=20&name=jerry"
loop = asyncio.get_event_loop()
loop.run_until_complete(get_html(url))
loop.close()      

繼續閱讀