天天看點

協程future對象及concurrent.futuresFuture 對象concurrent.futures

文章目錄

  • Future 對象
  • concurrent.futures
    • 示例: async + requests

Future 對象

Future 是一種特殊的低層級可等待對象(Task是Future的子類), 表示一個異步操作的最終結果

當一個 Future 對象被等待, 這意味着協程将保持等待直到該 Future 對象在其他地方操作完畢

async def main():
    # 擷取目前事件循環
    loop = asyncio.get_running_loop()
    # 建立一個任務(Future對象), 預設未綁定任何行為, 則這個任務永遠都不會結束
    fut = loop.create_futute()
    # 等到任務最終結果(Future對象), 沒有結果則會一直等待下去
    await fut
           
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # 建立一個任務, 綁定了 set_after 函數, 函數内部在 2s 後, 會給 fut 指派
    # 手動設定 future任務的最終結果, 那麼 fut 就可以結束了
    await loop.create_task(set_after(fut))
    data = await fut
    print(data)

asyncio.run(main())
           

Future 對象本身無函數進行綁定, 想要讓事件循環擷取 Future 的結果, 則需要手動設定

Task 對象繼承了 Future 對象并進行了擴充, 可以實作在對應綁定的函數執行完成之後, 自動執行

set_result

, 進而實作自動結束

concurrent.futures

在 python 的

concurrent.futures

子產品中也有一個 Future 對象, 這個對象是基于線程池或程序池實作異步操作時使用的對象

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

pool = ThreadPoolExecutor(max_workers=5)

for i in range(1):
    fut = pool.submit(func, i)
    print(fut)
           

示例: async + requests

(1) 直接使用, 實際表現形式為同步請求

import asyncio
import requests

async def send_req(url):
    print(f'開始請求 {url}')
    res = requests.get(url).text
    print(res)
    print(f'請求結束 {url}')
    return res

async def main():
    tasks = [
        asyncio.create_task(send_req('http://httpbin.org/ip')),
        asyncio.create_task(send_req('http://httpbin.org/headers')),
    ]

    await asyncio.wait(tasks)

asyncio.run(main())
           

(2) run_in_executor 異步請求

import asyncio
import requests
import concurrent.futures


def send_req(url):
    print(f'開始請求 {url}')
    res = requests.get(url).text
    print(res)
    print(f'請求結束 {url}')
    return res


async def main():
    loop = asyncio.get_running_loop()

    # None, 預設采用 ThreadPoolExecutor 線程池
    # 第一步:内部會先調用 ThreadPoolExecutor 的 submit 方法去線程池中申請一個線程去執行 send_req 函數,并傳回一個concurrent.futures.Future對象
    # 第二步:調用 asyncio.wrap_future 将 concurrent.futures.Future 對象包裝為 asycio.Future 對象
    tasks = [
        loop.run_in_executor(None, send_req, 'http://httpbin.org/ip'),
        loop.run_in_executor(None, send_req, 'http://httpbin.org/headers')
    ]
    
    await asyncio.wait(tasks)


# concurrent.futures 異步
async def main():
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        tasks = [
            loop.run_in_executor(pool, send_req, 'http://httpbin.org/ip'),
            loop.run_in_executor(pool, send_req, 'http://httpbin.org/user-agent'),
        ] 

        await asyncio.wait(tasks)

asyncio.run(main())
           
當項目以協程式的異步程式設計開發時,如果要使用一個第三方子產品,而第三方子產品不支援協程方式異步程式設計時,就可以用到這個功能