How to Execute Multiple Web Requests in Parallel with Python 3.5 (Without aiohttp)

> Translator's note: the author's production environment had just been upgraded from 2.6 to 3.5.0, which still does not meet aiohttp's minimum version requirement. Hence this article on how to rework the code to take full advantage of the async features that asyncio provides in Python 3.5. Original link

Our IT department recently upgraded the distributed Python in our work environment to version 3.5.0. Coming from 2.6 this is a huge upgrade, but it still left something to be desired: 3.5.0 does not satisfy the minimum version requirements of some libraries, aiohttp among them.

Despite these constraints, I still needed to write a script that fetched hundreds of csv files from our API and then processed the data. Python itself is not event-driven and natively asynchronous the way NodeJS is, but that does not stop it from accomplishing the same thing. This article details how I learned to run these operations asynchronously and lays out the advantages of doing so.

Disclaimer: if you are on a higher version (3.5.2+), I strongly recommend you use aiohttp instead. It is a very robust library, it is especially well suited to this kind of problem, and there are plenty of tutorials for it online.
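For reference, here is a minimal sketch of what a fetch could look like with aiohttp on 3.5.2+. `fetch_csv` is a hypothetical name, and the URL and file name are simply the examples used later in this article:

```python
import asyncio
import aiohttp

async def fetch_csv(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    # aiohttp performs the request without blocking the event loop
    async with session.get(base_url + csv) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch_csv(session, "cities.csv")
        print(data[:200])

# Python 3.5-era startup; 3.7+ could use asyncio.run(main())
asyncio.get_event_loop().run_until_complete(main())
```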

Assumptions

This article assumes that you:

* Are familiar with Python and its syntax

* Are familiar with basic web requests

* Understand the concept of asynchronous execution

Getting started

Install requests:

```
$ python -m pip install requests
```

If you do not have the permissions for that, you can install it for your user instead:

```
$ python -m pip install requests --user
```

The wrong way: synchronous requests

To show off the benefits of parallelism, let's first look at the synchronous approach. I'll roughly describe what the code is going to do: we execute a GET request that fetches a csv file, and we measure how long it takes to read the text inside it.

To be clear, we will use the Session object from the requests library to execute our GET requests.
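If you have not used it before, a Session pools connections and shares configuration across requests, which matters when downloading many files from one host. A minimal example:

```python
import requests

# A Session reuses the underlying TCP connection across requests
# to the same host, instead of reconnecting for every file
with requests.Session() as session:
    response = session.get("https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv")
    print(response.status_code)
```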

First, we need a function that executes the web request:

```python
def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Return .csv data for future consumption
        return data
```

This function takes the Session object and a csv name, executes the request, and returns the text content of the response.
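For instance, a quick standalone sanity check (not part of the final script) might look like this:

```python
import requests

with requests.Session() as session:
    csv_data = fetch(session, "cities.csv")
    print(csv_data[:200])  # first 200 characters of the downloaded csv
```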

Next, we need a function that walks the list of files, requests each one, and measures how long the requests take.

```python
import requests
from timeit import default_timer

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))

        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop

        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))
```

This function creates a Session object and then iterates over each file in csvs_to_fetch. As each fetch operation completes, it computes the elapsed download time and displays it in a human-readable format.
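The comment about session parameters refers to configuration that should apply to every request, such as headers or authentication. A hypothetical sketch, with a made-up header value and credentials:

```python
import requests

with requests.Session() as session:
    # Applied to every request made through this session
    session.headers.update({"User-Agent": "csv-fetcher/1.0"})  # hypothetical value
    session.auth = ("api_user", "api_password")  # only if the API required basic auth
    response = session.get("https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv")
```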

Finally, the main function that calls it:

```python
def main():
    # Simple for now
    get_data_synchronous()

main()
```

The complete synchronous code:

```python
import requests
from timeit import default_timer

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))
        # Return .csv data for future consumption
        return data

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))

        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop

        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

def main():
    # Simple for now
    get_data_synchronous()

main()
```

Results:

[Screenshot: per-file completion times for the synchronous run]

Thanks to Python 3's asyncio, we can improve performance dramatically.

The right way: multiple asynchronous requests at once

To make this work, we first have to rework the existing code, starting with fetch:

```python
import requests
from timeit import default_timer

# We'll need access to this variable later
START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))

        # Now we will print how long it took to complete the operation from the
        # `fetch` function itself
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data
```

Next, rework get_data into an asynchronous function:

```python
import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    print("{0:<30} {1:>20}".format("File", "Completed at"))

    # Note: max_workers is set to 10 simply for this example,
    # you'll have to tweak with this number for your own projects
    # as you see fit
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`

            # Initialize the event loop
            loop = asyncio.get_event_loop()

            # Set the START_TIME for the `fetch` function
            # (declared global so the module-level variable is rebound)
            global START_TIME
            START_TIME = default_timer()

            # Use list comprehension to create a list of
            # tasks to complete. The executor will run the `fetch`
            # function for each csv in the csvs_to_fetch list
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]

            # Initializes the tasks to run and awaits their results
            for response in await asyncio.gather(*tasks):
                pass
```

The code now creates multiple threads and uses them to run the fetch function that downloads each csv file.
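It is worth noting that this is not true non-blocking I/O: `run_in_executor` simply schedules the blocking `fetch` calls onto the thread pool and wraps them in awaitable futures. If you did not need the event loop for anything else, a plain `concurrent.futures` version (a sketch under that assumption, reusing the `fetch` defined above, not the author's code) would behave much the same:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import requests

def get_data_with_threads_only():
    csvs_to_fetch = ["ford_escort.csv", "cities.csv", "trees.csv"]  # shortened list
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # executor.map fans fetch(session, csv) out across the pool
            # and yields the results in input order
            return list(executor.map(partial(fetch, session), csvs_to_fetch))
```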

Finally, our main function also needs a small change so that the asynchronous function is initialized correctly.

```python
def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
```

Run it again and look at the results:

[Screenshot: per-file completion times for the asynchronous run]

With these slight modifications, the 12 files download in 3.43s versus 10.84s, cutting the download time by almost 70%.

The complete asynchronous code:

```python
import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer

START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(base_url + csv))

        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    print("{0:<30} {1:>20}".format("File", "Completed at"))

    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()

            # Rebind the module-level timer used by `fetch`
            global START_TIME
            START_TIME = default_timer()

            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]

            for response in await asyncio.gather(*tasks):
                pass

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
```

I hope you enjoyed this article and can apply these techniques to projects that have to use older versions of Python.

Even though Python does not give us the simple async/await pattern here, it is not hard at all to achieve similar results.