
作者 | 計緣
來源|
阿裡巴巴雲原生公衆号衆所周知,遊戲行業在當今的網際網路行業中算是一棵常青樹。在疫情之前的 2019 年,中國遊戲市場營收規模約 2884.8 億元,同比增長 17.1%。2020 年因為疫情,遊戲行業更是突飛猛進。玩遊戲本就是中國網民最普遍的娛樂方式之一,疫情期間更甚。據不完全統計,截至 2019 年,中國移動遊戲使用者規模約 6.6 億人,占中國總網民規模 8.47 億的 77.92%,可見遊戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習以為常的一部分。
對于玩家而言,市面上的遊戲數量多如牛毛,那麼玩家如何能發現和認知到一款遊戲,并且持續的玩下去恐怕是所有遊戲廠商需要思考的問題。加之 2018 年遊戲版号停發事件,遊戲廠商更加珍惜每一個已獲得版号的遊戲産品,是以這也使得“深度打磨産品品質”和“提高營運精細程度”這兩個遊戲産業發展方向成為廣大遊戲廠商的發展思路,無論是新遊戲還是老遊戲都在努力落實這兩點:
- 新遊戲:面向玩家需要提供更充足的推廣資源和更完整的遊戲内容。
- 老遊戲:通過使用者行為分析,投入更多的精力和成本,制作更優質的版本内容。
這裡我們重點來看新遊戲。一家遊戲企業辛辛苦苦研發三年,等着新遊戲發售時一飛沖天。那麼問題來了,新遊戲如何被廣大玩家看到?
首先來看看遊戲行業公司的分類:
- 遊戲研發商:研發遊戲的公司,生産和制作遊戲内容。比如王者榮耀的所有英雄設計、遊戲戰鬥場景、戰鬥邏輯等,全部由遊戲研發公司提供。
- 遊戲發行商:遊戲發行商的主要工作分三大塊:市場工作、營運工作、客服工作。遊戲發行商把控遊戲命脈,市場工作核心是導入玩家,營運工作核心是将使用者價值最大化、賺取更多利益。
- 遊戲平台/管道商:遊戲平台和管道商的核心目的就是曝光遊戲,讓盡量多的人能發現你的遊戲。
這三種類型的業務,有專注于其中某一領域的獨立公司,也有能承接全部業務的公司,但無論那一種,這三者之間的關系是不會變的:
是以不難了解,想讓更多的玩家看到你的遊戲,遊戲發行和營運是關鍵。通俗來講,如果你的遊戲出現在目前所有大家熟知的平台廣告中,那麼最起碼遊戲的新使用者注冊數量是很可觀的。是以這就引入了一個關鍵詞:買量。
根據資料顯示,2019 年月均買量手遊數達 6000+ 款,而 2018 年僅為 4200 款。另一方面,随着抖音、微網誌等超級 APP 在遊戲買量市場的資源傾斜,也助推手遊買量的效果和效率有所提升,遊戲廠商更願意使用買量的方式來吸引使用者。
但需要注意的是,在遊戲買量的精準化程度不斷提高的同時,買量的成本也在節節攀升,唯有合理配置買量、管道與整合營銷之間的關系,才能将宣發資源發揮到最大的效果。
通俗來講,買量其實就是在各大主流平台投放廣告,廣大使用者看到遊戲廣告後,有可能會點選廣告,然後進入遊戲廠商的宣傳頁面,同時會采集使用者的一些資訊,然後遊戲廠商對采集到的使用者資訊進行大資料分析,進行進一步的定向推廣。
遊戲營運核心訴求
遊戲廠商花錢買量,換來的使用者資訊以及新使用者注冊資訊是為持續的遊戲營運服務的,那麼這個場景的核心訴求就是采集使用者資訊的完整性。
比如說,某遊戲廠商一天花 5000w 投放廣告,在某平台某時段産生了每秒 1w 次的廣告點選率,那麼在這個時段内每一個點選廣告的使用者資訊要完整的被采集到,然後入庫進行後續分析。這就對資料采集系統提出了很高的要求。
這其中,最核心的一點就是系統暴露接口的環節要能夠平穩承載買量期間不定時的流量脈沖。在買量期間,遊戲廠商通常會在多個平台投放廣告,每個平台投放廣告的時間是不一樣的,是以就會出現全天不定時的流量脈沖現象。如果這個環節出現問題,那麼相當于買量的錢就打水漂了。
資料采集系統傳統架構
上圖是一個相對傳統的資料采集系統架構,最關鍵的就是暴露 HTTP 接口回傳資料這部分,這部分如果出問題,那麼采集資料的鍊路就斷了。但這部分往往會面臨兩個挑戰:
- 當流量脈沖來的時候,這部分是否可以快速擴容以應對流量沖擊。
- 遊戲營運具備潮汐特性,并非天天都在進行,這就需要考慮如何優化資源使用率。
通常情況下,在遊戲有營運活動之前,會提前通知運維同學,對這個環節的服務增加節點,但要增加多少其實是無法預估的,隻能大概拍一個數字。這是在傳統架構下經常會出現的場景,這就會導緻兩個問題:
- 流量太大,節點加少了,導緻一部分流量的資料沒有采集到。
- 流量沒有預期那麼大,節點加多了,導緻資源浪費。
資料采集系統 Serverless 架構
我們可以通過函數計算 FC 來取代傳統架構中暴露 HTTP 回傳資料這部分,進而完美解決傳統架構中存在問題,參考文章:《
資源成本雙優化!看 Serverless 颠覆程式設計教育的創新實踐》。
先來看架構圖:
傳統架構中的兩個問題均可以通過函數計算百毫秒彈性的特性來解決。我們并不需要去估算營銷活動會帶來多大的流量,也不需要去擔心和考慮對資料采集系統的性能,運維同學更不需要提前預備 ECS。
因為函數計算的極緻彈性特性,當沒有買量、沒有營銷活動的時候,函數計算的運作執行個體是零。有買量活動時,在流量脈沖的情況下,函數計算會快速拉起執行個體來承載流量壓力;當流量減少時,函數計算會及時釋放沒有請求的執行個體進行縮容。是以 Serverless 架構帶來的優勢有以下三點:
- 無需運維介入,研發同學就可以很快的搭建出來。
- 無論流量大小,均可以平穩的承接。
- 函數計算拉起的執行個體數量可以緊貼流量大小的曲線,做到資源使用率最優化,再加上按量計費的模式,可以最大程度優化成本。
架構解析
從上面的架構圖可以看到,整個采集資料階段,分了兩個函數來實作,第一個函數的作用是單純的暴露 HTTP 接口接收資料,第二個函數用于處理資料,然後将資料發送至消息隊列 Kafka 和資料庫 RDS。
1. 接收資料函數
我們打開函數計算控制台,建立一個函數:
- 函數類型:HTTP(即觸發器為 HTTP)
- 函數名稱:receiveData
- 運作環境:Python3
- 函數執行個體類型:彈性執行個體
- 函數執行記憶體:512MB
- 函數運作逾時時間:60 秒
- 函數單執行個體并發度:1
- 觸發器類型:HTTP 觸發器
- 觸發器名稱:defaultTrigger
- 認證方式:anonymous(即無需認證)
- 請求方式:GET,POST
建立好函數之後,我們通過線上編輯器編寫代碼:
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD = b'Hello world!\n'
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 接收回傳的資料
request_body = environ['wsgi.input'].read(request_body_size)
request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
request_body_obj = json.loads(request_body_str)
logger.info(request_body_obj["action"])
logger.info(request_body_obj["articleAuthorId"])
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
此時的代碼非常簡單,就是接收使用者傳來的參數,我們可以調用接口進行驗證:
可以在函數的日志查詢中看到此次調用的日志:
同時,我們也可以檢視函數的鍊路追蹤來分析每一個步驟的調用耗時,比如函數接到請求→冷啟動(無活躍執行個體時)→準備代碼→執行初始化方法→執行入口函數邏輯這個過程:
從調用鍊路圖中可以看到,剛才的那次請求包含了冷啟動的時間,因為當時沒有活躍執行個體,整個過程耗時 418 毫秒,真正執行入口函數代碼的時間為 8 毫秒。
當再次調用接口時,可以看到就直接執行了入口函數的邏輯,因為此時已經有執行個體在運作,整個耗時隻有 2.3 毫秒:
2. 處理資料的函數
第一個函數是通過在函數計算控制台在界面上建立的,選擇了運作環境是 Python3,我們可以在官方文檔中檢視預置的 Python3 運作環境内置了哪些子產品,因為第二個函數要操作 Kafka 和 RDS,是以需要我們确認對應的子產品。
從文檔中可以看到,内置的子產品中包含 RDS 的 SDK 子產品,但是沒有 Kafka 的 SDK 子產品,此時就需要我們手動安裝 Kafka SDK 子產品,并且建立函數也會使用另一種方式。
1)Funcraft
Funcraft 是一個用于支援 Serverless 應用部署的指令行工具,能幫助我們便捷地管理函數計算、API 網關、日志服務等資源。它通過一個資源配置檔案(template.yml),協助我們進行開發、建構、部署操作。
是以第二個函數我們需要使用 Fun 來進行操作,整個操作分為四個步驟:
- 安裝 Fun 工具。
- 編寫 template.yml 模闆檔案,用來描述函數。
- 安裝我們需要的第三方依賴。
- 上傳部署函數。
2)安裝 Fun
Fun 提供了三種安裝方式:
- 通過 npm 包管理安裝 —— 适合所有平台(Windows/Mac/Linux)且已經預裝了 npm 的開發者。
- 通過下載下傳二進制安裝 —— 适合所有平台(Windows/Mac/Linux)。
- 通過 Homebrew 包管理器安裝 —— 适合 Mac 平台,更符合 MacOS 開發者習慣。
文本示例環境為 Mac,是以使用 npm 方式安裝,非常的簡單,一行指令搞定:
sudo npm install @alicloud/fun -g
安裝完成之後。在控制終端輸入 fun 指令可以檢視版本資訊:
$ fun --version
3.6.20
在第一次使用 fun 之前需要先執行 fun config 指令進行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數計算控制台首頁的右上方獲得:
fun config
? Aliyun Account ID *01
? Aliyun Access Key ID *qef6j
? Aliyun Access Key Secret *UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3
3)編寫 template.yml
建立一個目錄,在該目錄下建立一個名為 template.yml 的 YAML 檔案,該檔案主要描述要建立的函數的各項配置,說白了就是将函數計算控制台上配置的那些配置資訊以 YAML 格式寫在檔案裡:
ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
FCBigDataDemo:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'local invoke demo'
VpcConfig:
VpcId: 'vpc-xxxxxxxxxxx'
VSwitchIds: [ 'vsw-xxxxxxxxxx' ]
SecurityGroupId: 'sg-xxxxxxxxx'
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: './'
Description: ''
Runtime: python3
我們來解析以上檔案的核心内容:
- FCBigDataDemo:自定義的服務名稱。通過下面的 Type 屬性标明是服務,即 Aliyun::Serverless::Service。
- Properties:Properties 下的屬性都是該服務的各配置項。
- VpcConfig:服務的 VPC 配置,包含:
- VpcId:VPC ID。
- VSwitchIds:交換機 ID,這裡是數組,可以配置多個交換機。
- SecurityGroupId:安全組 ID。
- LogConfig:服務綁定的日志服務(SLS)配置,包含:
- Project:日志服務項目。
- Logstore:LogStore 名稱。
- dataToKafka:該服務下自定義的函數名稱。通過下面的 Type 屬性标明是函數,即 Aliyun::Serverless::Function。
- Properties:Properties下的屬性都是該函數的各配置項。
- Initializer:配置初始化函數。
- Handler:配置入口函數。
- Runtime:函數運作環境。
目錄結構為:
4)安裝第三方依賴
服務和函數的模闆建立好之後,我們來安裝需要使用的第三方依賴。在這個示例的場景中,第二個函數需要使用 Kafka SDK,是以可以通過 fun 工具結合 Python 包管理工具 pip 進行安裝:
fun install --runtime python3 --package-type pip kafka-python
執行指令後有如下提示資訊:
此時我們會發現在目錄下會生成一個.fun檔案夾 ,我們安裝的依賴包就在該目錄下:
5)部署函數
現在編寫好了模闆檔案以及安裝好了我們需要的 Kafka SDK 後,還需要添加我們的代碼檔案 index.py,代碼内容如下:
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer = None
def my_initializer(context):
logger = logging.getLogger()
logger.info("init kafka producer")
global producer
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
def handler(event, context):
logger = logging.getLogger()
# 接收回傳的資料
event_str = json.loads(event)
event_obj = json.loads(event_str)
logger.info(event_obj["action"])
logger.info(event_obj["articleAuthorId"])
# 向Kafka發送消息
global producer
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
producer.close()
return 'hello world'
代碼很簡單,這裡做以簡單的解析:
- my_initializer:函數執行個體被拉起時會先執行該函數,然後再執行 handler 函數 ,當函數執行個體在運作時,之後的請求都不會執行 my_initializer 函數 。一般用于各種連接配接的初始化工作,這裡将初始化 Kafka Producer 的方法放在了這裡,避免反複初始化 Produer。
- handler:該函數隻有兩個邏輯,接收回傳的資料和将資料發送至 Kafka 的指定 Topic。
- 下面通過 fun deploy 指令部署函數,該指令會做兩件事:
- 根據 template.yml 中的配置建立服務和函數。
- 将 index.py 和 .fun 上傳至函數中。
登入函數計算控制台,可以看到通過 fun 指令部署的服務和函數:
進入函數,也可以清晰的看到第三方依賴包的目錄結構:
3. 函數之間調用
目前兩個函數都建立好了,下面的工作就是由第一個函數接收到資料後拉起第二個函數發送消息給 Kafka。我們隻需要對第一個函數做些許改動即可:
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD = b'Hello world!\n'
client = None
def my_initializer(context):
logger = logging.getLogger()
logger.info("init fc client")
global client
client = fc2.Client(
endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
accessKeyID="your_ak",
accessKeySecret="your_sk"
)
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 接收回傳的資料
request_body = environ['wsgi.input'].read(request_body_size)
request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
request_body_obj = json.loads(request_body_str)
logger.info(request_body_obj["action"])
logger.info(request_body_obj["articleAuthorId"])
global client
client.invoke_function(
'FCBigDataDemo',
'dataToKafka',
payload=json.dumps(request_body_str),
headers = {'x-fc-invocation-type': 'Async'}
)
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
如上面代碼所示,對第一個函數的代碼做了三個地方的改動:
- 導入函數計算的庫:import fc2
- 添加初始化方法,用于建立函數計算 Client:
def my_initializer(context):
logger = logging.getLogger()
logger.info("init fc client")
global client
client = fc2.Client(
endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
accessKeyID="your_ak",
accessKeySecret="your_sk"
)
這裡需要注意的時,當我們在代碼裡增加了初始化方法後,需要在函數配置中指定初始化方法的入口:
- 通過函數計算 Client 調用第二個函數
global client
client.invoke_function(
'FCBigDataDemo',
'dataToKafka',
payload=json.dumps(request_body_str),
headers = {'x-fc-invocation-type': 'Async'}
)
invoke_function 函數有四個參數:
- 第一個參數:調用函數所在的服務名稱。
- 第二個參數:調用函數的函數名稱。
- 第三個參數:向調用函數傳的資料。
- 第四個參數:調用第二個函數 Request Header 資訊。這裡主要通過 x-fc-invocation-type 這個 Key 來設定是同步調用還是異步調用。這裡設定 Async 為異步調用。
如此設定,我們便可以驗證通過第一個函數提供的 HTTP 接口發起請求→采集資料→調用第二個函數→将資料作為消息傳給 Kafka 這個流程了。
使用兩個函數的目的
到這裡有些同學可能會有疑問,為什麼需要兩個函數,而不在第一個函數裡直接向 Kafka 發送資料呢?我們先來看這張圖:
當我們使用異步調用函數時,在函數内部會預設先将請求的資料放入消息隊列進行第一道削峰填谷,然後每一個隊列在對應函數執行個體,通過函數執行個體的彈性拉起多個執行個體進行第二道削峰填谷。是以這也就是為什麼這個架構能穩定承載大并發請求的核心原因之一。
4. 配置 Kafka
在遊戲營運這個場景中,資料量是比較大的,是以對 Kafka 的性能要求也是比較高的,相比開源自建,使用雲上的 Kafka 省去很多的運維操作,比如:
- 我們不再需要再維護 Kafka 叢集的各個節點。
- 不需要關心主從節點資料同步問題。
- 可以快速、動态擴充 Kafka 叢集規格,動态增加 Topic,動态增加分區數。
- 完善的名額監控功能,消息查詢功能。
總的來說,就是一切 SLA 都有雲上兜底,我們隻需要關注在消息發送和消息消費即可。
是以我們可以打開 Kafka 開通界面,根據實際場景的需求一鍵開通 Kafka 執行個體,開通 Kafka 後登入控制台,在基本資訊中可以看到 Kafka 的接入點:
- 預設接入點:走 VPC 内網場景的接入點。
- SSL 接入點:走公網場景的接入點。
将預設接入點配置到函數計算的第二個函數中即可。
....
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
....
然後點選左側控制台 Topic 管理,建立 Topic:
将建立好的 Topic 配置到函數計算的第二個函數中即可。
...
# 第一個參數為Topic名稱
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
...
上文已經列舉過雲上 Kafka 的優勢,比如動态增加 Topic 的分區數,我們可以在 Topic 清單中,對 Topic 的分區數進行動态調整:
單 Topic 最大支援到 360 個分區,這是開源自建無法做到的。
接下來點選控制台左側 Consumer Group 管理,建立 Consumer Group:
至此,雲上的 Kafka 就算配置完畢了,即 Producer 可以往剛剛建立的 Topic 中發消息了,Consumer 可以設定剛剛建立的 GID 以及訂閱 Topic 進行消息接受和消費。
Flink Kafka 消費者
在這個場景中,Kafka 後面往往會跟着 Flink,是以這裡簡要給大家介紹一下在 Flink 中如何建立 Kafka Consumer 并消費資料。代碼片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");
String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");
FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);
以上就是建構 Flink Kafka Consumer 和添加 Kafka Source 的代碼片段,還是非常簡單的。
壓測驗證
至此,整個資料采集的架構就搭建完畢了,下面我們通過壓測來檢驗一下整個架構的性能。這裡使用阿裡雲 PTS 來進行壓測。
建立壓測場景
打開 PTS 控制台,點選左側菜單建立壓測/建立 PTS 場景:
在場景配置中,将第一個函數計算函數暴露的 HTTP 接口作為串聯鍊路,配置如下圖所示:
接口配置完後,我們來配置施壓:
- 壓力模式:
- 并發模式:指定有多少并發使用者同時發請求。
- RPS模式:指定每秒有多少請求數。
- 遞增模式:在壓測過程中可以通過手動調節壓力,也可以自動按百分比遞增壓力。
- 最大并發:同時有多少個虛拟使用者發起請求。
- 遞增百分比:如果是自動遞增的話,按這裡的百分比遞增。
- 單量級持續時長:在未完全達到壓力全量的時候,每一級梯度的壓力保持的時長。
- 壓測總時長:一共需要壓測的時長。
這裡因為資源成本原因,并發使用者數設定為 2500 來進行驗證。
從上圖壓測中的情況來看,TPS 達到了 2w 的封頂,549w+ 的請求,99.99% 的請求是成功的,那 369 個異常也可以點選檢視,都是壓測工具請求逾時導緻的。
總結
至此,整個基于 Serverless 搭建的大資料采集傳輸的架構就搭建好了,并且進行了壓測驗證,整體的性能也是不錯的,并且整個架構搭建起來也非常簡單和容易了解。這個架構不光适用于遊戲營運行業,其實任何大資料采集傳輸的場景都是适用的,目前也已經有很多客戶正在基于 Serverless 的架構跑在生産環境,或者正走在改造 Serverless 架構的路上。
基于 Serverless 還有很多其他的應用場景,之後我會一一分享給大家,大家如果有任何疑問也可以加入釘釘群:35712134 來尋找答案,我們不見不散!
重磅下載下傳
由 23 位阿裡雲技術專家精心打造 222 頁“橙子書”《Serverless 入門與實戰》重磅上線,帶你由表及裡,由淺入深學習 Serverless,助你 2021 年“橙”風而行,心想事“橙”!
本書亮點
- 從架構演進開始,介紹 Serverless 架構及技術選型建構 Serverless 思維;
- 了解業界流行的 Serverless 架構運作原理;
- 掌握 10 大 Serverless 真實落地案例,活學活用。