天天看點

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

作者:Java開發技術分享

在微服務體系架構下,搜狐智能媒體使用 Zipkin 進行服務鍊路追蹤(Tracing)的埋點采集,将采集的 Trace 資訊存儲到 StarRocks 中。通過 StarRocks 強大的 SQL 計算能力,對 Tracing 資訊進行多元度的統計、分析等操作,提升了微服務監控能力,從簡單統計的 Monitoring 上升到更多元度探索分析的 Observability。

全文主要分為三個部分:第一節主要介紹微服務下的常用監控方式,其中鍊路追蹤技術,可以串聯整個服務調用鍊路,獲得整體服務的關鍵資訊,對微服務的監控有非常重要的意義。第二節主要介紹搜狐智能媒體是如何建構鍊路追蹤分析體系的,主要包括 Zipkin 的資料采集,StarRocks 的資料存儲,以及根據應用場景對 StarRocks 進行分析計算等三個部分。第三節主要介紹搜狐智能媒體通過引入 Zipkin 和 StarRocks 進行鍊路追蹤分析取得的一些實踐效果。

01 微服務架構中的鍊路追蹤

近年來,企業 IT 應用架構逐漸向微服務、雲原生等分布式應用架構演進,在搜狐智能媒體内部,應用服務按照微服務、Docker、Kubernetes、Spring Cloud 等架構思想和技術方案進行研發運維, 提升部門整體工程效率 。

微服務架構提升工程效率的同時,也帶來了一些新的問題。微服務是一個分布式架構,它按業務劃分服務單元,使用者的每次請求不再是由某一個服務獨立完成了,而是變成了多個服務一起配合完成。這些服務可能是由不同的團隊、使用不同的程式設計語言實作,可能布在了不同的伺服器、甚至不同的資料中心。如果使用者請求出現了錯誤和異常,微服務分布式調用的特性決定了這些 故障難以定位 ,相對于傳統的單體架構,微服務監控面臨着新的難題。

Logging、Metrics、Tracing

微服務監控可以包含很多方式,按照監測的資料類型主要劃分為 Logging、Metrics 和Tracing 三大領域:

Logging

使用者主動記錄的離散事件,記錄的資訊一般是非結構化的文本内容,在使用者進行問題分析判斷時可以提供更為詳盡的線索。

具有聚合屬性的采集資料,旨在為使用者展示某個名額在某個時段的運作狀态,用于檢視一些名額和趨勢。

Tracing

記錄一次請求調用的生命周期全過程,其中包括服務調用和處理時長等資訊,含有請求上下文環境,由一個全局唯一的 Trace ID 來進行辨別和串聯整個調用鍊路,非常适合微服務架構的監控場景。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 1

三者的關系如上圖所示,這三者之間也是有重疊的,比如 Logging 可以聚合相關字段生成 Metrics 資訊,關聯相關字段生成 Tracing 資訊;Tracing 可以聚合查詢次數生成 Metrics 資訊,可以記錄業務日志生成 Logging 資訊。一般情況下要在 Metrics 和 Logging 中增加字段串聯微服務請求調用生命周期比較困難,通過 Tracing 擷取 Metrics 和 Logging 則相對容易很多。

另外,這三者對存儲資源有着不同的需求,Metrics 是天然的壓縮資料,最節省資源;Logging 傾向于無限增加的,甚至會超出預期的容量;Tracing 的存儲容量,一般介于 Metrics 和 Logging 兩者之間,另外還可通過采樣率進一步控制容量需求。

從 Monitoring 到 Observability

Monitoring tells you whether the system works. Observability lets you ask why it's not working.

– Baron Schwarz

微服務監控從資料分析層次,可以簡單分為 Monitoring 和 Observability。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

Monitoring

告訴你系統是否在工作,對已知場景的預定義計算,對各種監控問題的事前假設。對應上圖 Known Knowns 和 Known Unknowns,都是事先假設可能會發生的事件,包括已經明白和不明白的事件。

Observability

可以讓你詢問系統為什麼不工作,對未知場景的探索式分析,對任意監控問題的事後分析。對應上圖 Unknown Knowns 和 Unknown Unknowns,都是事未察覺可能會發生的事件,包括已經明白和不明白的事件。

很顯然,通過預先假設所有可能發生事件進行 Monitoring 的方式,已經不能滿足微服務複雜的監控場景,我們需要能夠提供探索式分析的 Observability 監控方式。在 Logging、Metrics 和 Tracing,Tracing 是目前能提供多元度監控分析能力的最有效方式。

Tracing

