天天看點

Python實作多線程并發下載下傳大檔案(斷點續傳支援)

不知道各位童鞋們是否遇到過需要使用python下載下傳大檔案的需求,或者需要從一些網速很慢的網站上下載下傳檔案。如果你在實際下載下傳過程碰到下載下傳不穩定經常失敗的情況,本文的方法将會給你帶來一些解決思路和方案。

本文會給大家示範如何使用python對單個大檔案進行多線程下載下傳或協程形式下載下傳,基于此還提供了斷點續傳的實作思路,想使用python開發下載下傳器的朋友都可以拿本文作為參考,期待各位大佬的大作。

文章目錄

  • ​​⭐單線程直接下載下傳⭐​​
  • ​​🔥單線程流式下載下傳🔥​​
  • ​​☀️單線程分片流式下載下傳☀️​​
  • ​​😎多線程下載下傳大檔案😎​​
  • ​​😱協程分片下載下傳大檔案😱​​
  • ​​💙實作斷點續傳的思路💙​​
  • ​​🚀總結🚀​​

下面我們以知乎視訊《

​​【AI混血】人工智能一鍵生成角色全身立繪?!【大谷紐約實驗室】​​》為例進行示範,連結:

​​https://www.zhihu.com/zvideo/1387830268154195968​​

下面首先看一下最基礎的直接下載下傳檔案的方法:

⭐單線程直接下載下傳⭐

隻需要在開發者工具的元素頁籤搜尋​

​video​

​标簽,即可找到視訊的MP4下載下傳連結:

Python實作多線程并發下載下傳大檔案(斷點續傳支援)

當然這個操作也完全可以借助idm的嗅探功能擷取下載下傳連結。

idm下載下傳位址:https://www.lanzoui.com/ia51jqb

idm插件安裝位址:https://chrome.google.com/webstore/detail/idm-integration-module/ngpampappnmepgilojfohadhhmbhlaek

複制出該連結後即可直接下載下傳該視訊:

import requests

url = "https://vdn3.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629560389-0-0-05874b492bec9be924c7da35aa619536&f=mp4&bu=http-com&expiration=1629560389&v=tx"
save_name = "單線程直接下載下傳.mp4"
with open(save_name, "wb") as f, requests.get(url) as res:
    f.write(res.content)      

下載下傳後視訊正常播放:

Python實作多線程并發下載下傳大檔案(斷點續傳支援)

這種下載下傳方式對于知乎這種網絡快的網站自然是沒有問題,但是有些網絡不好的網站就很可能下載下傳中途網絡中斷,等了很久最終卻下載下傳失敗。下面我們看看相對穩定很多的下載下傳方法:

🔥單線程流式下載下傳🔥

使用方法是get方法指定參數​

​stream=True​

​:

save_name = "單線程流式下載下傳.mp4"
num = 0
with open(save_name, "wb") as f, requests.get(url, stream=True) as res:
    for chunk in res.iter_content(chunk_size=64*1024):
        if not chunk:
            break
        f.write(chunk)
        num += 1
        print(f"\r疊代次數:{num}", end="         ")      

上述代碼,以64KB為一組進行流式資料傳輸,最終速度顯然比普通的下載下傳更快一些:

Python實作多線程并發下載下傳大檔案(斷點續傳支援)

使用shutil庫可以簡化代碼編寫:

import requests
import shutil

url = "https://vdn6.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?pkey=AAVKDIgZuES50oUNCBq-nUpHoBmaVfAyhuDeSf9v2szjq3tuG83GsKISZBIU7-i2OyTlNR3IYADmdKI_hcRIouRc&c=avc.0.0&f=mp4&pu=da4bec50&bu=http-da4bec50&expiration=1659059784&v=ks6"
save_name = "單線程流式下載下傳2.mp4"

with open(save_name, "wb") as f, requests.get(url, stream=True) as res:
    shutil.copyfileobj(res.raw, f)      

☀️單線程分片流式下載下傳☀️

那麼我們如何做到檔案的斷點續傳呢?這時候就需要通過請求頭修改需要讀取的位元組範圍,當然也需要先檢查目标伺服器是否支援範圍請求。

如果請求一個資源時, HTTP響應中出現​

​Accept-Ranges​

​且其值不是none, 那麼伺服器支援範圍請求。

我們看看head這種請求方式:

