在微服務體系架構下,搜狐智能媒體使用 Zipkin 進行服務鍊路追蹤(Tracing)的埋點采集,将采集的 Trace 資訊存儲到 StarRocks 中。通過 StarRocks 強大的 SQL 計算能力,對 Tracing 資訊進行多元度的統計、分析等操作,提升了微服務監控能力,從簡單統計的 Monitoring 上升到更多元度探索分析的 Observability。
全文主要分為三個部分:第一節主要介紹微服務下的常用監控方式,其中鍊路追蹤技術,可以串聯整個服務調用鍊路,獲得整體服務的關鍵資訊,對微服務的監控有非常重要的意義。第二節主要介紹搜狐智能媒體是如何建構鍊路追蹤分析體系的,主要包括 Zipkin 的資料采集,StarRocks 的資料存儲,以及根據應用場景對 StarRocks 進行分析計算等三個部分。第三節主要介紹搜狐智能媒體通過引入 Zipkin 和 StarRocks 進行鍊路追蹤分析取得的一些實踐效果。
01 微服務架構中的鍊路追蹤
近年來,企業 IT 應用架構逐漸向微服務、雲原生等分布式應用架構演進,在搜狐智能媒體内部,應用服務按照微服務、Docker、Kubernetes、Spring Cloud 等架構思想和技術方案進行研發運維, 提升部門整體工程效率 。
微服務架構提升工程效率的同時,也帶來了一些新的問題。微服務是一個分布式架構,它按業務劃分服務單元,使用者的每次請求不再是由某一個服務獨立完成了,而是變成了多個服務一起配合完成。這些服務可能是由不同的團隊、使用不同的程式設計語言實作,可能布在了不同的伺服器、甚至不同的資料中心。如果使用者請求出現了錯誤和異常,微服務分布式調用的特性決定了這些 故障難以定位 ,相對于傳統的單體架構,微服務監控面臨着新的難題。
Logging、Metrics、Tracing
微服務監控可以包含很多方式,按照監測的資料類型主要劃分為 Logging、Metrics 和Tracing 三大領域:
Logging
使用者主動記錄的離散事件,記錄的資訊一般是非結構化的文本内容,在使用者進行問題分析判斷時可以提供更為詳盡的線索。
具有聚合屬性的采集資料,旨在為使用者展示某個名額在某個時段的運作狀态,用于檢視一些名額和趨勢。
Tracing
記錄一次請求調用的生命周期全過程,其中包括服務調用和處理時長等資訊,含有請求上下文環境,由一個全局唯一的 Trace ID 來進行辨別和串聯整個調用鍊路,非常适合微服務架構的監控場景。
圖 1
三者的關系如上圖所示,這三者之間也是有重疊的,比如 Logging 可以聚合相關字段生成 Metrics 資訊,關聯相關字段生成 Tracing 資訊;Tracing 可以聚合查詢次數生成 Metrics 資訊,可以記錄業務日志生成 Logging 資訊。一般情況下要在 Metrics 和 Logging 中增加字段串聯微服務請求調用生命周期比較困難,通過 Tracing 擷取 Metrics 和 Logging 則相對容易很多。
另外,這三者對存儲資源有着不同的需求,Metrics 是天然的壓縮資料,最節省資源;Logging 傾向于無限增加的,甚至會超出預期的容量;Tracing 的存儲容量,一般介于 Metrics 和 Logging 兩者之間,另外還可通過采樣率進一步控制容量需求。
從 Monitoring 到 Observability
Monitoring tells you whether the system works. Observability lets you ask why it's not working.
– Baron Schwarz
微服務監控從資料分析層次,可以簡單分為 Monitoring 和 Observability。
Monitoring
告訴你系統是否在工作,對已知場景的預定義計算,對各種監控問題的事前假設。對應上圖 Known Knowns 和 Known Unknowns,都是事先假設可能會發生的事件,包括已經明白和不明白的事件。
Observability
可以讓你詢問系統為什麼不工作,對未知場景的探索式分析,對任意監控問題的事後分析。對應上圖 Unknown Knowns 和 Unknown Unknowns,都是事未察覺可能會發生的事件,包括已經明白和不明白的事件。
很顯然,通過預先假設所有可能發生事件進行 Monitoring 的方式,已經不能滿足微服務複雜的監控場景,我們需要能夠提供探索式分析的 Observability 監控方式。在 Logging、Metrics 和 Tracing,Tracing 是目前能提供多元度監控分析能力的最有效方式。
Tracing
鍊路追蹤 Tracing Analysis 為分布式應用的開發者提供了完整的調用鍊路還原、調用請求量統計、鍊路拓撲、應用依賴分析等工具,可以幫助開發者快速分析和診斷分布式應用架構下的性能瓶頸,提高微服務時代下的開發診斷效率。
Tracing 可以串聯微服務中分布式請求的調用鍊路,在微服務監控體系中有着重要的作用。另外,Tracing 介于 Metrics 和 Logging 之間,既可以完成 Monitoring 的工作,也可以進行 Observability 的分析,提升監控體系建設效率。
系統模型
鍊路追蹤(Tracing)系統,需要記錄一次特定請求經過的上下遊服務調用鍊路,以及各服務所完成的相關工作資訊。
如下圖所示的微服務系統,使用者向服務 A 發起一個請求,服務 A 會生成一個全局唯一的 Trace ID,服務 A 内部 Messaging 方式調用相關處理子產品(比如跨線程異步調用等),服務 A 子產品再通過 RPC 方式并行調用服務 B 和服務 C;服務 B 會即刻傳回響應,但服務 C 會采用串行方式,先用 RPC 調用服務 D,再用 RPC 調用服務 E,然後再響應服務 A 的調用請求;服務 A 在内部兩個子產品調用處理完後,會響應最初的使用者請求。
最開始生成的 Trace ID 會在這一系列的服務内部或服務之間的請求調用中傳遞,進而将這些請求調用連接配接起來。另外,Tracing 系統還會記錄每一個請求調用處理的 Timestamp、服務名等等相關資訊。
圖 3(注:服務内部串行調用對系統性能有影響,一般采用并行調用方式,後續章節将隻考慮并行調用場景。)
在 Tracing 系統中,主要包含 Trace 和 Span 兩個基礎概念,下圖展示了一個由 Span 構成的 Trace。
圖 4
Trace 指一個外部請求經過的所有服務的調用鍊路,可以了解為一個有服務調用組成的樹狀結構,每條鍊路都有一個全局唯一的 ID 來辨別。
Span 指服務内部或服務之間的一次調用,即 Trace 樹中的節點,如下圖所示的由 Span 構成的 Trace 樹,樹中的 Span 節點之間存在父子關系。Span 主要包含 Span名稱、Span ID、父 ID,以及 Timestamp、Dration(包含子節點調用處理的 duration)、業務資料等其他 log 資訊。
Span 根據調用方式可以分為 RPC Span 和 Messaging Span:
RPC Span
由 RPC Tracing 生成,分為 Client 和 Server 兩類 Span,分别由 RPC 服務調用的 Client 節點和 Server 節點記錄生成,兩者共享 Span ID、Parent Span ID 等資訊,但要注意,這兩個 Span 記錄的時間是有偏差,這個偏差是服務間的調用開銷,一般是由網絡傳輸開銷、代理服務或服務接口消息排隊等情況引起的。
Messaging Span
由 Messaging Tracing 生成,一般用于 Tracing 服務内部調用,不同于 RPC Span,Messaging Span 之間不會共享 Span ID 等資訊。
應用場景
根據 Tracing 的系統模型,可獲得服務響應等各類 Metric 資訊,用于 Alerting、DashBoard 查詢等;也可根據 Span 組成的鍊路,分析單個或整體服務情況,發現服務性能瓶頸、網絡傳輸開銷、服務内異步調用設計等各種問題。如下圖所示,相比于 Metrics 和 Logging,Tracing 可以同時涵蓋監控的 Monitoring 和 Observability 場景,在監控體系中占據重要位置,Opentracing、Opencensus、Opentelemetry 等協會群組織都包含對 Tracing 的支援。
圖 5
從微服務的角度,Tracing 記錄的 Span 資訊可以進行各種次元的統計和分析。下圖基于 HTTP API 設計的微服務系統為例,使用者查詢 Service1的 /1/api 接口,Service1 再請求 Service2 的 /2/api,Service2 内部異步并發調用 msg2.1 和 msg2.2,msg2.1 請求 Service3的 /3/api接口,msg2.2 請求 Service4 的 /4/api接口,Service3 内部調用 msg3,Service4 再請求 Service5 的 /5/api,其中 Service5 沒有進行 Tracing 埋點,無法采集 Service5 的資訊。
圖 6
針對上圖的微服務系統,可以進行如下兩大類的統計分析操作:
服務内分析
關注單個服務運作情況,比如對外服務接口和上遊接口查詢的性能名額等,分析場景主要有:
1、上遊服務請求
如 Service1 提供的 /1/api ,Service4 提供的 /4/api等,統計獲得次數、QPS、耗時百分位數、出錯率、逾時率等等 metric 資訊。
2、下遊服務響應
如 Service1 請求的 /2/api 、Service4 請求的 /5/api等,統計查詢次數、QPS、耗時百分位數、出錯率、逾時率等等 Metric 資訊。
3、服務内部處理
服務對外接口在内部可能會被分拆為多個 Span,可以按照 Span Name 進行分組聚合統計,發現耗時最長的 Span 等,如 Service2 接口 /2/api ,接口服務内部 Span 包括 /2/api 的 Server Span,call2.1 對應的 Span 和 call2.2 對應的 Span,通過 Span 之間的依賴關系可以算出這些 Span 自身的耗時 Duraion,進行各類統計分析。
服務間分析
在進行微服務整體分析時,我們将單個服務看作黑盒,關注服務間的依賴、調用鍊路上的服務熱點等,分析場景主要有:
1、服務拓撲統計
可以根據服務間調用的 Client Span 和 Server Span,獲得整個服務系統的拓撲結構,以及服務之間調用請求次數、Duration 等統計資訊。
2、調用鍊路性能瓶頸分析
分析某個對外請求接口的調用鍊路上的性能瓶頸,這個瓶頸可能是某個服務内部處理開銷造成的,也可能是某兩個服務間的網絡調用開銷等等原因造成的。
對于一次調用涉及到數十個以上微服務的複雜調用請求,每次出現的性能瓶頸很可能都會不一樣,此時就需要進行聚合統計,算出性能瓶頸出現頻次的排名,分析出針對性能瓶頸熱點的服務或服務間調用。
以上僅僅是列舉的部分分析場景,Tracing 提供的資訊其實可以支援更多的 Metric 統計和探索式分析場景,本文不再一一例舉。
02 基于 Zipkin 和 StarRocks 建構鍊路追蹤分析系統
鍊路追蹤系統主要分為資料采集、資料存儲和分析計算三大部分,目前使用最廣泛的開源鍊路追蹤系統是 Zipkin,它主要包括資料采集和分析計算兩大部分,底層的存儲依賴其他存儲系統。搜狐智能媒體在建構鍊路追蹤系統時,最初采用 Zipkin + ElasticSearch 得方式進行建構,後增加 StarRocks 作為底層存儲系統,并基于 StarRocks 進行分析統計,系統總體架構如下圖。
圖 7
資料采集
Zipkin 支援用戶端全自動埋點,隻需将相關庫引入應用程式中并簡單配置,就可以實作 Span 資訊自動生成,Span 資訊通過 HTTP 或 Kafka 等方式自動進行上傳。Zipkin 目前提供了絕大部分語言的埋點采集庫,如 Java 語言的 Spring Cloud 提供了 Sleuth 與 Zipkin 進行深度綁定,對開發人員基本做到透明使用。為了解決存儲空間,在使用時一般要設定 1/100 左右的采樣率,Dapper 的論文中提到即便是 1/1000 的采樣率,對于跟蹤資料的通用使用層面上,也可以提供足夠多的資訊。
資料模型
對應 圖 6 ,下面給出了 Zipkin Span 埋點采集示意圖 (圖 8),具體流程如下:
圖 8
- 使用者發送給 Service1 的 Request 中,不含有 Trace 和 Span 資訊,Service1 會建立一個 Server Span,随機生成全局唯一的 TraceID(如圖中的 X)和 SpanId(如圖中的 A,此處的 X 和 A 會使用相同的值),記錄 Timestamp 等資訊;Service1 在給使用者傳回 Response 時,Service1 會統計 Server Span 的處理耗時 Duration,會将包含 TraceID、SpanID、Timestamp、Duration 等資訊的 Server Span 完整資訊進行上報。
- Service1 向 Service2 發送的請求,會建立一個 Client Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 B),記錄 Timestamp 等資訊,同時 Service1 會将 Trace ID(X)和 SpanID(B)傳遞給 Service2(如在 HTTP 協定的 HEADER 中添加 TraceID 和 SpanID 等相關字段);Service1 在收到 Service2 的響應後,Service1 會處理 Client Span 相關資訊,并将 Client Span 進行上報
- Service2 收到 Service1 的 Request 中,包含 Trace(X)和 Span(B)等資訊,Service2 會建立一個 Server Span,使用 X 作為 Trace ID,B 作為 SpanID,内部調用msg2.1 和 msg2.2 同時,将 Trace ID(X)和 SpanID(B)傳遞給它們;Service2 在收到 msg2.1 和 msg2.2 的傳回後,Service1 會處理 Server Span 相關資訊,并将此 Server Span 進行上報
- Service2 的 msg2.1 和 msg2.2 會分别建立一個 Messaging Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 C 和 F),記錄 Timestamp 等資訊,分别向 Service3 和 Service4 發送請求;msg2.1 和 msg2.2 收到響應後,會分别處理 Messaging Span 相關資訊,并将兩個 Messaging Span 進行上報
- Service2 向 Service3 和 Service4 發送的請求,會各建立一個 Client Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 D 和 G),記錄 Timestamp 等資訊,同時 Service2 會将 Trace ID(X)和 SpanID(D 或 G)傳遞給 Service3 和 Service4;Service12 在收到 Service3 和 Service3 的響應後,Service2 會分别處理 Client Span 相關資訊,并将兩個 Client Span 進行上報
- Service3 收到 Service2 的Request中,包含 Trace(X)和Span(D)等資訊,Service3 會建立一個 Server Span,使用 X 作為 Trace ID,D 作為 SpanID,内部調用 msg3;Service3 在收到 msg3 的傳回後,Service3 會處理此 Server Span 相關資訊,并将此 Server Span 進行上報
- Service3 的 msg3 會分别建立一個 Messaging Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 E),記錄 Timestamp 等資訊,msg3 處理完成後,處理此 Messaging Span 相關資訊,并将此 Messaging Span 進行上報
- Service4 收到 Service2 的 Request 中,包含 Trace(X)和 Span(G)等資訊,Service4 會建立一個 Server Span,使用 X 作為 Trace ID,G 作為 SpanID,再向 Service5 發送請求;Service4 在收到 Service5 的響應後,Service4 會處理此 Server Span 相關資訊,并将此 Server Span 進行上報
- Service4 向 Service5 發送的請求,會建立一個 Client Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 H),記錄 Timestamp 等資訊,同時 Service4 會将 Trace ID(X)和 SpanID(H)傳遞給 Service5;Service4 在收到 Service5 的響應後,Service4 會處理 Client Span 相關資訊,并将此 Client Span 進行上報
上面整個 Trace X 調用鍊路會生成的 Span 記錄如下圖,每個 Span 主要會記錄 Span Id、Parent Id、Kind(CLIENT 表示 RPC CLIENT 端 Span,SERVER 表示 RPC SERVER 端 SPAN,NULL 表示 Messaging SPAN),SN(Service Name),還會包含 Trace ID,時間戳、Duration 等資訊。Service5 沒有進行 Zipkin 埋點采集,是以不會有 Service5 的 Span 記錄。
圖 9
資料格式
設定了 Zipkin 埋點的應用服務,預設會使用 Json 格式向 Kafka 上報 Span 資訊,上報的資訊主要有如下幾個注意點:
每個應用服務每次會上報一組 Span,組成一個 Json 數組上報
Json 數組裡包含不同 Trace的Span,即不是所有的 Trace ID都 相同
不同形式的接口(如 Http、Grpc、Dubbo 等),除了主要字段相同外,在 tags 中會各自記錄一些不同的字段
[
{
"traceId": "3112dd04c3112036",
"id": "3112dd04c3112036",
"kind": "SERVER",
"name": "get /2/api",
"timestamp": 1618480662355011,
"duration": 12769,
"localEndpoint": {
"serviceName": "SERVICE2",
"ipv4": "172.24.132.32"
},
"remoteEndpoint": {
"ipv4": "111.25.140.166",
"port": 50214
},
"tags": {
"http.method": "GET",
"http.path": "/2/api",
"mvc.controller.class": "Controller",
"mvc.controller.method": "get2Api"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "3112dd04c3112036",
"id": "b4bd9859c690160a",
"name": "msg2.1",
"timestamp": 1618480662357211,
"duration": 11069,
"localEndpoint": {
"serviceName": "SERVICE2"
},
"tags": {
"class": "MSG",
"method": "msg2.1"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "3112dd04c3112036",
"id": "c31d9859c69a2b21",
"name": "msg2.2",
"timestamp": 1618480662357201,
"duration": 10768,
"localEndpoint": {
"serviceName": "SERVICE2"
},
"tags": {
"class": "MSG",
"method": "msg2.2"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "b4bd9859c690160a",
"id": "f1659c981c0f4744",
"kind": "CLIENT",
"name": "get /3/api",
"timestamp": 1618480662358201,
"duration": 9206,
"localEndpoint": {
"serviceName": "SERVICE2",
"ipv4": "172.24.132.32"
},
"tags": {
"http.method": "GET",
"http.path": "/3/api"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "c31d9859c69a2b21",
"id": "73cd1cab1d72a971",
"kind": "CLIENT",
"name": "get /4/api",
"timestamp": 1618480662358211,
"duration": 9349,
"localEndpoint": {
"serviceName": "SERVICE2",
"ipv4": "172.24.132.32"
},
"tags": {
"http.method": "GET",
"http.path": "/4/api"
}
}
]
圖 10
資料存儲
Zipkin 支援 MySQL、Cassandra 和 ElasticSearch 三種資料存儲,這三者都存在各自的缺點:
- MySQL:采集的 Tracing 資訊基本都在每天上億行甚至百億行以上,MySQL 無法支撐這麼大資料量。
- Cassandra:能支援對單個 Trace 的 Span 資訊分析,但對聚合查詢等資料統計分析場景支援不好
- ElasticSearch:能支援單個 Trace 的分析和簡單的聚合查詢分析,但對于一些較複雜的資料分析計算不能很好的支援,比如涉及到 Join、視窗函數等等的計算需求,尤其是任務間依賴計算,Zipkin 目前還不能實時計算,需要通過離線跑 Spark 任務計算任務間依賴資訊。
我們在實踐中也是首先使用 ElasticSearch,發現了上面提到的問題,比如 Zipkin 的服務依賴拓撲必須使用離線方式計算,便新增了 StarRocks 作為底層資料存儲。将 Zipkin 的 trace 資料導入到StarRocks很友善,基本步驟隻需要兩步,CREATE TABLE + CREATE ROUTINE LOAD。
另外,在調用鍊路性能瓶頸分析場景中,要将單個服務看作黑盒,隻關注 RPC SPAN,屏蔽掉服務内部的 Messaging Span,使用了 Flink 對服務内部 span 進行 ParentID 溯源,即從 RPC Client SPAN,一直追溯到同一服務同一 Trace ID 的 RPC Server SPAN,用 RPC Server SPAN 的 ID 替換 RPC Client SPAN 的parentId,最後通過Flink-Connector-StarRocks将轉換後的資料實時寫入StarRocks。
基于 StarRocks 的資料存儲架構流程如下圖所示。
圖 11
CREATE TABLE
建表語句示例參考如下,有如下幾點注意點:
- 包括 Zipkin 和 zipkin_trace_perf 兩張表,zipkin_trace_perf 表隻用于調用鍊路性能瓶頸分析場景,其他統計分析都适用 Zipkin 表
- 通過采集資訊中的 Timestamp 字段,生成 dt、hr、min 時間字段,便于後續統計分析
- 采用 DUPLICATE 模型、Bitmap 索引等設定,加快查詢速度
- Zipkin 表使用id作為分桶字段,在查詢服務拓撲時,查詢計劃會優化為 Colocate Join,提升查詢性能。
Zipkin
CREATE TABLE `zipkin` (
`traceId` varchar(24) NULL COMMENT "",
`id` varchar(24) NULL COMMENT "Span ID",
`localEndpoint_serviceName` varchar(512) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`parentId` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`hr` int(11) NULL COMMENT "",
`min` bigint(20) NULL COMMENT "",
`kind` varchar(16) NULL COMMENT "",
`duration` int(11) NULL COMMENT "",
`name` varchar(300) NULL COMMENT "",
`localEndpoint_ipv4` varchar(16) NULL COMMENT "",
`remoteEndpoint_ipv4` varchar(16) NULL COMMENT "",
`remoteEndpoint_port` varchar(16) NULL COMMENT "",
`shared` int(11) NULL COMMENT "",
`tag_error` int(11) NULL DEFAULT "0" COMMENT "",
`error_msg` varchar(1024) NULL COMMENT "",
`tags_http_path` varchar(2048) NULL COMMENT "",
`tags_http_method` varchar(1024) NULL COMMENT "",
`tags_controller_class` varchar(100) NULL COMMENT "",
`tags_controller_method` varchar(1024) NULL COMMENT "",
INDEX service_name_idx (`localEndpoint_serviceName`) USING BITMAP COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`traceId`, `parentId`, `id`, `timestamp`, `localEndpoint_serviceName`, `dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`id`) BUCKETS 100
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "100",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
zipkin_trace_perf
CREATE TABLE `zipkin_trace_perf` (
`traceId` varchar(24) NULL COMMENT "",
`id` varchar(24) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`parentId` varchar(24) NULL COMMENT "",
`localEndpoint_serviceName` varchar(512) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`hr` int(11) NULL COMMENT "",
`min` bigint(20) NULL COMMENT "",
`kind` varchar(16) NULL COMMENT "",
`duration` int(11) NULL COMMENT "",
`name` varchar(300) NULL COMMENT "",
`tag_error` int(11) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`traceId`, `id`, `dt`, `parentId`, `localEndpoint_serviceName`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`traceId`) BUCKETS 32
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-60",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "12",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
ROUTINE LOAD
ROUTINE LOAD 建立語句示例如下:
CREATE ROUTINE LOAD zipkin_routine_load ON zipkin COLUMNS(
id,
kind,
localEndpoint_serviceName,
traceId,
`name`,
`timestamp`,
`duration`,
`localEndpoint_ipv4`,
`remoteEndpoint_ipv4`,
`remoteEndpoint_port`,
`shared`,
`parentId`,
`tags_http_path`,
`tags_http_method`,
`tags_controller_class`,
`tags_controller_method`,
tmp_tag_error,
tag_error = if(`tmp_tag_error` IS NULL, 0, 1),
error_msg = tmp_tag_error,
dt = from_unixtime(`timestamp` / 1000000, '%Y%m%d'),
hr = from_unixtime(`timestamp` / 1000000, '%H'),
`min` = from_unixtime(`timestamp` / 1000000, '%i')
) PROPERTIES (
"desired_concurrent_number" = "3",
"max_batch_interval" = "50",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"max_error_number" = "1000000",
"strict_mode" = "false",
"format" = "json",
"strip_outer_array" = "true",
"jsonpaths" = "[\"$.id\",\"$.kind\",\"$.localEndpoint.serviceName\",\"$.traceId\",\"$.name\",\"$.timestamp\",\"$.duration\",\"$.localEndpoint.ipv4\",\"$.remoteEndpoint.ipv4\",\"$.remoteEndpoint.port\",\"$.shared\",\"$.parentId\",\"$.tags.\\\"http.path\\\"\",\"$.tags.\\\"http.method\\\"\",\"$.tags.\\\"mvc.controller.class\\\"\",\"$.tags.\\\"mvc.controller.method\\\"\",\"$.tags.error\"]"
)
FROM
KAFKA (
"kafka_broker_list" = "IP1:PORT1,IP2:PORT2,IP3:PORT3",
"kafka_topic" = "XXXXXXXXX"
);
Flink 溯源 Parent ID
針對調用鍊路性能瓶頸分析場景中,使用 Flink 進行 Parent ID 溯源,代碼示例如下:
env
// 添加kafka資料源
.addSource(getKafkaSource())
// 将采集到的Json字元串轉換為JSONArray,
// 這個JSONArray是從單個服務采集的資訊,裡面會包含多個Trace的Span資訊
.map(JSON.parseArray(_))
// 将JSONArray轉換為JSONObject,每個JSONObejct就是一個Span
.flatMap(_.asScala.map(_.asInstanceOf[JSONObject]))
// 将Span的JSONObject對象轉換為Bean對象
.map(jsonToBean(_))
// 以traceID+localEndpoint_serviceName作為key對span進行分區生成keyed stream
.keyBy(span => keyOfTrace(span))
// 使用會話視窗,将同一個Trace的不同服務上的所有Span,分發到同一個固定間隔的processing-time視窗
// 這裡為了實作簡單,使用了processing-time session視窗,後續我們會使用starrocks的UDAF函數進行優化,去掉對Flink的依賴
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 使用Aggregate視窗函數
.aggregate(new TraceAggregateFunction)
// 将經過溯源的span集合展開,便于調用flink-connector-starrocks
.flatMap(spans => spans)
// 使用flink-connector-starrocks sink,将資料寫入starrocks中
.addSink(
StarRocksSink.sink(
StarRocksSinkOptions.builder().withProperty("XXX", "XXX").build()))
分析計算
以 圖 6 作為一個微服務系統用例,給出各個統計分析場景對應的 StarRocks SQL 語句。
服務内分析
上遊服務請求名額統計
下面的 SQL 使用 Zipkin 表資料,計算服務 Service2 請求上遊服務 Service3 和上遊服務 Service4 的查詢統計資訊,按小時和接口分組統計查詢名額
select
hr,
name,
req_count,
timeout / req_count * 100 as timeout_rate,
error_count / req_count * 100 as error_rate,
avg_duration,
tp95,
tp99
from
(
select
hr,
name,
count(1) as req_count,
AVG(duration) / 1000 as avg_duration,
sum(if(duration > 200000, 1, 0)) as timeout,
sum(tag_error) as error_count,
percentile_approx(duration, 0.95) / 1000 AS tp95,
percentile_approx(duration, 0.99) / 1000 AS tp99
from
zipkin
where
localEndpoint_serviceName = 'Service2'
and kind = 'CLIENT'
and dt = 20220105
group by
hr,
name
) tmp
order by
hr
下遊服務響應名額統計
下面的 SQL 使用 Zipkin 表資料,計算服務 Service2 響應下遊服務 Service1 的查詢統計資訊,按小時和接口分組統計查詢名額。
select
hr,
name,
req_count,
timeout / req_count * 100 as timeout_rate,
error_count / req_count * 100 as error_rate,
avg_duration,
tp95,
tp99
from
(
select
hr,
name,
count(1) as req_count,
AVG(duration) / 1000 as avg_duration,
sum(if(duration > 200000, 1, 0)) as timeout,
sum(tag_error) as error_count,
percentile_approx(duration, 0.95) / 1000 AS tp95,
percentile_approx(duration, 0.99) / 1000 AS tp99
from
zipkin
where
localEndpoint_serviceName = 'Service2'
and kind = 'SERVER'
and dt = 20220105
group by
hr,
name
) tmp
order by
hr
服務内部處理分析
下面的 SQL 使用 Zipkin 表資料,查詢服務 Service2 的接口 /2/api,按 Span Name 分組統計 Duration 等資訊。
with
spans as (
select * from zipkin where dt = 20220105 and localEndpoint_serviceName = "Service2"
),
api_spans as (
select
spans.id as id,
spans.parentId as parentId,
spans.name as name,
spans.duration as duration
from
spans
inner JOIN
(select * from spans where kind = "SERVER" and name = "/2/api") tmp
on spans.traceId = tmp.traceId
)
SELECT
name,
AVG(inner_duration) / 1000 as avg_duration,
percentile_approx(inner_duration, 0.95) / 1000 AS tp95,
percentile_approx(inner_duration, 0.99) / 1000 AS tp99
from
(
select
l.name as name,
(l.duration - ifnull(r.duration, 0)) as inner_duration
from
api_spans l
left JOIN
api_spans r
on l.parentId = r.id
) tmp
GROUP BY
name
服務間分析
服務拓撲統計
下面的 SQL 使用 Zipkin 表資料,計算服務間的拓撲關系,以及服務間接口 Duration 的統計資訊。
with tbl as (select * from zipkin where dt = 20220105)
select
client,
server,
name,
AVG(duration) / 1000 as avg_duration,
percentile_approx(duration, 0.95) / 1000 AS tp95,
percentile_approx(duration, 0.99) / 1000 AS tp99
from
(
select
c.localEndpoint_serviceName as client,
s.localEndpoint_serviceName as server,
c.name as name,
c.duration as duration
from
(select * from tbl where kind = "CLIENT") c
left JOIN
(select * from tbl where kind = "SERVER") s
on c.id = s.id and c.traceId = s.traceId
) as tmp
group by
client,
server,
name
調用鍊路性能瓶頸分析
下面的 SQL 使用 zipkin_trace_perf 表資料,針對某個服務接口響應逾時的查詢請求,統計出每次請求的調用鍊路中處理耗時最長的服務或服務間調用,進而分析出性能熱點是在某個服務或服務間調用。
select
service,
ROUND(count(1) * 100 / sum(count(1)) over(), 2) as percent
from
(
select
traceId,
service,
duration,
ROW_NUMBER() over(partition by traceId order by duration desc) as rank4
from
(
with tbl as (
SELECT
l.traceId as traceId,
l.id as id,
l.parentId as parentId,
l.kind as kind,
l.duration as duration,
l.localEndpoint_serviceName as localEndpoint_serviceName
FROM
zipkin_trace_perf l
INNER JOIN
zipkin_trace_perf r
on l.traceId = r.traceId
and l.dt = 20220105
and r.dt = 20220105
and r.tag_error = 0 -- 過濾掉出錯的trace
and r.localEndpoint_serviceName = "Service1"
and r.name = "/1/api"
and r.kind = "SERVER"
and r.duration > 200000 -- 過濾掉未逾時的trace
)
select
traceId,
id,
service,
duration
from
(
select
traceId,
id,
service,
(c_duration - s_duration) as duration,
ROW_NUMBER() over(partition by traceId order by (c_duration - s_duration) desc) as rank2
from
(
select
c.traceId as traceId,
c.id as id,
concat(c.localEndpoint_serviceName, "=>", ifnull(s.localEndpoint_serviceName, "?")) as service,
c.duration as c_duration,
ifnull(s.duration, 0) as s_duration
from
(select * from tbl where kind = "CLIENT") c
left JOIN
(select * from tbl where kind = "SERVER") s
on c.id = s.id and c.traceId = s.traceId
) tmp1
) tmp2
where
rank2 = 1
union ALL
select
traceId,
id,
service,
duration
from
(
select
traceId,
id,
service,
(s_duration - c_duration) as duration,
ROW_NUMBER() over(partition by traceId order by (s_duration - c_duration) desc) as rank2
from
(
select
s.traceId as traceId,
s.id as id,
s.localEndpoint_serviceName as service,
s.duration as s_duration,
ifnull(c.duration, 0) as c_duration,
ROW_NUMBER() over(partition by s.traceId, s.id order by ifnull(c.duration, 0) desc) as rank
from
(select * from tbl where kind = "SERVER") s
left JOIN
(select * from tbl where kind = "CLIENT") c
on s.id = c.parentId and s.traceId = c.traceId
) tmp1
where
rank = 1
) tmp2
where
rank2 = 1
) tmp3
) tmp4
where
rank4 = 1
GROUP BY
service
order by
percent desc
SQL 查詢的結果如下圖所示,在逾時的 Trace 請求中,性能瓶頸服務或服務間調用的比例分布。
圖 12
03 實踐效果
目前搜狐智能媒體已在 30+ 個服務中接入 Zipkin,涵蓋上百個線上服務執行個體,1% 的采樣率每天産生近 10億 多行的日志。
通過 Zipkin Server 查詢 StarRocks,擷取的 Trace 資訊如下圖所示:
圖 13
通過 Zipkin Server 查詢 StarRocks,擷取的服務拓撲資訊如下圖所示:
圖 14
基于 Zipkin StarRocks 的鍊路追蹤體系實踐過程中,明顯提升了微服務監控分析能力和工程效率:
提升微服務監控分析能力
- 在監控報警方面,可以基于 StarRocks 查詢統計線上服務目前時刻的響應延遲百分位數、錯誤率等名額,根據這些名額及時産生各類告警;
- 在名額統計方面,可以基于 StarRocks 按天、小時、分鐘等粒度統計服務響應延遲的各項名額,更好的了解服務運作狀況;
- 在故障分析方面,基于 StarRocks 強大的 SQL 計算能力,可以進行服務、時間、接口等多個次元的探索式分析查詢,定位故障原因。
提升微服務監控工程效率
Metric 和 Logging 資料采集,很多需要使用者手動埋點和安裝各種采集器 Agent,資料采集後存儲到 ElasticSearch 等存儲系統,每上一個業務,這些流程都要操作一遍,非常繁瑣,且資源分散不易管理。
而使用 Zipkin + StarRocks 的方式,隻需在代碼中引入對應庫 SDK,設定上報的 Kafka 位址和采樣率等少量配置資訊,Tracing 便可自動埋點采集,通過 zikpin server 界面進行查詢分析,非常簡便。
04 總結與展望
基于 Zipkin+StarRocks 建構鍊路追蹤系統,能夠提供微服務監控的 Monitoring 和 Observability 能力,提升微服務監控的分析能力和工程效率。
後續有幾個 優化點 ,可以進一步提升鍊路追蹤系統的分析能力和易用性:
- 使用 StarRocks 的 UDAF、視窗函數等功能,将 Parent ID 溯源下沉到 StarRocks計算,通過計算後置的方式,取消對 Flink 的依賴,進一步簡化整個系統架構。
- 目前對原始日志中的 tag s等字段,并沒有完全采集,StarRocks 正在實作 Json 資料類型,能夠更好的支援 tags 等嵌套資料類型。
- Zipkin Server 目前的界面還稍顯簡陋,我們已經打通了 Zipkin Server 查詢 StarRokcs,後續會對 Zipkin Server 進行 U I等優化,通過 StarRocks 強大的計算能力實作更多的名額查詢,進一步提升使用者體驗。