Background
In some business scenarios, very large log files (or other files) are produced on an ECS instance and need to be promptly moved off the machine, gzip-compressed, and stored in OSS. The compressed file, however, can exceed 3 GB, which is beyond the maximum memory of the Function Compute execution environment. This article presents a streaming solution to the problem, under the following setup:
- Function Compute is configured with a VPC so that it can reach the ECS instance over the private network (a minimal connectivity check is sketched right after this list)
- OSS and Function Compute are in the same region, so data is transferred over the internal endpoint
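As a sanity check for the first point, the sketch below (placeholder IP, port, and credentials; not part of the original solution) simply opens an SFTP session from inside a function handler and lists a directory on the ECS instance. If this succeeds, the VPC wiring is in place.

import paramiko

# Placeholders -- substitute your own ECS private IP and credentials.
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = "xiaoming"
USR_PWD = "123456"

def handler(event, context):
    transport = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    transport.connect(username=USR_NAME, password=USR_PWD)
    sftp = paramiko.SFTPClient.from_transport(transport)
    # If the function can list a directory on the ECS instance,
    # the VPC / internal-network setup is working.
    entries = sftp.listdir("/root")
    sftp.close()
    transport.close()
    return "reachable, {} entries under /root".format(len(entries))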
Example code
The code depends on the third-party library paramiko. With its default settings the library caps the transfer rate for large files, so the following adjustment is needed: pass suitable window_size and max_packet_size values when constructing the paramiko.SFTPClient.

import paramiko
import gzip
import oss2
import logging
import os
import time

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'


def handler(event, context):
    start = time.time()
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    # Enlarge the SFTP window and packet size, otherwise paramiko throttles large transfers.
    window_size = 2 ** 28
    max_packet_size = 2 ** 26
    sftp = paramiko.SFTPClient.from_transport(scp, window_size=window_size, max_packet_size=max_packet_size)

    pos = 0                      # current append offset in the OSS object
    CHUNK_SIZE = 1024 * 1024     # read 1 MB from the remote file at a time
    APPEND_SIZE = 128            # buffer 128 compressed chunks before each append_object call
    LOG_FILE = "test-1.log"
    DST_OBJ = 'dst/' + LOG_FILE + '.gz'
    data_out = []

    with sftp.open("/root/" + LOG_FILE, "r") as f:
        while 1:
            data = f.read(CHUNK_SIZE)
            if not data:
                # flush whatever is left in the buffer, then stop
                if data_out:
                    bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                break
            data_out.append(gzip.compress(data))
            if len(data_out) == APPEND_SIZE:
                upload_data = b"".join(data_out)
                bucket.append_object(DST_OBJ, pos, upload_data)
                pos += len(upload_data)
                data_out = []

    print("total time = ", time.time() - start)
    return "OK"
This approach removes the Function Compute memory ceiling, but for very large files, say 15 GB and up, the 10-minute execution time limit becomes the next bottleneck.
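A back-of-the-envelope calculation (an illustration only) shows why: pushing 15 GB of raw data through a single invocation within the 600-second limit already requires a sustained end-to-end rate of roughly 25 MB/s across the SFTP read, gzip compression, and OSS append.

# Rough throughput budget for the single-function approach (illustration only).
FILE_SIZE_GB = 15
TIME_LIMIT_S = 10 * 60   # Function Compute's 10-minute execution limit

required_mb_per_s = FILE_SIZE_GB * 1024 / TIME_LIMIT_S
print("sustained throughput needed: %.1f MB/s" % required_mb_per_s)   # -> 25.6 MB/s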
Improvement
OSS supports multipart upload. Building on multipart upload and combining it with Function Compute's elastic scale-out, the work can be split across a master function and many worker functions:
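The master/worker split rests on the three-call OSS multipart upload lifecycle: the master initializes the upload, each worker uploads the parts whose numbers it owns, and the master completes the upload from the collected part list. A condensed, single-process sketch of that lifecycle (the key and the data are placeholders; plain credentials are used instead of STS for brevity):

import oss2

# Placeholders -- substitute real credentials, endpoint, and bucket.
auth = oss2.Auth("<access_key_id>", "<access_key_secret>")
bucket = oss2.Bucket(auth, "oss-cn-hangzhou-internal.aliyuncs.com", "oss-demo")

key = "dst/demo.gz"
upload_id = bucket.init_multipart_upload(key).upload_id             # step 1: master initializes

parts = []
for part_number, data in enumerate([b"a" * 200 * 1024, b"b" * 200 * 1024], start=1):
    result = bucket.upload_part(key, upload_id, part_number, data)  # step 2: workers upload parts
    parts.append(oss2.models.PartInfo(part_number, result.etag))

bucket.complete_multipart_upload(key, upload_id, parts)             # step 3: master completes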
Example code:
master:
# -*- coding: utf-8 -*-
import logging
import fc2
import oss2
import paramiko
import math
import json
import time
from threading import Thread

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

SRC_LOG_FILE = "/root/huge.log"
DST_FILE = "jiahe/push_log_50000_5G.gz"
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

SUB_LOG = 5 * 1024 * 1024 * 1024   # amount of raw log handled by each worker invocation
PART_SIZE = 16 * 1024 * 1024       # raw bytes per uploaded part; must equal PART_SIZE in the worker
SERVICE_NAME = "fc_demo"           # service containing the worker function, preferably the same service
SUB_FUNCTION_NAME = "worker"       # name of the worker function

parts = []


def sub_gz(fcClient, subevent):
    # synchronously invoke one worker and collect the part info it returns
    content = fcClient.invoke_function(SERVICE_NAME, SUB_FUNCTION_NAME, json.dumps(subevent)).data
    global parts
    # print(content)
    parts.extend(json.loads(content))


def handler(event, context):
    # start = time.time()
    global parts
    parts = []
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    # only the file size is needed here; the workers read the data themselves
    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    sftp = paramiko.SFTPClient.from_transport(scp)
    info = sftp.stat(SRC_LOG_FILE)
    total_size = info.st_size
    print(total_size)
    sftp.close()

    endpoint = "http://{0}.{1}-internal.fc.aliyuncs.com".format(context.account_id, context.region)
    fcClient = fc2.Client(endpoint=endpoint, accessKeyID=creds.accessKeyId,
                          accessKeySecret=creds.accessKeySecret,
                          securityToken=creds.securityToken, Timeout=900)

    threadNum = int(math.ceil(float(total_size) / SUB_LOG))
    key = DST_FILE
    upload_id = bucket.init_multipart_upload(key).upload_id
    # each worker owns a disjoint range of part numbers, part_step parts wide
    part_step = int(math.ceil(float(SUB_LOG) / PART_SIZE))

    ts = []
    left_size = total_size
    for i in range(threadNum):
        part_start = part_step * i + 1
        size = SUB_LOG if SUB_LOG < left_size else left_size
        subEvt = {
            "src": SRC_LOG_FILE,
            "dst": DST_FILE,
            "offset": i * SUB_LOG,
            "size": size,
            "part_number": part_start,
            "upload_id": upload_id,
        }
        print(i, subEvt)
        t = Thread(target=sub_gz, args=(fcClient, subEvt,))
        left_size = left_size - SUB_LOG
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    parts.sort(key=lambda k: k.get('part_number', 0))
    # print(parts)
    part_objs = []
    for part in parts:
        part_objs.append(oss2.models.PartInfo(part["part_number"], part["etag"],
                                              size=part["size"], part_crc=part["part_crc"]))
    bucket.complete_multipart_upload(key, upload_id, part_objs)
    # print(time.time() - start)
    return "ok"
worker:
import paramiko
import gzip
import oss2
import logging
import os
import time
import json

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'
CHUNK_SIZE = 1024 * 1024          # read 1 MB from the remote file at a time
PART_SIZE = 16 * CHUNK_SIZE       # raw bytes per uploaded part; must equal PART_SIZE in the master


def handler(event, context):
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    # Enlarge the SFTP window and packet size, otherwise paramiko throttles large transfers.
    window_size = 2 ** 30
    max_packet_size = 2 ** 28
    sftp = paramiko.SFTPClient.from_transport(scp, window_size=window_size, max_packet_size=max_packet_size)

    evt = json.loads(event)
    src_log_file = evt['src']
    offset = evt['offset']                  # where this worker's slice of the raw file starts
    size = evt['size']                      # how many raw bytes this worker is responsible for
    dst_gz_file = evt['dst']
    part_number = int(evt['part_number'])   # first part number owned by this worker
    upload_id = evt['upload_id']
    key = dst_gz_file

    data_out = []
    partsInfo = []
    with sftp.open(src_log_file, "r") as f:
        f.seek(offset)
        while 1:
            data = f.read(CHUNK_SIZE)
            cur = f.tell()
            if not data or (cur > offset + size):
                # end of file, or the chunk just read belongs to the next worker's range
                if data_out:
                    upload_data = b"".join(data_out)
                    size_to_upload = len(upload_data)
                    # The final part of a worker can fall below OSS's 100 KB minimum part size,
                    # e.g. when the file is 15 GB + 1 KB and a trailing 1 KB is left over, or when
                    # the raw data exceeds 100 KB but compresses to less than 100 KB.
                    # For log files, padding with harmless characters is an acceptable workaround.
                    if size_to_upload < 100 * 1024:
                        # fill_data = b"\n\n\n\n\n\n\n\n\n\nAliyunFCFill" + os.urandom(102400)
                        fill_data = b"\t\t\t\t\t\t\t\t\t\t\t\n" * 1024 * 1024 * 5
                        upload_data += gzip.compress(fill_data)  # fill_data compresses to more than 100 KB
                        size_to_upload = len(upload_data)
                    result = bucket.upload_part(key, upload_id, part_number, upload_data)
                    partsInfo.append({
                        "part_number": part_number,
                        "etag": result.etag,
                        "size": size_to_upload,
                        "part_crc": result.crc})
                break
            data_out.append(gzip.compress(data))
            # Every 16 MB of raw data becomes one gz-compressed part. OSS requires a part to be
            # at least 100 KB (102400 bytes); 16 MB normally compresses to well above that.
            if (cur - offset) % PART_SIZE == 0:
                upload_data = b"".join(data_out)
                size_to_upload = len(upload_data)
                result = bucket.upload_part(key, upload_id, part_number, upload_data)
                partsInfo.append({
                    "part_number": part_number,
                    "etag": result.etag,
                    "size": size_to_upload,
                    "part_crc": result.crc})
                part_number += 1
                data_out = []

    return json.dumps(partsInfo)