res = requests.head(url)
head = res.headers
data = res.content
print(head)
print(data)      
{'Server': 'NWSs', 'Date': 'Sat, 21 Aug 2021 15:25:19 GMT', 'Content-Type': 'video/mp4', 'Content-Length': '53825263', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=600', 'Expires': 'Sat, 21 Aug 2021 15:35:19 GMT', 'Last-Modified': 'Mon, 14 Jun 2021 10:06:22 GMT', 'X-NWS-LOG-UUID': '8fcf387e-7bab-44fb-8cf0-e96f5def3b1c', 'Access-Control-Allow-Origin': '*', 'Access-Control-Max-Age': '31536000', 'x-cdn-provider': 'tencent', 'X-Cache-Lookup': 'Hit From Disktank3, Hit From Inner Cluster', 'Accept-Ranges': 'bytes', 'ETag': '"bfd7937656505a8ab9a05bc373745a8b"', 'x-cos-hash-crc64ecma': '5809002519149120115', 'x-cos-replication-status': 'Complete', 'x-cos-request-id': 'NjEwOTM5MzZfNWM0ZTQ0MGJfNDc2MV8xZmQ3YmZj', 'x-cos-storage-class': 'STANDARD_IA', 'x-cos-version-id': 'MTg0NDUxMjA0MDg1MjcxMDk4MTA', 'X-Daa-Tunnel': 'hop_count=1'}
b''      

可以看到head請求隻傳回的響應頭,未傳回任何資料。上面的響應頭中,​

​'Accept-Ranges': 'bytes'​

​​ 代表可以使用位元組作為機關來定義請求範圍。​

​Content-Length​

​ 則代表該資源的完整大小。

于是我們可以通過​

​Content-Length​

​ 響應頭擷取檔案的大小:

filesize = int(res.headers['Content-Length'])
filesize      
53825263      

這就是目前檔案的總大小。

這時我們就可以根據總大小對檔案進行分片,例如總共分幾部分或者多大的部分作為一個分片。這裡我以個數進行分片,下面方法預設對檔案分成10個小部分:

def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


divisional_ranges = calc_divisional_range(filesize)
divisional_ranges      
[[0, 5382525],
 [5382526, 10765051],
 [10765052, 16147577],
 [16147578, 21530103],
 [21530104, 26912629],
 [26912630, 32295155],
 [32295156, 37677681],
 [37677682, 43060207],
 [43060208, 48442733],
 [48442734, 53825262]]      

Range 請求頭的文法:​

​Range: bytes=start-end​

Range頭域可以請求一個或者多個子範圍。例如:

  • 表示頭500個位元組:bytes=0-499
  • 表示第二個500位元組:bytes=500-999
  • 表示最後500個位元組:bytes=-500
  • 表示500位元組以後的範圍:bytes=500-
  • 第一個和最後一個位元組:bytes=0-0,-1
  • 同時指定幾個範圍:bytes=500-600,601-999

需要注意一下各種檔案模式的差別:

模式 描述
r 預設模式:以隻讀文本形式打開檔案,檔案的指針将會放在檔案的開頭。
rb 以二進制格式打開一個檔案用于隻讀,檔案指針将會放在檔案的開頭。
r+ 打開一個檔案用于文本讀寫,檔案指針将會放在檔案的開頭。
rb+ 以二進制格式打開一個檔案用于讀寫,檔案指針将會放在檔案的開頭。
w 打開一個檔案隻用于寫入。如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。
wb 以二進制格式打開一個檔案隻用于文本寫入。如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。
w+ 打開一個檔案用于讀寫,如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。
wb+ 以二進制格式打開一個檔案用于讀寫。如果該檔案已存在則将其覆寫。如果該檔案不存在,建立新檔案。
a 打開一個檔案用于文本追加。如果該檔案已存在,檔案指針将會放在檔案的結尾。如果該檔案不存在,建立新檔案進行寫入。
ab 以二進制格式打開一個檔案用于追加。如果該檔案已存在,檔案指針将會放在檔案的結尾。如果該檔案不存在,建立新檔案進行寫入。
a+ 打開一個檔案用于讀寫。如果該檔案已存在,檔案指針将會放在檔案的結尾。檔案打開時會是追加模式。如果該檔案不存在,建立新檔案用于讀寫。
ab+ 以二進制格式打開一個檔案用于追加。如果該檔案已存在,檔案指針将會放在檔案的結尾。如果該檔案不存在,建立新檔案用于讀寫。

可以看到:

​wb+​

​​、​

​rb+​

​​和​

​ab+​

​​均以二進制格式讀寫檔案,但​

​wb+​

​​會覆寫已經存在的檔案,隻有​

​rb+​

​​或​

​ab+​

​​能夠允許多個檔案句柄操作同一個檔案。最終我選擇打開後檔案指針在檔案開頭的​

​rb+​

​模式來完成多線程對同一檔案的讀寫。

首先需要先建立空檔案,保證​

​rb+​

​模式讀取檔案前,檔案已經存在:

save_name = "單線程分片流式下載下傳.mp4"
with open(save_name, "wb") as f:
    pass      

将範圍下載下傳的過程封裝到以下方法中:

def range_download(save_name, s_pos, e_pos):
    headers = {"Range": f"bytes={s_pos}-{e_pos}"}
    res = requests.get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)      

