不知道各位童鞋們是否遇到過需要使用python下載下傳大檔案的需求,或者需要從一些網速很慢的網站上下載下傳檔案。如果你在實際下載下傳過程碰到下載下傳不穩定經常失敗的情況,本文的方法将會給你帶來一些解決思路和方案。
本文會給大家示範如何使用python對單個大檔案進行多線程下載下傳或協程形式下載下傳,基于此還提供了斷點續傳的實作思路,想使用python開發下載下傳器的朋友都可以拿本文作為參考,期待各位大佬的大作。
文章目錄
- ⭐單線程直接下載下傳⭐
- 🔥單線程流式下載下傳🔥
- ☀️單線程分片流式下載下傳☀️
- 😎多線程下載下傳大檔案😎
- 😱協程分片下載下傳大檔案😱
- 💙實作斷點續傳的思路💙
- 🚀總結🚀
下面我們以知乎視訊《
【AI混血】人工智能一鍵生成角色全身立繪?!【大谷紐約實驗室】》為例進行示範,連結:
https://www.zhihu.com/zvideo/1387830268154195968
下面首先看一下最基礎的直接下載下傳檔案的方法:
⭐單線程直接下載下傳⭐
隻需要在開發者工具的元素頁籤搜尋
video
标簽,即可找到視訊的MP4下載下傳連結:
當然這個操作也完全可以借助idm的嗅探功能擷取下載下傳連結。
idm下載下傳位址:https://www.lanzoui.com/ia51jqb
idm插件安裝位址:https://chrome.google.com/webstore/detail/idm-integration-module/ngpampappnmepgilojfohadhhmbhlaek
複制出該連結後即可直接下載下傳該視訊:
import requests
url = "https://vdn3.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629560389-0-0-05874b492bec9be924c7da35aa619536&f=mp4&bu=http-com&expiration=1629560389&v=tx"
save_name = "單線程直接下載下傳.mp4"
with open(save_name, "wb") as f, requests.get(url) as res:
f.write(res.content)
下載下傳後視訊正常播放:
這種下載下傳方式對于知乎這種網絡快的網站自然是沒有問題,但是有些網絡不好的網站就很可能下載下傳中途網絡中斷,等了很久最終卻下載下傳失敗。下面我們看看相對穩定很多的下載下傳方法:
🔥單線程流式下載下傳🔥
使用方法是get方法指定參數
stream=True
:
save_name = "單線程流式下載下傳.mp4"
num = 0
with open(save_name, "wb") as f, requests.get(url, stream=True) as res:
for chunk in res.iter_content(chunk_size=64*1024):
if not chunk:
break
f.write(chunk)
num += 1
print(f"\r疊代次數:{num}", end=" ")
上述代碼,以64KB為一組進行流式資料傳輸,最終速度顯然比普通的下載下傳更快一些:
使用shutil庫可以簡化代碼編寫:
import requests
import shutil
url = "https://vdn6.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?pkey=AAVKDIgZuES50oUNCBq-nUpHoBmaVfAyhuDeSf9v2szjq3tuG83GsKISZBIU7-i2OyTlNR3IYADmdKI_hcRIouRc&c=avc.0.0&f=mp4&pu=da4bec50&bu=http-da4bec50&expiration=1659059784&v=ks6"
save_name = "單線程流式下載下傳2.mp4"
with open(save_name, "wb") as f, requests.get(url, stream=True) as res:
shutil.copyfileobj(res.raw, f)
☀️單線程分片流式下載下傳☀️
那麼我們如何做到檔案的斷點續傳呢?這時候就需要通過請求頭修改需要讀取的位元組範圍,當然也需要先檢查目标伺服器是否支援範圍請求。
如果請求一個資源時, HTTP響應中出現
Accept-Ranges
且其值不是none, 那麼伺服器支援範圍請求。
我們看看head這種請求方式:
res = requests.head(url)
head = res.headers
data = res.content
print(head)
print(data)
{'Server': 'NWSs', 'Date': 'Sat, 21 Aug 2021 15:25:19 GMT', 'Content-Type': 'video/mp4', 'Content-Length': '53825263', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=600', 'Expires': 'Sat, 21 Aug 2021 15:35:19 GMT', 'Last-Modified': 'Mon, 14 Jun 2021 10:06:22 GMT', 'X-NWS-LOG-UUID': '8fcf387e-7bab-44fb-8cf0-e96f5def3b1c', 'Access-Control-Allow-Origin': '*', 'Access-Control-Max-Age': '31536000', 'x-cdn-provider': 'tencent', 'X-Cache-Lookup': 'Hit From Disktank3, Hit From Inner Cluster', 'Accept-Ranges': 'bytes', 'ETag': '"bfd7937656505a8ab9a05bc373745a8b"', 'x-cos-hash-crc64ecma': '5809002519149120115', 'x-cos-replication-status': 'Complete', 'x-cos-request-id': 'NjEwOTM5MzZfNWM0ZTQ0MGJfNDc2MV8xZmQ3YmZj', 'x-cos-storage-class': 'STANDARD_IA', 'x-cos-version-id': 'MTg0NDUxMjA0MDg1MjcxMDk4MTA', 'X-Daa-Tunnel': 'hop_count=1'}
b''
可以看到head請求隻傳回的響應頭,未傳回任何資料。上面的響應頭中,
'Accept-Ranges': 'bytes'
代表可以使用位元組作為機關來定義請求範圍。
Content-Length
則代表該資源的完整大小。
于是我們可以通過
Content-Length
響應頭擷取檔案的大小:
filesize = int(res.headers['Content-Length'])
filesize
53825263
這就是目前檔案的總大小。
這時我們就可以根據總大小對檔案進行分片,例如總共分幾部分或者多大的部分作為一個分片。這裡我以個數進行分片,下面方法預設對檔案分成10個小部分:
def calc_divisional_range(filesize, chuck=10):
step = filesize//chuck
arr = list(range(0, filesize, step))
result = []
for i in range(len(arr)-1):
s_pos, e_pos = arr[i], arr[i+1]-1
result.append([s_pos, e_pos])
result[-1][-1] = filesize-1
return result
divisional_ranges = calc_divisional_range(filesize)
divisional_ranges
[[0, 5382525],
[5382526, 10765051],
[10765052, 16147577],
[16147578, 21530103],
[21530104, 26912629],
[26912630, 32295155],
[32295156, 37677681],
[37677682, 43060207],
[43060208, 48442733],
[48442734, 53825262]]
Range 請求頭的文法:
Range: bytes=start-end
Range頭域可以請求一個或者多個子範圍。例如:
- 表示頭500個位元組:bytes=0-499
- 表示第二個500位元組:bytes=500-999
- 表示最後500個位元組:bytes=-500
- 表示500位元組以後的範圍:bytes=500-
- 第一個和最後一個位元組:bytes=0-0,-1
- 同時指定幾個範圍:bytes=500-600,601-999
需要注意一下各種檔案模式的差別:
模式 | 描述 |
r | 預設模式:以隻讀文本形式打開檔案,檔案的指針将會放在檔案的開頭。 |
rb | 以二進制格式打開一個檔案用于隻讀,檔案指針将會放在檔案的開頭。 |
r+ | 打開一個檔案用于文本讀寫,檔案指針将會放在檔案的開頭。 |
rb+ | 以二進制格式打開一個檔案用于讀寫,檔案指針将會放在檔案的開頭。 |
w | 打開一個檔案隻用于寫入。如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。 |
wb | 以二進制格式打開一個檔案隻用于文本寫入。如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。 |
w+ | 打開一個檔案用于讀寫,如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。 |
wb+ | 以二進制格式打開一個檔案用于讀寫。如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。 |
a | 打開一個檔案用于文本追加。如果該檔案已存在,檔案指針将會放在檔案的結尾。如果該檔案不存在,建立新檔案進行寫入。 |
ab | 以二進制格式打開一個檔案用于追加。如果該檔案已存在,檔案指針将會放在檔案的結尾。如果該檔案不存在,建立新檔案進行寫入。 |
a+ | 打開一個檔案用于讀寫。如果該檔案已存在,檔案指針将會放在檔案的結尾。檔案打開時會是追加模式。如果該檔案不存在,建立新檔案用于讀寫。 |
ab+ | 以二進制格式打開一個檔案用于追加。如果該檔案已存在,檔案指針将會放在檔案的結尾。如果該檔案不存在,建立新檔案用于讀寫。 |
可以看到:
wb+
、
rb+
和
ab+
均以二進制格式讀寫檔案,但
wb+
會覆寫已經存在的檔案,隻有
rb+
或
ab+
能夠允許多個檔案句柄操作同一個檔案。最終我選擇打開後檔案指針在檔案開頭的
rb+
模式來完成多線程對同一檔案的讀寫。
首先需要先建立空檔案,保證
rb+
模式讀取檔案前,檔案已經存在:
save_name = "單線程分片流式下載下傳.mp4"
with open(save_name, "wb") as f:
pass
将範圍下載下傳的過程封裝到以下方法中:
def range_download(save_name, s_pos, e_pos):
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = requests.get(url, headers=headers, stream=True)
with open(save_name, "rb+") as f:
f.seek(s_pos)
for chunk in res.iter_content(chunk_size=64*1024):
if chunk:
f.write(chunk)
然後就可以分片進行下載下傳:
for s_pos, e_pos in divisional_ranges:
range_download(save_name, s_pos, e_pos)
循環每一次都打開一個檔案句柄寫入指定範圍的資料。
基于此,我們就可以很簡單的轉換為多線程的實作:
😎多線程下載下傳大檔案😎
關于多線程、協程和多程序可以參考前面的示例:
- 單線程、多線程和協程的爬蟲性能對比
- Python的多程序并行計算庫與多程序爬蟲
結合前面已有代碼,實作多線程下載下傳的完整代碼為:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def calc_divisional_range(filesize, chuck=10):
step = filesize//chuck
arr = list(range(0, filesize, step))
result = []
for i in range(len(arr)-1):
s_pos, e_pos = arr[i], arr[i+1]-1
result.append([s_pos, e_pos])
result[-1][-1] = filesize-1
return result
# 下載下傳方法
def range_download(save_name, s_pos, e_pos):
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = requests.get(url, headers=headers, stream=True)
with open(save_name, "rb+") as f:
f.seek(s_pos)
for chunk in res.iter_content(chunk_size=64*1024):
if chunk:
f.write(chunk)
url = "https://vdn3.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629707188-0-0-8e6fe4e1e29621664c71e2b95fc3bdb9&f=mp4&bu=http-com&expiration=1629707188&v=tx"
res = requests.head(url)
filesize = int(res.headers['Content-Length'])
divisional_ranges = calc_divisional_range(filesize)
save_name = "多線程流式下載下傳.mp4"
# 先建立空檔案
with open(save_name, "wb") as f:
pass
with ThreadPoolExecutor() as p:
futures = []
for s_pos, e_pos in divisional_ranges:
print(s_pos, e_pos)
futures.append(p.submit(range_download, save_name, s_pos, e_pos))
# 等待所有任務執行完畢
as_completed(futures)
0 5382525
5382526 10765051
10765052 16147577
16147578 21530103
21530104 26912629
26912630 32295155
32295156 37677681
37677682 43060207
43060208 48442733
48442734 53825262
這樣我們就實作了Python多線程下載下傳大檔案。
從結果看,四種下載下傳方法得到的檔案都完全一緻:
這樣我們就實作了大檔案的多線程下載下傳。
😱協程分片下載下傳大檔案😱
那麼能否以協程形式分片下載下傳大檔案呢?
在之前的協程爬蟲的文章中,我使用了
aiohttp
完成了資料的異常爬蟲,這次我們嘗試使用最近一個新的支援異步爬取的庫
httpx
,而且該庫支援http2.0能夠爬取http2.0協定的網頁。
要爬取http2.0的站點隻需要:
import httpx
client = httpx.Client(http2=True)
之後client對象與request庫的API幾乎完全一緻,隻需把之前代碼中使用的requests改成這個client對象即可。
協程不需要支援stream流式下載下傳,最終封裝的下載下傳方法為:
async def async_range_download(save_name, s_pos, e_pos):
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = await client.get(url, headers=headers)
with open(save_name, "rb+") as f:
f.seek(s_pos)
f.write(res.content)
完整下載下傳代碼為:
import asyncio
import httpx
import requests
import nest_asyncio
nest_asyncio.apply()
def calc_divisional_range(filesize, chuck=10):
step = filesize//chuck
arr = list(range(0, filesize, step))
result = []
for i in range(len(arr)-1):
s_pos, e_pos = arr[i], arr[i+1]-1
result.append([s_pos, e_pos])
result[-1][-1] = filesize-1
return result
# 下載下傳方法
async def async_range_download(save_name, s_pos, e_pos):
headers = {"Range": f"bytes={s_pos}-{e_pos}"}
res = await client.get(url, headers=headers)
with open(save_name, "rb+") as f:
f.seek(s_pos)
f.write(res.content)
client = httpx.AsyncClient()
url = "https://vdn1.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629718189-0-0-2e4eceee29e2d17a92c77fd49911d39a&f=mp4&bu=http-com&expiration=1629718189&v=hw"
res = httpx.head(url)
filesize = int(res.headers['Content-Length'])
divisional_ranges = calc_divisional_range(filesize, 20)
save_name = "協程分片下載下傳.mp4"
# 先建立空檔案
with open(save_name, "wb") as f:
pass
loop = asyncio.get_event_loop()
tasks = [async_range_download(save_name, s_pos, e_pos)
for s_pos, e_pos in divisional_ranges]
# 等待所有協程執行完畢
loop.run_until_complete(asyncio.wait(tasks))
上述代碼中:
import nest_asyncio
nest_asyncio.apply()
這兩行的目的是為了相容協程程式能夠在Jupyter notebook環境中運作,對于普通的py檔案中運作,可以直接删除。
最終下載下傳結果:
💙實作斷點續傳的思路💙
那麼如何通過python實作斷點續傳呢?
粗粒度的方法就是以分片為校驗機關,某個分片下載下傳失敗則重新下載下傳。
細粒度一點的方法是每個分片内部校驗已下載下傳的範圍,對于下載下傳失敗的分布,重新定位起始位置繼續下載下傳。