Preface
Alibaba Cloud Function Compute (FC) provides an event-driven computing model. Function execution is driven by events: a Function Compute trigger describes a set of rules, and when an event satisfies those rules, the event source triggers the corresponding function. Function Compute already supports Object Storage Service (OSS) as an event source for triggering functions, which can be used to automatically process files stored in OSS:

As shown in the figure above, Alibaba Cloud OSS and Function Compute are seamlessly integrated. You can configure a handler function for various event types; when the OSS system captures an event of the specified type, it automatically invokes the function. For example, you can set up a function to handle the PutObject event: after you call the OSS PutObject API to upload an image to OSS, the associated function is automatically triggered to process that image.
In this article, the scenario is: a user uploads a very large compressed file (zip type) to OSS; the OSS system captures the PutObject/PostObject event and automatically triggers the function; the function decompresses the archive and places the decompressed files under a specified directory of the target OSS bucket. For example, in the bucket myzipfilebucket, the source directory is where compressed files are uploaded, and the processed directory is where the decompressed files are stored.
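For reference, here is a trimmed, illustrative sketch of the event JSON that such a trigger delivers to the function (the values are made up; see the OSS trigger documentation linked in the code below for the full schema). These are exactly the fields the handlers in this article read:
# an illustrative OSS trigger event, as the handler receives it (a JSON string);
# only the fields consumed by the handlers in this article are shown
event = '''{
    "events": [{
        "eventName": "ObjectCreated:PutObject",
        "region": "cn-shanghai",
        "oss": {
            "bucket": {"name": "myzipfilebucket"},
            "object": {"key": "source/a.zip"}
        }
    }]
}'''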
Methods
In this article we use the python3 runtime as an example and demonstrate FC's capabilities step by step.
The simple method
Because FC's temporary directory is limited to 512 MB, writing to disk is clearly a poor choice: it not only adds I/O time, but the 512 MB limit alone is prohibitive. So we do everything in memory: FC pulls the compressed file from OSS over the internal network, decompresses it entirely in memory, and uploads the bytes of each decompressed file to the target directory of the specified bucket. This gives us the following code:
# -*- coding: utf-8 -*-
import oss2, json
import zipfile
import os, io
import logging
import chardet

LOGGER = logging.getLogger()

def handler(event, context):
    """
    The object from OSS will be decompressed automatically.
    param: event:   The OSS event json string. Including oss object uri and other information.
        For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS
    param: context: The function context, including credential and runtime info.
        For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context
    """
    evt_lst = json.loads(event)
    creds = context.credentials
    auth = oss2.StsAuth(
        creds.access_key_id,
        creds.access_key_secret,
        creds.security_token)
    evt = evt_lst['events'][0]
    bucket_name = evt['oss']['bucket']['name']
    # use the internal endpoint so the download does not leave the region
    endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com'
    bucket = oss2.Bucket(auth, endpoint, bucket_name)
    object_name = evt['oss']['object']['key']
    """
    When a source/ prefix object is placed in an OSS, it is hoped that the object will
    be decompressed and then stored in the OSS as processed/ prefixed.
    For example, source/a.zip will be processed as processed/a/...
    "source/", "processed/" can be changed according to the user's requirements.
    """
    file_type = os.path.splitext(object_name)[1]
    if file_type != ".zip":
        raise RuntimeError('{} filetype is not zip'.format(object_name))
    newKey = object_name.replace("source/", "processed/")
    remote_stream = bucket.get_object(object_name)
    if not remote_stream:
        raise RuntimeError('failed to get oss object. bucket: %s. object: %s' % (bucket_name, object_name))
    # hold the whole archive in memory
    zip_buffer = io.BytesIO(remote_stream.read())
    LOGGER.info('download object from oss success: {}'.format(object_name))
    newKey = newKey.replace(".zip", "/")
    with zipfile.ZipFile(zip_buffer) as zip_file:
        for name in zip_file.namelist():
            with zip_file.open(name) as file_obj:
                # fix garbled Chinese file/directory names: zipfile decodes entry
                # names as cp437 unless the UTF-8 flag is set, so a cp437 round-trip
                # recovers the original raw bytes
                try:
                    name = name.encode(encoding='cp437')
                except Exception:
                    name = name.encode(encoding='utf-8')
                # repeat the bytes so chardet has enough data for a stable guess
                detect = chardet.detect((name * 100)[0:100])
                confidence = detect["confidence"]
                if confidence >= 0.8:
                    try:
                        name = name.decode(encoding=detect["encoding"])
                    except Exception:
                        name = name.decode(encoding="gb2312")
                else:
                    name = name.decode(encoding="gb2312")
                bucket.put_object(newKey + name, file_obj.read())
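To exercise this handler outside FC, one can fake the event and context. Below is a minimal sketch, assuming the snippet is appended to the handler file above and real STS credentials are substituted (in production the FC runtime supplies them via context.credentials). Note also that the -internal OSS endpoint used by the handler is only reachable from within Alibaba Cloud:
import json
from collections import namedtuple

Credentials = namedtuple('Credentials', ['access_key_id', 'access_key_secret', 'security_token'])
Context = namedtuple('Context', ['credentials'])

# illustrative placeholders: a real STS key/secret/token is required
ctx = Context(Credentials('<access_key_id>', '<access_key_secret>', '<security_token>'))
evt = json.dumps({"events": [{
    "eventName": "ObjectCreated:PutObject",
    "region": "cn-shanghai",
    "oss": {"bucket": {"name": "myzipfilebucket"},
            "object": {"key": "source/a.zip"}}}]})
handler(evt, ctx)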
- Advantages
  - Simple: everything is done in memory, well suited to decompressing smaller archives
- Disadvantages
  - Memory-bound: an archive that is only somewhat larger easily exceeds the 3 GB maximum memory of the Function Compute execution environment
  - If archive sizes vary widely, sizing the function's memory for the largest archive drives up execution cost
The streaming method
The complete sample code can be downloaded from the attachment.
Note: the entry-function .py file in the attached streaming-method code may not be up to date; after downloading it, simply copy the code from this article over the entry function.
Soon we naturally realize that the entire content of the archive does not have to pass through FC as a staging area. If we first fetch the archive's meta information, that is, pull the (very small) byte stream holding the archive's metadata, we can work out which files the big archive contains and each file's start and end offsets within the archive's byte stream. With this information, every file inside the archive can be wrapped as a file-like object; on the FC side, we only need to decompress the file-like object we get and put the decompressed content, as a file-like object, into the target bucket directory. There is no need to drag everything here to be processed in one batch.
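The primitive this relies on is the OSS range read. A minimal sketch with oss2, using illustrative credential, endpoint and bucket names (inside FC you would use the -internal endpoint instead):
import oss2

auth = oss2.Auth('<access_key_id>', '<access_key_secret>')
bucket = oss2.Bucket(auth, 'oss-cn-shanghai.aliyuncs.com', 'myzipfilebucket')

# learn the object size without downloading it
meta = bucket.get_object_meta('source/a.zip')
print(meta.content_length)

# fetch only the first kilobyte of the object instead of the whole thing
head = bucket.get_object('source/a.zip', byte_range=(0, 1023)).read()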
- Adapting zipfile
In Python we continue to use the zipfile library. It appears to accept a file-like object as its argument, but it requires that object to support the seek and tell interfaces. The GetObjectResult object returned by OSS get_object is a file-like object, yet it provides neither seek nor tell, so we made some modifications to zipfile.
- At the same time, we construct a file-like object that the adapted zipfile can accept:
# -*- coding: utf-8 -*-
import oss2
from oss2 import utils, models
import ossZipfile as zipfile  # the adapted zipfile shipped in the attachment

zipfile_support_oss = zipfile

# support upload to oss as a file-like object
def make_crc_adapter(data, init_crc=0):
    data = utils.to_bytes(data)
    # file-like object
    if hasattr(data, 'read'):
        return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc))

utils.make_crc_adapter = make_crc_adapter

class OssStreamFileLikeObject(object):
    """Expose an OSS object as a read-by-range source for the adapted zipfile."""

    def __init__(self, bucket, key):
        super(OssStreamFileLikeObject, self).__init__()
        self._bucket = bucket
        self._key = key
        self._meta_data = self._bucket.get_object_meta(self._key)

    @property
    def bucket(self):
        return self._bucket

    @property
    def key(self):
        return self._key

    @property
    def filesize(self):
        return self._meta_data.content_length

    def get_reader(self, begin, end):
        # clamp the requested range to [0, filesize - 1]
        begin = begin if begin >= 0 else 0
        end = end if end > 0 else self.filesize - 1
        end = end if end < self.filesize else self.filesize - 1
        begin = begin if begin < end else end
        return self._bucket.get_object(self._key, byte_range=(begin, end))

    def get_content_bytes(self, begin, end):
        reader = self.get_reader(begin, end)
        return reader.read()

    def get_last_content_bytes(self, offset):
        return self.get_content_bytes(self.filesize - offset, self.filesize - 1)
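A minimal usage sketch of this helper, given an oss2.Bucket instance named bucket and an illustrative key. A zip archive ends with its central directory, and the End of Central Directory record is at least 22 bytes, which is why a tail range read is enough to start parsing the archive's file list:
# wrap an OSS object; no data is fetched until a range is requested
zip_fp = OssStreamFileLikeObject(bucket, 'source/a.zip')
# read the last 22 bytes, where a (comment-free) End of Central Directory record lives
eocd = zip_fp.get_last_content_bytes(22)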
- Entry function
# -*- coding: utf-8 -*-
'''
Statement:
This function assumes file and folder names are encoded as follows:
1. On macOS/Linux systems, the default is utf-8.
2. On Windows systems, the default is gb2312, though utf-8 is also possible.
For other encodings, we try to use the chardet library to detect the encoding,
but this is not guaranteed to be 100% correct.
Users are advised to debug the function first, rewrite it if necessary,
and make sure the debugging passes.
For the latest progress of this function, follow this blog: https://yq.aliyun.com/articles/680958
'''
import helper
import oss2, json
import os
import logging
import chardet

"""
When a source/ prefix object is placed in an OSS, it is hoped that the object will
be decompressed and then stored in the OSS as processed/ prefixed.
For example, source/a.zip will be processed as processed/a/...
"source/", "processed/" can be changed according to the user's requirements.
detail: https://yq.aliyun.com/articles/680958
"""

# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

LOGGER = logging.getLogger()

def handler(event, context):
    """
    The object from OSS will be decompressed automatically.
    param: event:   The OSS event json string. Including oss object uri and other information.
        For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS
    param: context: The function context, including credential and runtime info.
        For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context
    """
    evt_lst = json.loads(event)
    creds = context.credentials
    auth = oss2.StsAuth(
        creds.access_key_id,
        creds.access_key_secret,
        creds.security_token)
    evt = evt_lst['events'][0]
    bucket_name = evt['oss']['bucket']['name']
    endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com'
    bucket = oss2.Bucket(auth, endpoint, bucket_name)
    object_name = evt['oss']['object']['key']
    # resolve symlink objects to their real target
    if "ObjectCreated:PutSymlink" == evt['eventName']:
        object_name = bucket.get_symlink(object_name).target_key
        if object_name == "":
            raise RuntimeError('{} is invalid symlink file'.format(evt['oss']['object']['key']))
    file_type = os.path.splitext(object_name)[1]
    if file_type != ".zip":
        raise RuntimeError('{} filetype is not zip'.format(object_name))
    LOGGER.info("start to decompress zip file = {}".format(object_name))
    lst = object_name.split("/")
    zip_name = lst[-1]
    # the target directory is configurable via the PROCESSED_DIR environment variable
    PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "")
    if PROCESSED_DIR and PROCESSED_DIR[-1] != "/":
        PROCESSED_DIR += "/"
    newKey = PROCESSED_DIR + zip_name
    # wrap the OSS object so zipfile reads it by byte range instead of downloading it all
    zip_fp = helper.OssStreamFileLikeObject(bucket, object_name)
    newKey = newKey.replace(".zip", "/")
    with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file:
        for name in zip_file.namelist():
            with zip_file.open(name) as file_obj:
                # recover the raw bytes of the entry name (zipfile decodes names as
                # cp437 unless the UTF-8 flag is set), then detect the real encoding
                try:
                    name = name.encode(encoding='cp437')
                except Exception:
                    name = name.encode(encoding='utf-8')
                # the longer the string to be detected, the higher the detection accuracy
                detect = chardet.detect((name * 100)[0:100])
                confidence = detect["confidence"]
                if confidence > 0.8:
                    try:
                        name = name.decode(encoding=detect["encoding"])
                    except Exception:
                        name = name.decode(encoding='gb2312')
                else:
                    name = name.decode(encoding="gb2312")
                # stream the decompressed entry to OSS as a file-like object
                bucket.put_object(newKey + name, file_obj)
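The trickiest part of the loop above is recovering entry names. Here is a standalone, locally runnable sketch of that cp437 round-trip plus chardet detection; the sample name and the expected chardet guess are illustrative:
# -*- coding: utf-8 -*-
import chardet

def recover_name(name):
    # zipfile decodes entry names as cp437 unless the archive's UTF-8 flag is set,
    # so encoding back to cp437 recovers the original raw bytes; names that were
    # correctly decoded as utf-8 fail this round-trip and are re-encoded as utf-8
    try:
        raw = name.encode('cp437')
    except UnicodeEncodeError:
        raw = name.encode('utf-8')
    # repeat the bytes so chardet has enough data for a stable guess
    detect = chardet.detect((raw * 100)[:100])
    if detect['confidence'] > 0.8:
        try:
            return raw.decode(detect['encoding'])
        except Exception:
            return raw.decode('gb2312')
    return raw.decode('gb2312')

raw = '测试'.encode('gb2312')   # bytes as stored in a zip created on Windows
garbled = raw.decode('cp437')   # what zipfile hands back for such an entry
print(recover_name(garbled))    # '测试', provided chardet guesses gb2312/gbk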
- Advantages
  - Breaks through the memory limit: even a function with a small memory size can automatically decompress and store very large archives
- Disadvantages
  - For smaller archives, it is less simple and direct than the simple method
Summary
This article has explored and tested approaches for automatically decompressing zip files uploaded to OSS, and analyzed the strengths and weaknesses of each. Both methods share one constraint: the maximum function execution time is 15 minutes. If the archive is large and complex enough (containing many small files), the time must be estimated carefully: execution time = decompression time (pure CPU work, which scales roughly linearly with the function's memory size setting, so set it appropriately) + network I/O time (over the internal network). As a rule of thumb, if an archive decompresses locally (on a 2-core, 3 GB machine) within 10 minutes, FC should be able to handle it.