天天看點

Python 阿裡雲OSS檔案上傳下載下傳與檔案删除及檢索示例

阿裡雲OSS檔案上傳下載下傳與檔案删除及檢索示例

實踐環境

運作環境:

Python 3.5.4

CentOS Linux release 7.4.1708 (Core)/Win10

需要安裝以下類庫:

pip3 install setuptools_rust1.1.2

pip3 install Crypto1.4.1 # Win10下,安裝後,需要更改 site-packages下crypto包名稱為Crypto

pip3 install cryptography3.3.2 # 注意,如果不指定版本,安裝oss2時會報錯:error: can't find Rust compiler

pip3 install oss22.15.0

上傳本地檔案到阿裡雲OSS示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import traceback
import os

# 批量上傳檔案到OSS
def upload_files(bucket, target_dir_path, exclusion_list=[]):
    oss_objects_path = []
    target_dir_path = os.path.normpath(target_dir_path).replace('\\', '/')
    for root, dirs, files in os.walk(target_dir_path):
        for file in files:
            target_file_path = os.path.normpath(os.path.join(root, file))
            target_file_relative_path = target_file_path.replace('\\', '/').replace(target_dir_path, '').lstrip('/')
            if target_file_relative_path in exclusion_list:
                continue
            object_path = 'f2b/artifacts/web-admin-react/%s' % target_file_relative_path
            upload_file(bucket, target_file_path, object_path)
            oss_objects_path.append(object_path)
    return oss_objects_path

# 上傳檔案到OSS
def upload_file(bucket, target_file_path, object_path):
    with open(target_file_path, 'rb') as fileobj:
        res = bucket.put_object(object_path, fileobj) # object_path為Object的完整路徑,路徑中不能包含Bucket名稱。
        if res.status != 200:
            raise Exception('upload %s error,status:%s' % (target_file_path, res.status))

if __name__ == '__main__':
    try:
        import oss2
        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')

        # oss2.Bucket(auth, endpoint, bucket_name)
        # endpoint填寫Bucket所在地域對應的endpoint,bucket_name為Bucket名稱。以華東1(杭州)為例,填寫為https://oss-cn-hangzhou.aliyuncs.com。
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        oss_objects_path = []  # 存放上傳成功檔案對應的OSS對象相對路徑
        target_path = 'D:\\artifact-eb34ea94.tar.gz'
        if not os.path.exists(target_path):
            print('success:false,待上傳路徑(%s)不存在' %  target_path)
            exit(0)

        if os.path.isdir(target_path): # 如果為目錄
            oss_objects_path = upload_files(bucket, target_path)
        else:
            object_path = 'f2b/artifacts/web-admin-react/artifact-eb34ea94.tar.gz'
            upload_file(bucket, target_path, object_path)
            oss_objects_path.append(object_path)

        print(','.join(oss_objects_path))

    except Exception:
        print('success:false,%s' % traceback.format_exc())
           

參考連接配接:

https://help.aliyun.com/document_detail/88426.htm?spm=a2c4g.11186623.0.0.9e7e7dbbsOWOh6#t22317.html

https://help.aliyun.com/document_detail/31848.html

下載下傳阿裡雲OSS檔案對象到本地檔案示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import traceback

if __name__ == '__main__':
    try:
        import oss2
         auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')

        # oss2.Bucket(auth, endpoint, bucket_name)
        # endpoint填寫Bucket所在地域對應的endpoint,bucket_name為Bucket名稱。以華東1(杭州)為例,填寫為https://oss-cn-hangzhou.aliyuncs.com。
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        target_file_local_path = 'D:\\artifacts-17a86f.tar.gz' # 本地檔案路徑
        oss_object_path = 'f2b/artifacts/cloud-f2b-web-admin-react/artifact-eb34ea94.tar.gz'

        # bucket.get_object_to_file('object_path', 'object_local_path')
        # object_path 填寫Object完整路徑,完整路徑中不包含Bucket名稱,例如testfolder/exampleobject.txt。
        # object_local_path 下載下傳的Object在本地存儲的檔案路徑,形如 D:\\localpath\\examplefile.txt。如果指定路徑的檔案存在會覆寫,不存在則建立。
        try:
            res = bucket.get_object_to_file(oss_object_path, target_file_local_path)
            if res.status != 200:
                print('success:false,download fail, unknow exception, status:%s' % res.status)
        except Exception:
            print('success:false,%s' % traceback.format_exc())
    except Exception:
        print('success:false,%s' % traceback.format_exc())
           

參考連接配接:

https://help.aliyun.com/document_detail/88442.html

列舉指定字首的所有檔案

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import traceback

if __name__ == '__main__':
    try:
        import oss2
        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')

        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        result_file_list = []
        for obj in oss2.ObjectIteratorV2(bucket,  prefix='f2b/www/alpha/f2b/icec-cloud-f2b-mobile'):
            result_file_list.append(obj.key)
            print(obj.key)
        print(','.join(result_file_list))
    except Exception:
        print('success:false,%s' % traceback.format_exc())
           

參考連接配接:

https://help.aliyun.com/document_detail/88458.html

批量删除OSS對象

#!/usr/bin/env python
# -*- coding: utf-8 -*-

 
import sys
import traceback

if __name__ == '__main__':
    try:
        import oss2

        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        oss_object_path_list = ''.join(sys.argv[1:2]).split(',')

        index = 0
        oss_objects_to_delete = oss_object_path_list[index: index+1000] # API限制,每次最多删除1000個檔案
        while oss_objects_to_delete:
            result = bucket.batch_delete_objects(oss_object_path_list[index: index+1000])
            # 列印成功删除的檔案名。
            print(result.deleted_keys)
            print('批量删除以下OSS對象成功')
            print(''.join(result.deleted_keys))

            index += 1000
            oss_objects_to_delete = oss_object_path_list[index: index+1000]
    except Exception:
        print('success:false,%s' % traceback.format_exc())
           

參考連接配接:

https://help.aliyun.com/document_detail/88463.html

作者:授客

微信/QQ:1033553122

全國軟體測試QQ交流群:7156436

Git位址:https://gitee.com/ishouke

友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!

作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額随意,您的支援将是我繼續創作的源動力,打賞後如有任何疑問,請聯系我!!!

           微信打賞                       

支付寶打賞                  全國軟體測試交流QQ群