
創作人:吳斌
審稿人:李捷
發揮 Elastic Stack 在日志和實時資料分析計算領域的一些優勢,對流媒體服務這樣規模較大、實時性要求偏高,且分析、業務探索流程要求靈活的業務是一個比較百搭的選擇。
資料邏輯架構如下
整體架構比較直覺簡單。我們省去了業務組建和存儲層可能會用到的其他引擎,把目光主要集中在 Elasticsearch 上。
日志采集
日志的收集在正式進入資料管道前,可以選擇落地或者直接吐到消息隊列。這裡采集的内容也主要分成 2 大部分
- CDN/網絡通路日志
- 業務打點資料
業務打點的資料可以根據需求采集,實時部分主要會聚焦在,例如卡頓等這樣的使用者體驗名額。網絡通路的日志通常比較通用,這裡我們也先給出一個例子,相信大家看上去會比較熟悉。
那麼根據這裡樣例的資料,Elasticsearch 可以輕松的利用内置的 processor 和聚合功能做快速的分析,後面我們會舉例說明。
{
"receiveTimestamp": "2021-04-28T14:30:17.90993285Z",
"spanId": "blahblah",
"trace": "blah/f5c7578feaf277dd9a8d96",
"@timestamp": "2021-04-28T14:30:17.549287Z",
"logName": "logs/requests",
"jsonPayload": {
"@type": "loadbalancing.type.LoadBalancerLogEntry",
"latencySeconds": "0.001749s",
"statusDetails": "response_from_cache",
"cacheIdCityCode": "ABC",
"cacheId": "ABC-abcabc123"
},
"httpRequest": {
"remoteIp": "10.0.0.1",
"remoteIpIsp": {
"ip": "10.0.0.2",
"organization_name": "China Telecom",
"asn": 8346,
"network": "10.0.0.0/15"
},
"requestMethod": "GET",
"responseSize": "125621",
"userAgent": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840",
"frontendSrtt": 0.124,
"cacheLookup": true,
"geo": {
"continent_name": "Asia",
"country_iso_code": "CN",
"country_name": "China",
"location": {
"lon": 123,
"lat": 321
}
},
"backendLatency": 0.001749,
"requestUrl": "http://bindiego.com/vid/bindigo.m4s",
"requestDomain": "bindiego.com",
"cacheHit": true,
"requestSize": "671",
"requestProtocol": "http",
"user_agent": {
"original": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840",
"os": {
"name": "Android",
"version": "10",
"full": "Android 10"
},
"name": "Android",
"device": {
"name": "Generic Smartphone"
},
"version": "10"
},
"status": 200,
"resourceType": "m4s"
}
}
這裡就是最終導入到 Elasticsearch 裡可分析的網絡性能資料。針對這個資料,我們分别對它經過的管道和處理做一個簡單快速的剖析。
消息隊列
日志采集器會直接把資料打到消息隊列,這裡主要起到一個抗反壓緩沖的作用。有些隊列有很多附加的功能,例如存儲和視窗計算,這裡我們隻使用最單純的功能。因為後面我們選取了分布式計算引擎來做這這部分。
分布式計算引擎
分布式計算引擎,其實在整體實時資料分析業務裡,扮演的着實是非常重要的角色。例如實時名額的視窗計算,遲報資料的修正等等。但在我們這個簡單的場景下為了後面在 Elasticsearch 内更友善快捷的分析、過濾資料。
我們這裡主要做了 ETL 和補全。例如把請求資源的域和資源類型提取出來,還有 CDN 緩存節點的區域代碼等等。但是例如 IP 位址地理位置、使用者裝置類型和營運商(ISP)的反查,友善起見,我們利用了 Elasticsearch Ingest 節點預置的 Pipeline 去做。
這裡要注意的就是,如果你的叢集配置是全角色的節點,會對資料節點的性能有影響。建議使用獨立的 ingest node 去做,且如果是在 K8S 上部署的話,還可以彈性擴容這組 nodeSet。
下面是 Ingest 節點配置舉例
完整代碼戳這裡: https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-pipeline.json
{
"description": "IP & user agent lookup",
"processors": [
{
"user_agent" : {
"field" : "httpRequest.userAgent",
"target_field" : "httpRequest.user_agent",
"ignore_missing": true
}
},
{
"geoip" : {
"field" : "httpRequest.remoteIp",
"target_field" : "httpRequest.geo",
"ignore_missing": true
}
},
{
"geoip" : {
"field" : "httpRequest.remoteIp",
"target_field" : "httpRequest.remoteIpIsp",
"database_file" : "GeoLite2-ASN.mmdb",
"ignore_missing": true
}
}
]
}
資料安全
資料安全這塊順帶提一下,現在 Elasticsearch 的認證、授權都可以在 Basic License 裡使用了,非常友善。這裡簡單提一下通訊這塊,很多小夥伴用的是自簽的證書。這個問題不大,經常被問到在使用 RestClient 開發的時候如何繞過去(例如在寫計算引擎最後入庫的時候)。其實方法也很簡單,這裡就給大家上個代碼片段說明看下.
配置: https://github.com/elasticsearch-cn/elastic-on-gke#option-2-regional-tcp-lb 完整代碼: https://github.com/cloudymoma/raycom/blob/streaming/src/main/java/bindiego/io/ElasticsearchIO.java#L273-L296
try {
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, new TrustManager[] {
new X509TrustManager() {
public void checkClientTrusted(X509Certificate[] chain, String authType) {}
public void checkServerTrusted(X509Certificate[] chain, String authType) {}
public X509Certificate[] getAcceptedIssuers() { return null; }
}
}, null);
httpAsyncClientBuilder.setSSLContext(context)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
} catch (NoSuchAlgorithmException ex) {
logger.error("Error when setup dummy SSLContext", ex);
} catch (KeyManagementException ex) {
logger.error("Error when setup dummy SSLContext", ex);
} catch (Exception ex) {
logger.error("Error when setup dummy SSLContext", ex);
}
索引管理
索引生命周期管理 Elasticsearch 也提供了非常便利的工具。
生命周期配置,這裡應該根據業務需求和節點規模綜合考量
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "20GB",
"max_docs": 20000000,
"max_age": "7d"
}
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}
索引模版
完整版: https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-template.json
模版為每次生成的索引應用相同的配置,且指定了生命周期的政策檔案和注入别名。
{
"index_patterns": [
"bindiego*"
],
"order": 999,
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1,
"final_pipeline": "bindiego",
"index.lifecycle.name": "bindiego-policy",
"index.lifecycle.rollover_alias": "bindiego-ingest"
},
"mappings": {
最後我們配置了腳本一次性把上述配置應用,且在 Kibana 裡為我們建立好查詢的 index pattern
詳細戳這裡: https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/init.sh
資料面闆
這裡雖然是個人弱項,但是借助 Kibana 強大的可視化功能,可以根據第一部分整理出來的資料繪制實時面闆。
完整可複用面闆: https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/gclb_dashboard.ndjson 部分截圖: https://github.com/cloudymoma/raycom/tree/gcp-lb-log#dashboards-in-kibana
下面我舉一些可能常被忽視的好用功能給大家打個樣。
- IP 反查出的 Geo 和 ISP 資訊
通過這些資訊,可以快速反映出各個營運商網絡的情況,甚至一些盜鍊的線索初判斷。
- Vega 在 Kibana 裡繪制資料
當我們覺得 Kibana 自身圖表不夠豐富的時候,可以借助 Vega。上面這個圖就展示了來自不同地區的使用者,分别命中 CDN 緩存點的流量配置設定。資料通過用 Elasticsearch 的 Composite Aggregation 提取。
- Kibana TSVB
這個是我個人最喜歡的繪圖方法了,可以非常靈活的對名額進行計算。下面這兩個圖表就展示過濾出直播業務的緩存命中、請求傳回和緩存填充的資料量這些資訊。
總結
由于業務資料的敏感性,這裡就不列舉細節了。但資料管道和治理,都依舊遵循同樣的原則。整體資料管道的選型也非常靈活,采集部分即可以是 Beats 生态中的産品,也可以是自己開發的 agent。隊列常用的有 Kafka 或者雲上托管服務。分布式計算層因為業務比較簡單,我比較推薦使用 Apache Beam,這樣執行引擎可以在比如 Flink、Spark Streaming 和任何 Beam 支援的平台上相對靈活的切換。
今天我們給出的案例是一個非常簡單,且可以快速複用的開源項目。
大家有任何需求和疑問也歡迎到社群一起交流、學習。
創作人簡介:
吳斌,Elastic 中文社群副主席,現就職于大型網際網路公司任職雲架構師。專注于海量數
據處理、挖掘、分析和企業級搜尋領域。十分熟悉分布式應用,高可用架構和自動化技
術。曾在海外世界百強大學計算機學院任教 6 年。更是一位開源軟體社群的積極貢獻者
群組織者。
部落格:
https://gist.github.com/bindiego