鍊路追蹤 Tracing Analysis 為分布式應用的開發者提供了完整的調用鍊路還原、調用請求量統計、鍊路拓撲、應用依賴分析等工具,可以幫助開發者快速分析和診斷分布式應用架構下的性能瓶頸,提高微服務時代下的開發診斷效率。

Tracing 可以串聯微服務中分布式請求的調用鍊路,在微服務監控體系中有着重要的作用。另外,Tracing 介于 Metrics 和 Logging 之間,既可以完成 Monitoring 的工作,也可以進行 Observability 的分析,提升監控體系建設效率。

系統模型

鍊路追蹤(Tracing)系統,需要記錄一次特定請求經過的上下遊服務調用鍊路,以及各服務所完成的相關工作資訊。

如下圖所示的微服務系統,使用者向服務 A 發起一個請求,服務 A 會生成一個全局唯一的 Trace ID,服務 A 内部 Messaging 方式調用相關處理子產品(比如跨線程異步調用等),服務 A 子產品再通過 RPC 方式并行調用服務 B 和服務 C;服務 B 會即刻傳回響應,但服務 C 會采用串行方式,先用 RPC 調用服務 D,再用 RPC 調用服務 E,然後再響應服務 A 的調用請求;服務 A 在内部兩個子產品調用處理完後,會響應最初的使用者請求。

最開始生成的 Trace ID 會在這一系列的服務内部或服務之間的請求調用中傳遞,進而将這些請求調用連接配接起來。另外,Tracing 系統還會記錄每一個請求調用處理的 Timestamp、服務名等等相關資訊。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 3(注:服務内部串行調用對系統性能有影響,一般采用并行調用方式,後續章節将隻考慮并行調用場景。)

在 Tracing 系統中,主要包含 Trace 和 Span 兩個基礎概念,下圖展示了一個由 Span 構成的 Trace。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 4

Trace 指一個外部請求經過的所有服務的調用鍊路,可以了解為一個有服務調用組成的樹狀結構,每條鍊路都有一個全局唯一的 ID 來辨別。

Span 指服務内部或服務之間的一次調用,即 Trace 樹中的節點,如下圖所示的由 Span 構成的 Trace 樹,樹中的 Span 節點之間存在父子關系。Span 主要包含 Span名稱、Span ID、父 ID,以及 Timestamp、Dration(包含子節點調用處理的 duration)、業務資料等其他 log 資訊。

Span 根據調用方式可以分為 RPC Span 和 Messaging Span:

RPC Span

由 RPC Tracing 生成,分為 Client 和 Server 兩類 Span,分别由 RPC 服務調用的 Client 節點和 Server 節點記錄生成,兩者共享 Span ID、Parent Span ID 等資訊,但要注意,這兩個 Span 記錄的時間是有偏差,這個偏差是服務間的調用開銷,一般是由網絡傳輸開銷、代理服務或服務接口消息排隊等情況引起的。

Messaging Span

由 Messaging Tracing 生成,一般用于 Tracing 服務内部調用,不同于 RPC Span,Messaging Span 之間不會共享 Span ID 等資訊。

應用場景

根據 Tracing 的系統模型,可獲得服務響應等各類 Metric 資訊,用于 Alerting、DashBoard 查詢等;也可根據 Span 組成的鍊路,分析單個或整體服務情況,發現服務性能瓶頸、網絡傳輸開銷、服務内異步調用設計等各種問題。如下圖所示,相比于 Metrics 和 Logging,Tracing 可以同時涵蓋監控的 Monitoring 和 Observability 場景,在監控體系中占據重要位置,Opentracing、Opencensus、Opentelemetry 等協會群組織都包含對 Tracing 的支援。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 5

從微服務的角度,Tracing 記錄的 Span 資訊可以進行各種次元的統計和分析。下圖基于 HTTP API 設計的微服務系統為例,使用者查詢 Service1的 /1/api 接口,Service1 再請求 Service2 的 /2/api,Service2 内部異步并發調用 msg2.1 和 msg2.2,msg2.1 請求 Service3的 /3/api接口,msg2.2 請求 Service4 的 /4/api接口,Service3 内部調用 msg3,Service4 再請求 Service5 的 /5/api,其中 Service5 沒有進行 Tracing 埋點,無法采集 Service5 的資訊。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 6

針對上圖的微服務系統,可以進行如下兩大類的統計分析操作:

服務内分析

關注單個服務運作情況,比如對外服務接口和上遊接口查詢的性能名額等,分析場景主要有:

1、上遊服務請求

如 Service1 提供的 /1/api ,Service4 提供的 /4/api等,統計獲得次數、QPS、耗時百分位數、出錯率、逾時率等等 metric 資訊。

2、下遊服務響應

