天天看點

分段上傳_用戶端測試

1. 需求

2. 測試

// 改造而來,非原創
import os
import requests
import math
import traceback
from requests_toolbelt.multipart.encoder import MultipartEncoder

URL = "xxx"

def _slice( file_path, offset, size, file_name,  file_total_size):

    try:
        targetFile = open(file_path, 'rb')
        targetFile.seek(offset)

        m = MultipartEncoder(
            fields={
                    'file': (file_name, targetFile.read(size), 'application/octet-stream')
            }
                            )

        targetFile.close()

        headers = {"Content-Type": m.content_type,
                   "range": "{0}-{1}/{2}".format(offset, offset + size-1, file_total_size)
                   }
        response = requests.post(url=url,headers=headers, data=m)

        print(response.content)
        print(response)
        status = 0
    except Exception as e:
        traceback.print_exc()
        status = 1
  finally:
     return status 


def do_upload(file_path):
    file_total_size = os.path.getsize(file_path)
    chunk_size = 500
    chunks = math.ceil(file_total_size / chunk_size)
    file_path = file_path.replace('\\', '/')
    file_names = file_path.split('/')
    file_name = file_names[-1]

    for i in range(chunks):

        if i + 1 == chunks:
            current_size = file_total_size % chunk_size
        else:
            current_size = chunk_size

        _slice(file_path, i * chunk_size, current_size, file_name,  file_total_size)


if __name__ == "__main__":
   do_upload('xx.zip')      

開源永流傳