天天看點

由淺入深SCF無伺服器雲函數實踐

近年來,網際網路服務從一開始的實體伺服器托管,虛拟機,容器,發展到現在的雲函數,逐漸無伺服器化。程式員逐漸聚焦于最核心的業務邏輯開發,解放了生産力,顯著提升服務上線效率。雲函數帶來了真正的計算服務。

歡迎大家前往雲+社群,擷取更多騰訊海量技術實踐幹貨哦~

作者:陳傑,騰訊雲架構平台部技術專家

近年來,網際網路服務從一開始的實體伺服器托管,虛拟機,容器,發展到現在的雲函數,逐漸無伺服器化,如下表所示。程式員逐漸聚焦于最核心的業務邏輯開發,解放了生産力,顯著提升了服務上線效率。

雲函數帶來了真正的計算服務,如下表所示,類比騰訊雲COS對象存儲,SCF以函數為機關封裝計算,按需排程執行,無須關心函數的自動擴縮容,故障容災等,無任何閑置成本。

雲函數給使用者帶來的價值主要4點:

  1. 簡化架構:函數粒度的微服務架構,使得系統的各個功能天然解耦,能像搭積木一樣組合自有及外部服務,實作所看即所得的背景服務;
  2. 簡化開發:無需關注底層硬體配置、OS,服務啟停、網絡收發,故障容災,服務擴縮容等,隻需寫最核心的業務邏輯,實作真正的代碼即服務;
  3. 簡化運維:無須關注服務部署,伺服器運維,安全管控,擴縮容配置等,且應用能無縫更新,實作無痛切換到DevOps模式。
  4. 減少支出:無閑置成本,僅對函數資源大小,執行時間,執行次數按需計費,相對雲主機平均5%~15% 的使用率,價格優勢明顯,實作了最徹底的按需計費。

我們團隊正在做彈性計算相關的事情,業務需求多,平台自身也需持續優化來支撐不斷擴大的營運規模,現在5人左右的小團隊要支撐100w核級别的計算營運,雲函數的出現,正好解了我們在人力上的燃眉之急,在這裡分享一下,希望能對大家有所啟發。

使用雲函數實作主動撥測工具

我們有一些低頻調用的http服務,比如buffer池空閑機器借還,上架等,這類服務使用者調用出錯時處理代價較大,要確定使用者調用時服務正常,需要有主動撥測的機制,先于使用者發現并修複問題,在雲函數出現之前,需要開發撥測工具,實作定時調用,并實作工具本身的故障容災能力,且要申請2台以上的虛拟機或容器釋出部署,既耗費人力,也耗費資源。應用雲函數後,我們隻需簡單的3步便可實作:

  1. 在SCF雲函數平台建立一個函數,如下圖所示。 
    由淺入深SCF無伺服器雲函數實踐
  2. 配置該函數為定時觸發,比如5分鐘觸發一次,如下圖所示,配置完成後服務即刻啟用。 
    由淺入深SCF無伺服器雲函數實踐
  3. 可以在日志頁面檢視函數運作狀态,當檢測到異常時,會調用告警工具發送告警微信。 
    由淺入深SCF無伺服器雲函數實踐

在主動撥測工具這個場景,我們從雲函數獲得的收益主要是快速成形,且無需營運維護,達到了既定目标同時,沒有額外增加營運成本。SCF無伺服器雲函數為每個使用者設定了免費額度,該應用場景幾乎肯定能包含在免費額度之内。

使用雲函數規整營運統計腳本

我們之前用Python開發了大量統計腳本,用來展現平台的營運概況,可用性,品質,趨勢等,由crontab驅動每日定期執行,随着時間累積及人員的更替,這些腳本部署管理逐漸混亂,比如想要修複某個資料時,可能不知道腳本部署在哪,或者某天伺服器故障,恢複統計腳本的正常營運比較麻煩,針對這些問題,我們利用雲函數簡單包裝便可解決,比如下面是一個統計營運中母機數的函數,直接import原腳本,在入口函數内調用即可。

# -*- coding: utf-8 -*-
from ctypes import *
import os
import base64
import json
import calculate_biz_host_num

def main_handler(event, context):
return calculate_biz_host_num.main()

if __name__ == '__main__':
'''just for test'''
event = {}
lambda_handler(event, 4)      

在規整營運統計腳本這個場景,我們從雲函數獲得的主要收益是快速幫助我們把散落到各台伺服器的腳本規整起來統一維護,且再也不用擔心統計腳本營運與伺服器故障問題。

使用雲函數快速嵌入圖檔類型識别功能