如 Service1 請求的 /2/api 、Service4 請求的 /5/api等,統計查詢次數、QPS、耗時百分位數、出錯率、逾時率等等 Metric 資訊。

3、服務内部處理

服務對外接口在内部可能會被分拆為多個 Span,可以按照 Span Name 進行分組聚合統計,發現耗時最長的 Span 等,如 Service2 接口 /2/api ,接口服務内部 Span 包括 /2/api 的 Server Span,call2.1 對應的 Span 和 call2.2 對應的 Span,通過 Span 之間的依賴關系可以算出這些 Span 自身的耗時 Duraion,進行各類統計分析。

服務間分析

在進行微服務整體分析時,我們将單個服務看作黑盒,關注服務間的依賴、調用鍊路上的服務熱點等,分析場景主要有:

1、服務拓撲統計

可以根據服務間調用的 Client Span 和 Server Span,獲得整個服務系統的拓撲結構,以及服務之間調用請求次數、Duration 等統計資訊。

2、調用鍊路性能瓶頸分析

分析某個對外請求接口的調用鍊路上的性能瓶頸,這個瓶頸可能是某個服務内部處理開銷造成的,也可能是某兩個服務間的網絡調用開銷等等原因造成的。

對于一次調用涉及到數十個以上微服務的複雜調用請求,每次出現的性能瓶頸很可能都會不一樣,此時就需要進行聚合統計,算出性能瓶頸出現頻次的排名,分析出針對性能瓶頸熱點的服務或服務間調用。

以上僅僅是列舉的部分分析場景,Tracing 提供的資訊其實可以支援更多的 Metric 統計和探索式分析場景,本文不再一一例舉。

02 基于 Zipkin 和 StarRocks 建構鍊路追蹤分析系統

鍊路追蹤系統主要分為資料采集、資料存儲和分析計算三大部分,目前使用最廣泛的開源鍊路追蹤系統是 Zipkin,它主要包括資料采集和分析計算兩大部分,底層的存儲依賴其他存儲系統。搜狐智能媒體在建構鍊路追蹤系統時,最初采用 Zipkin + ElasticSearch 得方式進行建構,後增加 StarRocks 作為底層存儲系統,并基于 StarRocks 進行分析統計,系統總體架構如下圖。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 7

資料采集

Zipkin 支援用戶端全自動埋點,隻需将相關庫引入應用程式中并簡單配置,就可以實作 Span 資訊自動生成,Span 資訊通過 HTTP 或 Kafka 等方式自動進行上傳。Zipkin 目前提供了絕大部分語言的埋點采集庫,如 Java 語言的 Spring Cloud 提供了 Sleuth 與 Zipkin 進行深度綁定,對開發人員基本做到透明使用。為了解決存儲空間,在使用時一般要設定 1/100 左右的采樣率,Dapper 的論文中提到即便是 1/1000 的采樣率,對于跟蹤資料的通用使用層面上,也可以提供足夠多的資訊。

資料模型

