Reading time: 8 minutes
> Translator's note: the author's production environment had just been upgraded from 2.6 to 3.5.0, which still falls short of aiohttp's minimum version requirement. Hence this article on how to rework the code to take full advantage of the asynchronous features that Python 3.5's asyncio provides. Link to the original.
Our IT department recently upgraded the Python distribution in our work environment to 3.5.0. Coming from 2.6 this is a huge update, but it still has a shortcoming: 3.5.0 does not satisfy the minimum version requirements of some libraries, including aiohttp.
Despite these constraints, I still needed to write a script that fetched hundreds of csv files from our API and then processed the data. Python itself isn't event-driven and natively asynchronous the way NodeJS is, but that doesn't prevent it from achieving the same result. This post details how I learned to perform asynchronous operations and lays out their advantages.
Disclaimer: if you are on a higher version (3.5.2+), I strongly recommend using aiohttp instead. It is a very robust library, especially well suited to this kind of problem, and there are plenty of tutorials for it online.
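For readers who do have 3.5.2+, here is a minimal sketch of the same fan-out pattern in aiohttp (an illustration reusing the URL and a few file names from the examples below, not the code this post develops):

import asyncio
import aiohttp

async def fetch_csv(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    # aiohttp performs the request without blocking the event loop
    async with session.get(base_url + csv) as response:
        return await response.text()

async def main():
    csvs = ["ford_escort.csv", "cities.csv", "hw_25000.csv"]
    async with aiohttp.ClientSession() as session:
        # Schedule all downloads concurrently and wait for them all
        results = await asyncio.gather(*(fetch_csv(session, c) for c in csvs))
        print("Fetched {0} files".format(len(results)))

asyncio.get_event_loop().run_until_complete(main())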
Assumptions
This post makes the following assumptions:
> * You are familiar with Python and its syntax
> * You are familiar with basic web requests
> * You know the concept of asynchronous execution
Getting started
Install requests:
$ python -m pip install requests
If you don't have permission to install system-wide, install it for your user instead:
$ python -m pip install requests --user
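To confirm the install worked, a quick sanity check (not part of the original post) is to print the library's version:
$ python -c "import requests; print(requests.__version__)"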
The wrong way: synchronous requests
To demonstrate the benefit of parallelism, let's first look at the synchronous approach. Briefly, here is what the code will do: execute a GET request for each csv file and measure how long it takes to read the text in it.
To be specific, we will use the Session object from the requests library to execute the GET requests.
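A Session is worth using here because it reuses the underlying connection between requests and lets you configure parameters once for every request made through it. For instance (illustrative values, not part of the original script):

session = requests.Session()
session.headers.update({"User-Agent": "csv-fetcher/1.0"})  # hypothetical header
session.auth = ("user", "secret")  # hypothetical credentials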
First, we need a function that performs the web request:
def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Return .csv data for future consumption
        return data
This function takes the Session object and a csv name, performs the network request, and returns the text content of the response.
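As a quick usage check (illustrative, not part of the original flow), you could call it directly:

with requests.Session() as session:
    text = fetch(session, "cities.csv")
    print(text[:80])  # peek at the beginning of the downloaded csv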
Next, we need a function that iterates over the list of files, requests each one, and measures how long the requests take.
import requests
from timeit import default_timer
def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))
This function creates a Session object and then iterates over every file in csvs_to_fetch. As each fetch completes, it computes the elapsed download time and prints it in a readable format.
Finally, the main function call:
def main():
    # Simple for now
    get_data_synchronous()

main()
The complete synchronous code:
import requests
from timeit import default_timer

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Return .csv data for future consumption
        return data

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

def main():
    # Simple for now
    get_data_synchronous()

main()
Results: the synchronous run takes 10.84 seconds in total for the 12 files (the original post shows a screenshot of the per-file timing output).
Thanks to Python 3's asyncio, we can improve this performance dramatically.
The right way: executing multiple asynchronous requests at once
To make this work, we first need to rework the existing code, starting with fetch:
import requests
from timeit import default_timer

# We'll need access to this variable later
START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Now we will print how long it took to complete the operation from the
        # `fetch` function itself
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))
        return data
Next, rework get_data into an asynchronous function:
import requests
import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    # Note: max_workers is set to 10 simply for this example,
    # you'll have to tweak with this number for your own projects
    # as you see fit
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            # Initialize the event loop
            loop = asyncio.get_event_loop()
            # Set the START_TIME for the `fetch` function; `global` is needed
            # so the assignment updates the module-level variable `fetch` reads
            global START_TIME
            START_TIME = default_timer()
            # Use list comprehension to create a list of
            # tasks to complete. The executor will run the `fetch`
            # function for each csv in the csvs_to_fetch list
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            # Initializes the tasks to run and awaits their results
            for response in await asyncio.gather(*tasks):
                pass
The code now creates a pool of threads and runs the fetch function on them, one task per csv file to download.
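Since fetch returns each file's text, the pass loop is the natural place to plug in real processing. A minimal sketch using the standard library's csv module (the row-counting step is illustrative, not from the original):

import csv as csvlib  # aliased so it doesn't shadow the `csv` loop variable
import io

# Hypothetical replacement for the `pass` loop above: parse each
# downloaded file and report how many rows it contains
for response in await asyncio.gather(*tasks):
    rows = list(csvlib.reader(io.StringIO(response)))
    print("parsed {0} rows".format(len(rows)))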
Finally, our main function needs a small change as well, so that it initializes the asynchronous function correctly.
def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
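(As an aside: on Python 3.7 and later this boilerplate shrinks to a single call, asyncio.run(get_data_asynchronous()), but that wasn't available in the author's environment.)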
Run it again and look at the results:
After these slight modifications, the 12 files download in 3.43s versus 10.84s, a reduction of nearly 70% in download time. Here is the complete asynchronous code:
import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer

START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))
        return data

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            # `global` so the assignment updates the variable `fetch` reads
            global START_TIME
            START_TIME = default_timer()
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                pass

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
I hope you enjoyed this article and can apply these techniques to projects that are stuck on older versions of Python.
While Python may not have NodeJS's simple, built-in async/await style, it's not hard to achieve similar results.