我們有一個圖檔壓縮服務,上傳時壓縮圖檔以降低存儲容量及下載下傳帶寬消耗,壓縮的效果要達到圖檔品質與壓縮比的均衡,在某些場景,比如微信朋友圈,存在一些廣告圖檔,使用者一般不會關注其細節,故可以提高壓縮比,犧牲品質以進一步的降低營運成本,而圖檔類型的識别計算複雜度高,無法在邏輯svr本地完成,傳統的辦法是實作一個圖檔類型識别服務,但實作該服務需要開發工作量較大,比如需要寫接入,邏輯server,實作容災分布,負載均衡等,且由于圖檔上傳有明顯的波峰波谷效應,還需要實作自動擴縮容,不僅如此,部署也較為複雜,難以滿足快速試錯的需求。

由淺入深SCF無伺服器雲函數實踐

應用雲函數後,我們隻需建立并實作一個類型識别函數,如下所示,在函數裡調用算法工程師實作的C++圖檔識别程式即可,無須關心容災分布,負載均衡,自動擴縮容及服務的部署與運維等。

# -*- coding: utf-8 -*-
from ctypes import *
import os
import pictype
import base64
import json

def main_handler(event, context):
str = pictype.cppmain(event["pic_data"])
jso = json.loads(str)
print jso["QRCode"]
return str

if __name__ == '__main__':
'''just for test'''
event = {}
imageFile = open("2qrcode.jpg","rb")
event["pic_data"] = base64.b64encode(imageFile.read())
imageFile.close()
lambda_handler(event, 4)      

在嵌入圖檔類型識别功能這個場景,我們從雲函數獲得的主要收益是使用極小的成本便快速擴充了現有平台的能力,短時間内便試錯驗證了依據圖檔類型選擇不同壓縮比在營運成本上的收益。

使用雲函數實作遊戲AI資料預處理

嘗到甜頭後,我們越來越有信心使用雲函數來實作更複雜的需求,正好目前在支援遊戲AI團隊做一些計算,典型的AI計算過程如下圖所示,模型訓練前的資料預處理耗費了大量的時間與計算資源。

由淺入深SCF無伺服器雲函數實踐

以王者榮耀的AI為例,如下圖所示,資料預處理一般分為兩步:

  1. Mapper計算:從cos讀取遊戲錄像檔案,提取英雄等級,血量,攻擊,法強,技能冷卻等特征,使用HDF5檔案儲存;
  2. Reducer計算:讀取標明範圍的HDF5檔案,shuffle處理随機化後,規整成每個檔案5120幀,再輸出供模型訓練使用;
由淺入深SCF無伺服器雲函數實踐

我們應用雲函數實作該預處理,隻需實作mapper/reducer計算函數,并配置合适的計算觸發規則即可,比如實作Mapper函數如下所示(省略若工具型函數代碼),并配置為cos上傳觸發,這樣當有錄像檔案上傳時,可自動調用mapper函數轉化為HDF5檔案。

# -*- coding utf-8 -*-
import os
import sys
import datetime
import traceback
import shutil
import commands
import cos_sdk


def main_handler(event, context):
res = map_caller(event, context)
if res == 0:
    return "succ"
else:
    return "fail"

def map_caller(event, context):
# Note: this is test account, change to own cos appid and secret_id
appid = '123443xxxx'
secret_id = 'QmFzZTY0IGlzIGEgZ2Vxxxx'
secret_key = 'AKIDZfbOA78asKUYBcXFrJD0a1ICvxxxx'
host = 'sz.cxxxxxx'
addr = '10xxxx'

bucket = event['bucket']
cos_input_file = event['input']
cos_output_key = event['output']
cos_file_name = cos_input_file.split('/', 1)[1]
print("cos_file_name: ", cos_file_name)

# step 1. Download .abs file from cos
cos = cos_sdk.CosHandler(appid, bucket, secret_id, secret_key, host, addr )
container_base_path = '/tmp/AITest/mapper'
container_input_path = '/tmp/AITest/mapper/input/'
container_output_path = '/tmp/AITest/mapper/output/'
cos_output_path = 'mapdata/'

try:
    if not os.path.exists(container_base_path):
        os.makedirs(container_base_path)
    if not os.path.exists(container_input_path):
        os.mkdir(container_input_path)
    if not os.path.exists(container_output_path):
        os.mkdir(container_output_path)
except:
    traceback.print_exc()
    return -1

ret = cos.download_file('/', cos_input_file, container_input_path, cos_file_name)
if not ret:
    print("Download file from cos Failed [%s]" % cos_file_name)
    return -1
print("Download file [%s] succ" % cos_file_name)

# step 2. transfer .abs file to .hdf5
ret = transfer_data(container_base_path, container_input_path, cos_file_name, container_output_path)
if not ret:
    print("transfer data fail")
    return -1

# step 3. upload .hdf5 file to cos
output_filename = get_output_file(cos_file_name, container_output_path)
if output_filename == "":
    return -1
print(container_output_path+output_filename)
if not os.path.exists(container_output_path+output_filename):
    print "output file not exist"