對應 圖 6 ,下面給出了 Zipkin Span 埋點采集示意圖 (圖 8),具體流程如下:

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 8

  1. 使用者發送給 Service1 的 Request 中,不含有 Trace 和 Span 資訊,Service1 會建立一個 Server Span,随機生成全局唯一的 TraceID(如圖中的 X)和 SpanId(如圖中的 A,此處的 X 和 A 會使用相同的值),記錄 Timestamp 等資訊;Service1 在給使用者傳回 Response 時,Service1 會統計 Server Span 的處理耗時 Duration,會将包含 TraceID、SpanID、Timestamp、Duration 等資訊的 Server Span 完整資訊進行上報。
  2. Service1 向 Service2 發送的請求,會建立一個 Client Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 B),記錄 Timestamp 等資訊,同時 Service1 會将 Trace ID(X)和 SpanID(B)傳遞給 Service2(如在 HTTP 協定的 HEADER 中添加 TraceID 和 SpanID 等相關字段);Service1 在收到 Service2 的響應後,Service1 會處理 Client Span 相關資訊,并将 Client Span 進行上報
  3. Service2 收到 Service1 的 Request 中,包含 Trace(X)和 Span(B)等資訊,Service2 會建立一個 Server Span,使用 X 作為 Trace ID,B 作為 SpanID,内部調用msg2.1 和 msg2.2 同時,将 Trace ID(X)和 SpanID(B)傳遞給它們;Service2 在收到 msg2.1 和 msg2.2 的傳回後,Service1 會處理 Server Span 相關資訊,并将此 Server Span 進行上報
  4. Service2 的 msg2.1 和 msg2.2 會分别建立一個 Messaging Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 C 和 F),記錄 Timestamp 等資訊,分别向 Service3 和 Service4 發送請求;msg2.1 和 msg2.2 收到響應後,會分别處理 Messaging Span 相關資訊,并将兩個 Messaging Span 進行上報
  5. Service2 向 Service3 和 Service4 發送的請求,會各建立一個 Client Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 D 和 G),記錄 Timestamp 等資訊,同時 Service2 會将 Trace ID(X)和 SpanID(D 或 G)傳遞給 Service3 和 Service4;Service12 在收到 Service3 和 Service3 的響應後,Service2 會分别處理 Client Span 相關資訊,并将兩個 Client Span 進行上報
  6. Service3 收到 Service2 的Request中,包含 Trace(X)和Span(D)等資訊,Service3 會建立一個 Server Span,使用 X 作為 Trace ID,D 作為 SpanID,内部調用 msg3;Service3 在收到 msg3 的傳回後,Service3 會處理此 Server Span 相關資訊,并将此 Server Span 進行上報
  7. Service3 的 msg3 會分别建立一個 Messaging Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 E),記錄 Timestamp 等資訊,msg3 處理完成後,處理此 Messaging Span 相關資訊,并将此 Messaging Span 進行上報
  8. Service4 收到 Service2 的 Request 中,包含 Trace(X)和 Span(G)等資訊,Service4 會建立一個 Server Span,使用 X 作為 Trace ID,G 作為 SpanID,再向 Service5 發送請求;Service4 在收到 Service5 的響應後,Service4 會處理此 Server Span 相關資訊,并将此 Server Span 進行上報
  9. Service4 向 Service5 發送的請求,會建立一個 Client Span,使用 X 作為 Trace ID,随機生成全局唯一的 SpanID(如圖中的 H),記錄 Timestamp 等資訊,同時 Service4 會将 Trace ID(X)和 SpanID(H)傳遞給 Service5;Service4 在收到 Service5 的響應後,Service4 會處理 Client Span 相關資訊,并将此 Client Span 進行上報

上面整個 Trace X 調用鍊路會生成的 Span 記錄如下圖,每個 Span 主要會記錄 Span Id、Parent Id、Kind(CLIENT 表示 RPC CLIENT 端 Span,SERVER 表示 RPC SERVER 端 SPAN,NULL 表示 Messaging SPAN),SN(Service Name),還會包含 Trace ID,時間戳、Duration 等資訊。Service5 沒有進行 Zipkin 埋點采集,是以不會有 Service5 的 Span 記錄。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 9

資料格式

設定了 Zipkin 埋點的應用服務,預設會使用 Json 格式向 Kafka 上報 Span 資訊,上報的資訊主要有如下幾個注意點:

每個應用服務每次會上報一組 Span,組成一個 Json 數組上報

Json 數組裡包含不同 Trace的Span,即不是所有的 Trace ID都 相同

不同形式的接口(如 Http、Grpc、Dubbo 等),除了主要字段相同外,在 tags 中會各自記錄一些不同的字段