然後就可以分片進行下載下傳:

for s_pos, e_pos in divisional_ranges:
    range_download(save_name, s_pos, e_pos)      

循環每一次都打開一個檔案句柄寫入指定範圍的資料。

基于此,我們就可以很簡單的轉換為多線程的實作:

😎多線程下載下傳大檔案😎

關于多線程、協程和多程序可以參考前面的示例:

  • ​​單線程、多線程和協程的爬蟲性能對比​​
  • ​​Python的多程序并行計算庫與多程序爬蟲​​

結合前面已有代碼,實作多線程下載下傳的完整代碼為:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests


def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


# 下載下傳方法
def range_download(save_name, s_pos, e_pos):
    headers = {"Range": f"bytes={s_pos}-{e_pos}"}
    res = requests.get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)


url = "https://vdn3.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629707188-0-0-8e6fe4e1e29621664c71e2b95fc3bdb9&f=mp4&bu=http-com&expiration=1629707188&v=tx"
res = requests.head(url)
filesize = int(res.headers['Content-Length'])
divisional_ranges = calc_divisional_range(filesize)


save_name = "多線程流式下載下傳.mp4"
# 先建立空檔案
with open(save_name, "wb") as f:
    pass
with ThreadPoolExecutor() as p:
    futures = []
    for s_pos, e_pos in divisional_ranges:
        print(s_pos, e_pos)
        futures.append(p.submit(range_download, save_name, s_pos, e_pos))
    # 等待所有任務執行完畢
    as_completed(futures)      
0 5382525
5382526 10765051
10765052 16147577
16147578 21530103
21530104 26912629
26912630 32295155
32295156 37677681
37677682 43060207
43060208 48442733
48442734 53825262      

這樣我們就實作了Python多線程下載下傳大檔案。

從結果看,四種下載下傳方法得到的檔案都完全一緻:

Python實作多線程并發下載下傳大檔案(斷點續傳支援)

這樣我們就實作了大檔案的多線程下載下傳。

😱協程分片下載下傳大檔案😱

那麼能否以協程形式分片下載下傳大檔案呢?

在之前的協程爬蟲的文章中,我使用了​

​aiohttp​

​​完成了資料的異常爬蟲,這次我們嘗試使用最近一個新的支援異步爬取的庫​

​httpx​

​,而且該庫支援http2.0能夠爬取http2.0協定的網頁。

要爬取http2.0的站點隻需要:

import httpx

client = httpx.Client(http2=True)

之後client對象與request庫的API幾乎完全一緻,隻需把之前代碼中使用的requests改成這個client對象即可。

協程不需要支援stream流式下載下傳,最終封裝的下載下傳方法為:

async def async_range_download(save_name, s_pos, e_pos):
    headers = {"Range": f"bytes={s_pos}-{e_pos}"}
    res = await client.get(url, headers=headers)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        f.write(res.content)      

完整下載下傳代碼為:

import asyncio
import httpx
import requests

import nest_asyncio
nest_asyncio.apply()


def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


# 下載下傳方法
async def async_range_download(save_name, s_pos, e_pos):
    headers = {"Range": f"bytes={s_pos}-{e_pos}"}
    res = await client.get(url, headers=headers)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        f.write(res.content)

client = httpx.AsyncClient()

url = "https://vdn1.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629718189-0-0-2e4eceee29e2d17a92c77fd49911d39a&f=mp4&bu=http-com&expiration=1629718189&v=hw"
res = httpx.head(url)
filesize = int(res.headers['Content-Length'])
divisional_ranges = calc_divisional_range(filesize, 20)


save_name = "協程分片下載下傳.mp4"
# 先建立空檔案
with open(save_name, "wb") as f:
    pass

loop = asyncio.get_event_loop()
tasks = [async_range_download(save_name, s_pos, e_pos)
         for s_pos, e_pos in divisional_ranges]
# 等待所有協程執行完畢
loop.run_until_complete(asyncio.wait(tasks))      

上述代碼中:

import nest_asyncio
nest_asyncio.apply()      

這兩行的目的是為了相容協程程式能夠在Jupyter notebook環境中運作,對于普通的py檔案中運作,可以直接删除。

最終下載下傳結果:

Python實作多線程并發下載下傳大檔案(斷點續傳支援)

💙實作斷點續傳的思路💙

那麼如何通過python實作斷點續傳呢?

粗粒度的方法就是以分片為校驗機關,某個分片下載下傳失敗則重新下載下傳。

細粒度一點的方法是每個分片内部校驗已下載下傳的範圍,對于下載下傳失敗的分布,重新定位起始位置繼續下載下傳。

🚀總結🚀