print(cos_output_path+output_filename)
ret = cos.upload_file(container_output_path+output_filename, cos_output_path+output_filename)
if not ret:
    return -1

## clean up result files
shutil.rmtree(container_output_path)
return 0

def transfer_data(base_path, file_path, file_name, output_path):
try:
    CurPath = '/var/user'
    if os.path.exists(base_path + '/transfer_script'):
        shutil.rmtree(base_path + '/transfer_script')
    shutil.copytree(CurPath + '/transfer_script', base_path + '/transfer_script')
    StartPath = base_path + "/transfer_script/5v5_vecmodel_tactics"
    InputFilePath = file_path + file_name
    OutputPath = output_pat
    sgameBinFile = base_path + "/transfer_script/log_transform/bin/transform/sgame_log_transform"
    os.chmod(sgameBinFile, 755)
    labelBinFile = StartPath + "/label"
    featureBinFile = StartPath + "/VecFeatureExtract"
    os.chmod(labelBinFile, 755)
    os.chmod(featureBinFile, 755)
    os.chdir(StartPath)
    cmd = 'sh start.sh ' + InputFilePath + ' ' + OutputPath
    (status, output) = commands.getstatusoutput(cmd)
    print("status: ", status)
    print("output: ", output)
except:
    traceback.print_exc()
    return False

if status == 0 and output == "pipeline success":
    return True
else:
    return False      

實作Reducer函數如下所示(省略若工具型函數代碼),亦可配置cos寫檔案觸發,當上傳檔案數達到一定數量且符合其他條件時,執行reducer函數的處理功能。

# -*- coding utf-8 -*-
import os
import sys
import traceback
import shutil
import commands
import re
import common
import cos_sdk

try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET


def main_handler(event, context):
res = reducer_caller(event, context)
if res == False:
    return "fail"
else:
    return "succ"

def reducer_caller(event, context):
# Note: this is test account, change to own cos appid and secret_id
appid = '12344321xxx'
secret_id = 'QmFzZTY0IGlzIGEgZ2xxxx'
secret_key = 'AKIDZfbOA78asKUYBcXFrJD0axxx'
host = 'sz.xxxx'
addr = '10.xxxx'

bucket = 'mapreduce'
container_base_path = '/tmp/AITest/reducer'
container_mapfile_path = '/tmp/AITest/reducer/mapdata/'
container_output_path = '/tmp/AITest/reducer/output/'
cos_mapdata_dir = '/'
cos_output_key = u'output/'

## AI shuffle config
p0_thread_num = '30'
p1_thread_num = '4'
ai_bucket = '4'
sample_num = '5120'

## init container directory
try:
    if not os.path.exists(container_base_path):
        os.makedirs(container_base_path)
    if not os.path.exists(container_mapfile_path):
        os.mkdir(container_mapfile_path)
    if not os.path.exists(container_output_path):
        os.mkdir(container_output_path)
except:
    traceback.print_exc()
    return False

cos = cos_sdk.CosHandler(appid, bucket, secret_id, secret_key, host, addr)

## step 1. get all mapper output data name (*.abs)
min_mapfiles = 40
mapfiles = get_mapfiles(cos, bucket)
if mapfiles == []:
    print("No exist data map file in cos, please run lambda mapper first")
    return False
elif len(mapfiles) < min_mapfiles:
    print("No enough map files in cos, at least %d map files can trigger shuffle process" % min_mapfiles)
    return False

## step 2. download mapper data from cos
for mapfile in mapfiles:
    download_ret = download_file(cos, cos_mapdata_dir, mapfile, container_mapfile_path)
    if download_ret != 0:
        return False   

# step 3. shuffle mapper input file
ret = shuffle_data(container_base_path, container_mapfile_path, container_output_path, p0_thread_num, p1_thread_num, ai_bucket, sample_num)
if not ret:
    print("shuffle data fail")
    return False

# step 4. upload .hdf5 file to cos
output_files = get_output_files(container_output_path)
if len(output_files) == 0:
    print("No output results in *.hdf5")
    return False

all_upload_ret = 0
print(output_files)
for output_file in output_files:
    upload_ret = upload_file(cos, container_output_path, output_file, cos_output_key)
    all_upload_ret += upload_ret
    if upload_ret != 0:
        print("Upload output file [%s] to cos failed" % output_file)
if all_upload_ret != 0:
    return False

## clean up result files
shutil.rmtree(container_output_path)
return "Finish shuffle data"


def get_mapfiles(cos_client, bucket):
status, ret_msg = cos_client.list_object(bucket)
if str(status)[0] != '2':
    print("Get map data file error")
    return -1

mapfiles = []
root = ET.fromstring(ret_msg)
for key in root.findall('Contents'):
    filename = key.find('Key').text
    if re.match(r'^mapdata/', filename):
        mapfiles.append(filename)