[
  {
    "traceId": "3112dd04c3112036",
    "id": "3112dd04c3112036",
    "kind": "SERVER",
    "name": "get /2/api",
    "timestamp": 1618480662355011,
    "duration": 12769,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "remoteEndpoint": {
      "ipv4": "111.25.140.166",
      "port": 50214
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/2/api",
      "mvc.controller.class": "Controller",
      "mvc.controller.method": "get2Api"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "3112dd04c3112036",
    "id": "b4bd9859c690160a",
    "name": "msg2.1",
    "timestamp": 1618480662357211,
    "duration": 11069,
    "localEndpoint": {
      "serviceName": "SERVICE2"
    },
    "tags": {
      "class": "MSG",
      "method": "msg2.1"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "3112dd04c3112036",
    "id": "c31d9859c69a2b21",
    "name": "msg2.2",
    "timestamp": 1618480662357201,
    "duration": 10768,
    "localEndpoint": {
      "serviceName": "SERVICE2"
    },
    "tags": {
      "class": "MSG",
      "method": "msg2.2"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "b4bd9859c690160a",
    "id": "f1659c981c0f4744",
    "kind": "CLIENT",
    "name": "get /3/api",
    "timestamp": 1618480662358201,
    "duration": 9206,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/3/api"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "c31d9859c69a2b21",
    "id": "73cd1cab1d72a971",
    "kind": "CLIENT",
    "name": "get /4/api",
    "timestamp": 1618480662358211,
    "duration": 9349,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/4/api"
    }
  }
]           
詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 10

資料存儲

Zipkin 支援 MySQL、Cassandra 和 ElasticSearch 三種資料存儲,這三者都存在各自的缺點:

  • MySQL:采集的 Tracing 資訊基本都在每天上億行甚至百億行以上,MySQL 無法支撐這麼大資料量。
  • Cassandra:能支援對單個 Trace 的 Span 資訊分析,但對聚合查詢等資料統計分析場景支援不好
  • ElasticSearch:能支援單個 Trace 的分析和簡單的聚合查詢分析,但對于一些較複雜的資料分析計算不能很好的支援,比如涉及到 Join、視窗函數等等的計算需求,尤其是任務間依賴計算,Zipkin 目前還不能實時計算,需要通過離線跑 Spark 任務計算任務間依賴資訊。

我們在實踐中也是首先使用 ElasticSearch,發現了上面提到的問題,比如 Zipkin 的服務依賴拓撲必須使用離線方式計算,便新增了 StarRocks 作為底層資料存儲。将 Zipkin 的 trace 資料導入到StarRocks很友善,基本步驟隻需要兩步,CREATE TABLE + CREATE ROUTINE LOAD。

另外,在調用鍊路性能瓶頸分析場景中,要将單個服務看作黑盒,隻關注 RPC SPAN,屏蔽掉服務内部的 Messaging Span,使用了 Flink 對服務内部 span 進行 ParentID 溯源,即從 RPC Client SPAN,一直追溯到同一服務同一 Trace ID 的 RPC Server SPAN,用 RPC Server SPAN 的 ID 替換 RPC Client SPAN 的parentId,最後通過Flink-Connector-StarRocks将轉換後的資料實時寫入StarRocks。

基于 StarRocks 的資料存儲架構流程如下圖所示。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 11

CREATE TABLE

建表語句示例參考如下,有如下幾點注意點:

  • 包括 Zipkin 和 zipkin_trace_perf 兩張表,zipkin_trace_perf 表隻用于調用鍊路性能瓶頸分析場景,其他統計分析都适用 Zipkin 表
  • 通過采集資訊中的 Timestamp 字段,生成 dt、hr、min 時間字段,便于後續統計分析
  • 采用 DUPLICATE 模型、Bitmap 索引等設定,加快查詢速度
  • Zipkin 表使用id作為分桶字段,在查詢服務拓撲時,查詢計劃會優化為 Colocate Join,提升查詢性能。

Zipkin

CREATE TABLE `zipkin` (
  `traceId` varchar(24) NULL COMMENT "",
  `id` varchar(24) NULL COMMENT "Span ID",
  `localEndpoint_serviceName` varchar(512) NULL COMMENT "",
  `dt` int(11) NULL COMMENT "",
  `parentId` varchar(24) NULL COMMENT "",
  `timestamp` bigint(20) NULL COMMENT "",
  `hr` int(11) NULL COMMENT "",
  `min` bigint(20) NULL COMMENT "",
  `kind` varchar(16) NULL COMMENT "",
  `duration` int(11) NULL COMMENT "",
  `name` varchar(300) NULL COMMENT "",
  `localEndpoint_ipv4` varchar(16) NULL COMMENT "",
  `remoteEndpoint_ipv4` varchar(16) NULL COMMENT "",
  `remoteEndpoint_port` varchar(16) NULL COMMENT "",
  `shared` int(11) NULL COMMENT "",
  `tag_error` int(11) NULL DEFAULT "0" COMMENT "",
  `error_msg` varchar(1024) NULL COMMENT "",
  `tags_http_path` varchar(2048) NULL COMMENT "",
  `tags_http_method` varchar(1024) NULL COMMENT "",
  `tags_controller_class` varchar(100) NULL COMMENT "",
  `tags_controller_method` varchar(1024) NULL COMMENT "",
  INDEX service_name_idx (`localEndpoint_serviceName`) USING BITMAP COMMENT ''
) ENGINE=OLAP 
DUPLICATE KEY(`traceId`, `parentId`, `id`, `timestamp`, `localEndpoint_serviceName`, `dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
 PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`id`) BUCKETS 100 
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "100",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);           

zipkin_trace_perf

CREATE TABLE `zipkin_trace_perf` (
  `traceId` varchar(24) NULL COMMENT "",
  `id` varchar(24) NULL COMMENT "",
  `dt` int(11) NULL COMMENT "",
  `parentId` varchar(24) NULL COMMENT "",
  `localEndpoint_serviceName` varchar(512) NULL COMMENT "",
  `timestamp` bigint(20) NULL COMMENT "",
  `hr` int(11) NULL COMMENT "",
  `min` bigint(20) NULL COMMENT "",
  `kind` varchar(16) NULL COMMENT "",
  `duration` int(11) NULL COMMENT "",
  `name` varchar(300) NULL COMMENT "",
  `tag_error` int(11) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`traceId`, `id`, `dt`, `parentId`, `localEndpoint_serviceName`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
 PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`traceId`) BUCKETS 32 
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-60",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "12",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);           

ROUTINE LOAD

ROUTINE LOAD 建立語句示例如下:

CREATE ROUTINE LOAD zipkin_routine_load ON zipkin COLUMNS(
  id,
  kind,
  localEndpoint_serviceName,
  traceId,
  `name`,
  `timestamp`,
  `duration`,
  `localEndpoint_ipv4`,
  `remoteEndpoint_ipv4`,
  `remoteEndpoint_port`,
  `shared`,
  `parentId`,
  `tags_http_path`,
  `tags_http_method`,
  `tags_controller_class`,
  `tags_controller_method`,
  tmp_tag_error,
  tag_error = if(`tmp_tag_error` IS NULL, 0, 1),
  error_msg = tmp_tag_error,
  dt = from_unixtime(`timestamp` / 1000000, '%Y%m%d'),
  hr = from_unixtime(`timestamp` / 1000000, '%H'),
  `min` = from_unixtime(`timestamp` / 1000000, '%i')
) PROPERTIES (
  "desired_concurrent_number" = "3",
  "max_batch_interval" = "50",
  "max_batch_rows" = "300000",
  "max_batch_size" = "209715200",
  "max_error_number" = "1000000",
  "strict_mode" = "false",
  "format" = "json",
  "strip_outer_array" = "true",
  "jsonpaths" = "[\"$.id\",\"$.kind\",\"$.localEndpoint.serviceName\",\"$.traceId\",\"$.name\",\"$.timestamp\",\"$.duration\",\"$.localEndpoint.ipv4\",\"$.remoteEndpoint.ipv4\",\"$.remoteEndpoint.port\",\"$.shared\",\"$.parentId\",\"$.tags.\\\"http.path\\\"\",\"$.tags.\\\"http.method\\\"\",\"$.tags.\\\"mvc.controller.class\\\"\",\"$.tags.\\\"mvc.controller.method\\\"\",\"$.tags.error\"]"
)
FROM
  KAFKA (
    "kafka_broker_list" = "IP1:PORT1,IP2:PORT2,IP3:PORT3",
    "kafka_topic" = "XXXXXXXXX"
  );           

Flink 溯源 Parent ID

針對調用鍊路性能瓶頸分析場景中,使用 Flink 進行 Parent ID 溯源,代碼示例如下:

env
  // 添加kafka資料源
  .addSource(getKafkaSource())
  // 将采集到的Json字元串轉換為JSONArray,
  // 這個JSONArray是從單個服務采集的資訊,裡面會包含多個Trace的Span資訊
  .map(JSON.parseArray(_))
  // 将JSONArray轉換為JSONObject,每個JSONObejct就是一個Span
  .flatMap(_.asScala.map(_.asInstanceOf[JSONObject]))
  // 将Span的JSONObject對象轉換為Bean對象
  .map(jsonToBean(_))
  // 以traceID+localEndpoint_serviceName作為key對span進行分區生成keyed stream
  .keyBy(span => keyOfTrace(span))
  // 使用會話視窗,将同一個Trace的不同服務上的所有Span,分發到同一個固定間隔的processing-time視窗
  // 這裡為了實作簡單,使用了processing-time session視窗,後續我們會使用starrocks的UDAF函數進行優化,去掉對Flink的依賴
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  // 使用Aggregate視窗函數
  .aggregate(new TraceAggregateFunction)
  // 将經過溯源的span集合展開,便于調用flink-connector-starrocks
  .flatMap(spans => spans)
  // 使用flink-connector-starrocks sink,将資料寫入starrocks中
  .addSink(
    StarRocksSink.sink(
      StarRocksSinkOptions.builder().withProperty("XXX", "XXX").build()))           

分析計算

以 圖 6 作為一個微服務系統用例,給出各個統計分析場景對應的 StarRocks SQL 語句。

服務内分析

上遊服務請求名額統計

下面的 SQL 使用 Zipkin 表資料,計算服務 Service2 請求上遊服務 Service3 和上遊服務 Service4 的查詢統計資訊,按小時和接口分組統計查詢名額

select
  hr,
  name,
  req_count,
  timeout / req_count * 100 as timeout_rate,
  error_count / req_count * 100 as error_rate,
  avg_duration,
  tp95,
  tp99
from
  (
    select
      hr,
      name,
      count(1) as req_count,
      AVG(duration) / 1000 as avg_duration,
      sum(if(duration > 200000, 1, 0)) as timeout,
      sum(tag_error) as error_count,
      percentile_approx(duration, 0.95) / 1000 AS tp95,
      percentile_approx(duration, 0.99) / 1000 AS tp99
    from
      zipkin
    where
      localEndpoint_serviceName = 'Service2'
      and kind = 'CLIENT'
      and dt = 20220105
    group by
      hr,
      name
  ) tmp
order by
  hr           

下遊服務響應名額統計

下面的 SQL 使用 Zipkin 表資料,計算服務 Service2 響應下遊服務 Service1 的查詢統計資訊,按小時和接口分組統計查詢名額。

select
  hr,
  name,
  req_count,
  timeout / req_count * 100 as timeout_rate,
  error_count / req_count * 100 as error_rate,
  avg_duration,
  tp95,
  tp99
from
  (
    select
      hr,
      name,
      count(1) as req_count,
      AVG(duration) / 1000 as avg_duration,
      sum(if(duration > 200000, 1, 0)) as timeout,
      sum(tag_error) as error_count,
      percentile_approx(duration, 0.95) / 1000 AS tp95,
      percentile_approx(duration, 0.99) / 1000 AS tp99
    from
      zipkin
    where
      localEndpoint_serviceName = 'Service2'
      and kind = 'SERVER'
      and dt = 20220105
    group by
      hr, 
      name
  ) tmp
order by
  hr           

服務内部處理分析

下面的 SQL 使用 Zipkin 表資料,查詢服務 Service2 的接口 /2/api,按 Span Name 分組統計 Duration 等資訊。

with 
spans as (
  select * from zipkin where dt = 20220105 and localEndpoint_serviceName = "Service2"
),
api_spans as (
  select
    spans.id as id,
    spans.parentId as parentId,
    spans.name as name,
    spans.duration as duration
  from
    spans
    inner JOIN 
    (select * from spans where kind = "SERVER" and name = "/2/api") tmp 
    on spans.traceId = tmp.traceId
)
SELECT
  name,
  AVG(inner_duration) / 1000 as avg_duration,
  percentile_approx(inner_duration, 0.95) / 1000 AS tp95,
  percentile_approx(inner_duration, 0.99) / 1000 AS tp99
from
  (
    select
      l.name as name,
      (l.duration - ifnull(r.duration, 0)) as inner_duration
    from
      api_spans l
      left JOIN 
      api_spans r 
      on l.parentId = r.id
  ) tmp
GROUP BY
  name           

服務間分析

服務拓撲統計

下面的 SQL 使用 Zipkin 表資料,計算服務間的拓撲關系,以及服務間接口 Duration 的統計資訊。

with tbl as (select * from zipkin where dt = 20220105)
select 
  client, 
  server, 
  name,
  AVG(duration) / 1000 as avg_duration,
  percentile_approx(duration, 0.95) / 1000 AS tp95,
  percentile_approx(duration, 0.99) / 1000 AS tp99
from
  (
    select
      c.localEndpoint_serviceName as client,
      s.localEndpoint_serviceName as server,
      c.name as name,
      c.duration as duration
    from
    (select * from tbl where kind = "CLIENT") c
    left JOIN 
    (select * from tbl where kind = "SERVER") s 
    on c.id = s.id and c.traceId = s.traceId
  ) as tmp
group by 
  client,  
  server,
  name           

調用鍊路性能瓶頸分析

下面的 SQL 使用 zipkin_trace_perf 表資料,針對某個服務接口響應逾時的查詢請求,統計出每次請求的調用鍊路中處理耗時最長的服務或服務間調用,進而分析出性能熱點是在某個服務或服務間調用。

select
  service,
  ROUND(count(1) * 100 / sum(count(1)) over(), 2) as percent
from
  (
    select
      traceId,
      service,
      duration,
      ROW_NUMBER() over(partition by traceId order by duration desc) as rank4
    from
      (
        with tbl as (
          SELECT
            l.traceId as traceId,
            l.id as id,
            l.parentId as parentId,
            l.kind as kind,
            l.duration as duration,
            l.localEndpoint_serviceName as localEndpoint_serviceName
          FROM
            zipkin_trace_perf l
            INNER JOIN 
            zipkin_trace_perf r 
            on l.traceId = r.traceId
              and l.dt = 20220105
              and r.dt = 20220105
              and r.tag_error = 0     -- 過濾掉出錯的trace
              and r.localEndpoint_serviceName = "Service1"
              and r.name = "/1/api"
              and r.kind = "SERVER"
              and r.duration > 200000  -- 過濾掉未逾時的trace
        )
        select
          traceId,
          id,
          service,
          duration
        from
          (
            select
              traceId,
              id,
              service,
              (c_duration - s_duration) as duration,
              ROW_NUMBER() over(partition by traceId order by (c_duration - s_duration) desc) as rank2
            from
              (
                select
                  c.traceId as traceId,
                  c.id as id,
                  concat(c.localEndpoint_serviceName, "=>", ifnull(s.localEndpoint_serviceName, "?")) as service,
                  c.duration as c_duration,
                  ifnull(s.duration, 0) as s_duration
                from
                  (select * from tbl where kind = "CLIENT") c
                  left JOIN 
                  (select * from tbl where kind = "SERVER") s 
                  on c.id = s.id and c.traceId = s.traceId
              ) tmp1
          ) tmp2
        where
          rank2 = 1
        union ALL
        select
          traceId,
          id,
          service,
          duration
        from
          (
            select
              traceId,
              id,
              service,
              (s_duration - c_duration) as duration,
              ROW_NUMBER() over(partition by traceId order by (s_duration - c_duration) desc) as rank2
            from
              (
                select
                  s.traceId as traceId,
                  s.id as id,
                  s.localEndpoint_serviceName as service,
                  s.duration as s_duration,
                  ifnull(c.duration, 0) as c_duration,
                  ROW_NUMBER() over(partition by s.traceId, s.id order by ifnull(c.duration, 0) desc) as rank
                from
                  (select * from tbl where kind = "SERVER") s
                  left JOIN 
                  (select * from tbl where kind = "CLIENT") c 
                  on s.id = c.parentId and s.traceId = c.traceId
              ) tmp1
            where
              rank = 1
          ) tmp2
        where
          rank2 = 1
      ) tmp3
  ) tmp4
where
  rank4 = 1
GROUP BY
  service
order by
  percent desc           

SQL 查詢的結果如下圖所示,在逾時的 Trace 請求中,性能瓶頸服務或服務間調用的比例分布。

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 12

03 實踐效果

目前搜狐智能媒體已在 30+ 個服務中接入 Zipkin,涵蓋上百個線上服務執行個體,1% 的采樣率每天産生近 10億 多行的日志。

通過 Zipkin Server 查詢 StarRocks,擷取的 Trace 資訊如下圖所示:

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 13

通過 Zipkin Server 查詢 StarRocks,擷取的服務拓撲資訊如下圖所示:

詳解!搜狐智能媒體基于 Zipkin 和 StarRocks 的微服務鍊路追蹤實踐

圖 14

基于 Zipkin StarRocks 的鍊路追蹤體系實踐過程中,明顯提升了微服務監控分析能力和工程效率:

提升微服務監控分析能力

  • 在監控報警方面,可以基于 StarRocks 查詢統計線上服務目前時刻的響應延遲百分位數、錯誤率等名額,根據這些名額及時産生各類告警;
  • 在名額統計方面,可以基于 StarRocks 按天、小時、分鐘等粒度統計服務響應延遲的各項名額,更好的了解服務運作狀況;
  • 在故障分析方面,基于 StarRocks 強大的 SQL 計算能力,可以進行服務、時間、接口等多個次元的探索式分析查詢,定位故障原因。

提升微服務監控工程效率

Metric 和 Logging 資料采集,很多需要使用者手動埋點和安裝各種采集器 Agent,資料采集後存儲到 ElasticSearch 等存儲系統,每上一個業務,這些流程都要操作一遍,非常繁瑣,且資源分散不易管理。

而使用 Zipkin + StarRocks 的方式,隻需在代碼中引入對應庫 SDK,設定上報的 Kafka 位址和采樣率等少量配置資訊,Tracing 便可自動埋點采集,通過 zikpin server 界面進行查詢分析,非常簡便。

04 總結與展望

基于 Zipkin+StarRocks 建構鍊路追蹤系統,能夠提供微服務監控的 Monitoring 和 Observability 能力,提升微服務監控的分析能力和工程效率。

後續有幾個 優化點 ,可以進一步提升鍊路追蹤系統的分析能力和易用性:

  1. 使用 StarRocks 的 UDAF、視窗函數等功能,将 Parent ID 溯源下沉到 StarRocks計算,通過計算後置的方式,取消對 Flink 的依賴,進一步簡化整個系統架構。
  2. 目前對原始日志中的 tag s等字段,并沒有完全采集,StarRocks 正在實作 Json 資料類型,能夠更好的支援 tags 等嵌套資料類型。
  3. Zipkin Server 目前的界面還稍顯簡陋,我們已經打通了 Zipkin Server 查詢 StarRokcs,後續會對 Zipkin Server 進行 U I等優化,通過 StarRocks 強大的計算能力實作更多的名額查詢,進一步提升使用者體驗。

繼續閱讀