return mapfiles

def shuffle_data(container_base_path, container_mapfile_path, container_output_path, p0_thread_num, p1_thread_num, ai_bucket, sample_num):
try:
    CurPath = '/var/user'
    shuffle_tools = '/shuffle_all_tools/'
    if os.path.exists(container_base_path + shuffle_tools):
        shutil.rmtree(container_base_path + shuffle_tools)
    shutil.copytree(CurPath + shuffle_tools, container_base_path + shuffle_tools)

    os.chdir(container_base_path + shuffle_tools)
    cmd = 'sh king_shuffle_start.sh ' + container_mapfile_path + ' ' + p0_thread_num + ' ' + p1_thread_num + ' ' + container_output_path + ' ' + ai_bucket + ' ' + sample_num
    print cmd
    val = os.system(cmd)
    print val
    return True
except:
    traceback.print_exc()
    return False      

在遊戲AI訓練資料預處理這個場景,我們從雲函數獲得的主要收益是快速實作資料預處理服務,避免AI工程師陷入到考慮計算分布化,容災,擴容,伺服器故障處理等平台性事項中,能夠更專注于算法設計;另外AI計算資源耗費量巨大,雲函數實作了資源真正按需配置設定,無需保留大批伺服器造成資源浪費。

在應用SCF無伺服器雲函數實踐過程中,深刻體會到了其減少設計開發,營運維護工作量及在營運成本方面的優勢,如果開發新的功能,雲函數會成為我們團隊的首選,作為團隊架構師,應該承當好的一個責任是與時俱進的引入新生産力工具,持續推進團隊開發營運效率提升及持續的追求成本優化,由于雲函數在加速服務上線時間方面革命性的優勢及按需使用計費的特點,它可能會比docker容器更快被廣泛接受,誰能更快的擁抱雲函數,誰便能更快的建立研發與營運的優勢,歡迎大家試用騰訊雲-SCF無伺服器雲函數,一起更好的迎接并促進無伺服器時代的到來。

Q&A

Q:雲函數怎麼和其它業務系統內建?

A:在騰訊雲産品中,雲函數已經和COS、CMQ、API gateway、日志等打通,可直接配置事件觸發關系,另外使用者在函數代碼裡,可自行實作與其它業務系統內建的代碼,目前雲函數直接可通路公網服務,馬上可通路使用者VPC裡的服務。

Q:雲函數實際應用中,和一般寫法有什麼不同,有什麼缺點?

A:雲函數目前支援Python2.7、3.6,Node.js 6.10,Java 8等運作環境,可在本地開發編寫代碼上傳,也可在雲端直接編寫,對比一般程式的寫法,無須實作網絡監聽,故障容災,擴容,日志監控等相關代碼,極大的減少大家的代碼開發量,缺點是調試不如本地友善,比如不能直接用GDB等工具單步調試。

Q: 騰訊SCF對于有狀态服務是怎麼滾動更新的擴容的?

A: SCF一般用來承載無狀态的微服務,如果是有狀态的實作滾動更新,需要把狀态資料儲存到CMQ,COS等持久化存儲裡。

Q:騰訊SCF對于容器擴容怎麼做到不影響業務下擴容?

A: 騰訊SCF的函數調用由中控invoker子產品統一發起,invoker子產品知道每次函數調用在容器中的執行延時,執行結果等,且能判斷容器是否空閑等;擴容容器時,完成内部函數運作時環境初始化後,才标記容器為空閑狀态,可接收調用請求。

Q: 對雲函數不是太懂,雲函數與函數之間調用是通過http協定嗎 還是rpc或者其他方式?

A: 函數之間調用采用http協定,這是業内cloud function的通用做法,内部子產品之間使用rpc通信。

Q: 如果一個函數一個容器 那一個項目函數至少幾萬個吧 這樣豈不是要部署上萬個容器?

A: 函數被真實調用時,才會去配置設定容器,同時存在的容器數取決于有多少個函數正在被調用,調用的并發次數是多少,這是雲函數的最大價值之一:避免資源閑置。

Q: 代碼檔案是通過Dockerfile打包進容器的嗎?這樣建構會不會有點慢,像線上執行代碼這類的,感覺都很及時。

A: 通過Dockerfile打包成鏡像再下載下傳,确實耗時很長,是以實際運作代碼沒打包到鏡像裡,而是直接下發到母機,再将目錄挂到容器裡面。

相關閱讀

使用騰訊雲無伺服器雲函數(scf)分析天氣資料

使用騰訊雲 scf 雲函數壓縮 cos 對象存儲檔案

騰訊雲無伺服器雲函數架構精解

此文已由作者授權雲+社群釋出,轉載請注明原文出處

海量技術實踐經驗,盡在雲加社群!

https://cloud.tencent.com/developer

繼續閱讀