天天看點

SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

1. 概述

分布式鍊路追蹤系統,鍊路的追蹤大體流程如下:

  1. Agent 收集 Trace 資料。
  2. Agent 發送 Trace 資料給 Collector 。
  3. Collector 接收 Trace 資料。
  4. Collector 存儲 Trace 資料到存儲器,例如,資料庫。

本文主要分享【第四部分】 SkyWalking Collector 存儲 Trace 資料。

友情提示:Collector 接收到 TraceSegment 的資料,對應的類是 Protobuf 生成的。考慮到更加易讀易懂,本文使用 TraceSegment 相關的原始類。

Collector 在接收到 Trace 資料後,經過流式處理,最終存儲到存儲器。如下圖,紅圈部分,為本文分享的内容:

SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

2. SpanListener

在 《SkyWalking 源碼分析 —— Collector 接收 Trace 資料》 一文中,我們看到 

SegmentParse#parse(UpstreamSegment, Source)

 方法中:

  • 在 

    #preBuild(List<UniqueId>, SegmentDecorator)

     方法中,預建構的過程中,使用 Span 監聽器們,從 TraceSegment 解析出不同的資料。
  • 在預建構成功後,通知 Span 監聽器們,去建構各自的資料,經過流式處理,最終存儲到存儲器。

org.skywalking.apm.collector.agent.stream.parser.SpanListener

 ,Span 監聽器接口。

  • 定義了 

    #build()

     方法,建構資料,執行流式處理,最終存儲到存儲器。

SpanListener 的子類如下圖:

SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment
  • 第一層,通用接口層,定義了從 TraceSegment 解析資料的方法。
    • ① GlobalTraceSpanListener :解析鍊路追蹤全局編号數組( 

      TraceSegment.relatedGlobalTraces

       )。
    • ② RefsListener :解析父 Segment 指向數組( 

      TraceSegment.refs

       )。
    • ③ FirstSpanListener :解析第一個 Span (

      TraceSegment.spans[0]

      ) 。
    • ③ EntrySpanListener :解析 EntrySpan (

      TraceSegment.spans

      )。
    • ③ LocalSpanListener :解析 LocalSpan (

      TraceSegment.spans

      )。
    • ③ ExitSpanListener :解析 ExitSpan (

      TraceSegment.spans

      )。
  • 第二層,業務實作層,每個實作類對應一個資料實體類,一個 Graph 對象。如下圖所示:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

下面,我們以每個資料實體類為中心,逐個分享。

3. GlobalTrace

org.skywalking.apm.collector.storage.table.global.GlobalTrace

 ,全局鍊路追蹤,記錄一次分布式鍊路追蹤,包括的 TraceSegment 編号。

  • GlobalTrace : TraceSegment = N : M ,一個 GlobalTrace 可以有多個 TraceSegment ,一個 TraceSegment 可以關聯多個 GlobalTrace 。參見 《SkyWalking 源碼分析 —— Agent 收集 Trace 資料》「2. Trace」 。
  • org.skywalking.apm.collector.storage.table.global.GlobalTraceTable

     , GlobalTrace 表( 

    global_trace

     )。字段如下:
    • global_trace_id

       :全局鍊路追蹤編号。
    • segment_id

       :TraceSegment 鍊路編号。
    • time_bucket

       :時間。
  • org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO

     ,GlobalTrace 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTraceSpanListener

 ,GlobalTrace 的 SpanListener ,實作了 FirstSpanListener 、GlobalTraceIdsListener 接口,代碼如下:

  • globalTraceIds

     屬性,全局鍊路追蹤編号數組。
  • segmentId

     屬性,TraceSegment 鍊路編号。
  • timeBucket

     屬性,時間。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從 Span 中解析到 

    segmentId

     ,

    timeBucket

     。
  • #parseGlobalTraceId(UniqueId)

     方法,解析全局鍊路追蹤編号,添加到 

    globalTraceIds

     數組。
  • #build()

     方法,建構,代碼如下:
    • 第 84 行:擷取 GlobalTrace 對應的 

      Graph<GlobalTrace>

       對象。
    • 第 86 至 92 行:循環 

      globalTraceIds

       數組,建立 GlobalTrace 對象,逐個調用 

      Graph#start(application)

       方法,進行流式處理。在這過程中,會儲存 GlobalTrace 到存儲器。

在 

TraceStreamGraph#createGlobalTraceGraph()

 方法中,我們可以看到 GlobalTrace 對應的 

Graph<GlobalTrace>

 對象的建立。

  • org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker

     ,繼承 PersistenceWorker 抽象類,GlobalTrace 批量儲存 Worker 。
    • Factory 内部類,實作 AbstractLocalAsyncWorkerProvider 抽象類,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.2 AbstractLocalAsyncWorker」 有詳細解析。
    • PersistenceWorker ,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(二)》「4. PersistenceWorker」 有詳細解析。
    • #id()

       實作方法,傳回 120 。
    • #needMergeDBData()

       實作方法,傳回 

      false

       ,存儲時,不需要合并資料。GlobalTrace 隻有新增操作,沒有更新操作,是以無需合并資料。

4. InstPerformance

旁白君:InstPerformance 和 GlobalTrace 整體比較相似,分享的會比較簡潔一些。

org.skywalking.apm.collector.storage.table.instance.InstPerformance

 ,應用執行個體性能,記錄應用執行個體每秒的請求總次數,請求總時長。

  • org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable

     , GlobalTrace 表( 

    global_trace

     )。字段如下:
    • application_id

       :應用編号。
    • instance_id

       :應用執行個體編号。
    • calls

       :調用總次數。
    • cost_total

       :消耗總時長。
    • time_bucket

       :時間。
  • org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsPersistenceDAO

     ,InstPerformance 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformanceSpanListener

 ,InstPerformance 的 SpanListener ,實作了 FirstSpanListener 、EntrySpanListener 接口。

在 

TraceStreamGraph#createInstPerformanceGraph()

 方法中,我們可以看到 InstPerformance 對應的 

Graph<InstPerformance>

 對象的建立。

  • org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker

     ,繼承 PersistenceWorker 抽象類,InstPerformance 批量儲存 Worker 。
    • 類似 GlobalTracePersistenceWorker ,… 省略其它類和方法。
    • #needMergeDBData()

       實作方法,傳回 

      true

       ,存儲時,需要合并資料。

      calls

       、

      cost_total

       需要累加合并。

5. SegmentCost

旁白君:SegmentCost 和 GlobalTrace 整體比較相似,分享的會比較簡潔一些。

org.skywalking.apm.collector.storage.table.segment.SegmentCost

 ,TraceSegment 消耗時長,記錄 TraceSegment 開始時間,結束時間,花費時長等等。

  • SegmentCost : TraceSegment = 1 : 1 。
  • org.skywalking.apm.collector.storage.table.instance.SegmentCostTable

     , SegmentCostTable 表( 

    segment_cost

     )。字段如下:
    • segment_id

       :TraceSegment 編号。
    • application_id

       :應用編号。
    • start_time

       :開始時間。
    • end_time

       :結束時間。
    • service_name

       :操作名。
    • cost

       :消耗時長。
    • time_bucket

       :時間( 

      yyyyMMddHHmm

       )。
  • org.skywalking.apm.collector.storage.es.dao.SegmentCostEsPersistenceDAO

     ,SegmentCost 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener

 ,SegmentCost 的 SpanListener ,實作了 FirstSpanListener 、EntrySpanListener 、ExitSpanListener 、LocalSpanListener 接口。

在 

TraceStreamGraph#createSegmentCostGraph()

 方法中,我們可以看到 SegmentCost 對應的 

Graph<SegmentCost>

 對象的建立。

  • org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker

     ,繼承 PersistenceWorker 抽象類,InstPerformance 批量儲存 Worker 。
    • 類似 GlobalTracePersistenceWorker ,… 省略其它類和方法。

6. NodeComponent

org.skywalking.apm.collector.storage.table.node.NodeComponent

 ,節點元件。

  • org.skywalking.apm.collector.storage.table.node.NodeComponentTable

     , NodeComponentTable 表( 

    node_component

     )。字段如下:
    • component_id

       :元件編号,參見 ComponentsDefine 的枚舉。
    • peer_id

       :對等編号。每個元件,或是服務提供者,有服務位址;又或是服務消費者,有調用服務位址。這兩者都脫離不開服務位址。SkyWalking 将服務位址作為 

      applicationCode

       ,注冊到 Application 。是以,此處的 

      peer_id

       實際上是,服務位址對應的應用編号。
    • time_bucket

       :時間( 

      yyyyMMddHHmm

       )。
  • org.skywalking.apm.collector.storage.es.dao.NodeComponentEsPersistenceDAO

     ,NodeComponent 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentSpanListener

 ,NodeComponent 的 SpanListener ,實作了 FirstSpanListener 、EntrySpanListener 、ExitSpanListener 接口,代碼如下:

  • nodeComponents

     屬性,節點元件數組,一次 TraceSegment 可以經過個節點元件,例如 SpringMVC => MongoDB 。
  • segmentId

     屬性,TraceSegment 鍊路編号。
  • timeBucket

     屬性,時間( 

    yyyyMMddHHmm

     )。
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從 EntrySpan 中解析到 

    segmentId

     ,

    applicationId

     ,建立 NodeComponent 對象,添加到 

    nodeComponents

     。注意,EntrySpan 使用 

    applicationId

     作為 

    peerId

     。
  • #parseExit(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從 ExitSpan 中解析到 

    segmentId

     ,

    peerId

     ,建立 NodeComponent 對象,添加到 

    nodeComponents

     。注意,ExitSpan 使用 

    peerId

     作為 

    peerId

     。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從首個 Span 中解析到 

    timeBucket

     。
  • #build()

     方法,建構,代碼如下:
    • 第 84 行:擷取 NodeComponent 對應的 

      Graph<NodeComponent>

       對象。
    • 第 86 至 92 行:循環 

      nodeComponents

       數組,逐個調用 

      Graph#start(nodeComponent)

       方法,進行流式處理。在這過程中,會儲存 NodeComponent 到存儲器。

在 

TraceStreamGraph#createNodeComponentGraph()

 方法中,我們可以看到 NodeComponent 對應的 

Graph<NodeComponent>

 對象的建立。

  • org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentAggregationWorker

     ,繼承 AggregationWorker 抽象類,NodeComponent 聚合 Worker 。
    • NodeComponent 的編号生成規則為 

      ${timeBucket}_${componentId}_${peerId}

       ,并且 

      timeBucket

       是分鐘級 ,可以使用 AggregationWorker 進行聚合,合并相同操作,減小 Collector 和 ES 的壓力。
    • Factory 内部類,實作 AbstractLocalAsyncWorkerProvider 抽象類,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.2 AbstractLocalAsyncWorker」 有詳細解析。
    • AggregationWorker ,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 有詳細解析。
    • #id()

       實作方法,傳回 106 。
  • org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker

     ,繼承 AbstractRemoteWorker 抽象類,應用注冊遠端 Worker 。
    • Factory 内部類,實作 AbstractRemoteWorkerProvider 抽象類,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.3 AbstractRemoteWorker」 有詳細解析。
    • AbstractRemoteWorker ,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.3 AbstractRemoteWorker」 有詳細解析。
    • #id()

       實作方法,傳回 10002 。
    • #selector

       實作方法,傳回 

      Selector.HashCode

       。将相同編号的 NodeComponent 發給同一個 Collector 節點,統一處理。在 《SkyWalking 源碼分析 —— Collector Remote 遠端通信服務》 有詳細解析。
  • org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker

     ,繼承 PersistenceWorker 抽象類,NodeComponent 批量儲存 Worker 。
    • 類似 GlobalTracePersistenceWorker ,… 省略其它類和方法。
    • #needMergeDBData()

       實作方法,傳回 

      true

       ,存儲時,需要合并資料。

7. NodeMapping

org.skywalking.apm.collector.storage.table.node.NodeComponent

 ,節點比對,用于比對服務消費者與提供者。

  • org.skywalking.apm.collector.storage.table.node.NodeMappingTable

     , NodeMappingTable 表( 

    node_mapping

     )。字段如下:
    • application_id

       :服務消費者應用編号。
    • address_id

       :服務提供者應用編号。
    • time_bucket

       :時間( 

      yyyyMMddHHmm

       )。
  • org.skywalking.apm.collector.storage.es.dao.NodeMappingEsPersistenceDAO

     ,NodeMapping 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingSpanListener

 ,NodeMapping 的 SpanListener ,實作了 FirstSpanListener 、RefsListener 接口,代碼如下:

  • nodeMappings

     屬性,節點比對數組,一次 TraceSegment 可以經過個節點元件,例如調用多次遠端服務,或者資料庫。
  • timeBucket

     屬性,時間( 

    yyyyMMddHHmm

     )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從 TraceSegmentRef 中解析到 

    applicationId

     ,

    peerId

     ,建立 NodeMapping 對象,添加到 

    nodeMappings

     。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從首個 Span 中解析到

    timeBucket

     。
  • #build()

     方法,建構,代碼如下:
    • 第 84 行:擷取 NodeMapping 對應的 

      Graph<NodeMapping>

       對象。
    • 第 86 至 92 行:循環 

      nodeMappings

       數組,逐個調用 

      Graph#start(nodeMapping)

       方法,進行流式處理。在這過程中,會儲存 NodeMapping 到存儲器。

在 

TraceStreamGraph#createNodeMappingGraph()

 方法中,我們可以看到 NodeMapping 對應的 

Graph<NodeMapping>

 對象的建立。

  • 和 NodeComponent 的 

    Graph<NodeComponent>

     基本一緻,胖友自己看下源碼。
  • org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingAggregationWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingRemoteWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingPersistenceWorker

8. NodeReference

org.skywalking.apm.collector.storage.table.noderef.NodeReference

 ,節點調用統計,用于記錄服務消費者對服務提供者的調用,基于應用級别的,以分鐘為時間最小粒度的聚合統計。

  • org.skywalking.apm.collector.storage.table.noderef.NodeReference

     , NodeReference 表( 

    node_reference

     )。字段如下:
    • front_application_id

       :服務消費者應用編号。
    • behind_application_id

       :服務提供者應用編号。
    • s1_lte

       :( 0, 1000 ms ] 的調用次數。
    • s3_lte

       :( 1000, 3000 ms ] 的調用次數。
    • s5_lte

       :( 3000, 5000ms ] 的調用次數
    • s5_gt

       :( 5000, +∞ ] 的調用次數。
    • error

       :發生異常的調用次數。
    • summary

       :總共的調用次數。
    • time_bucket

       :時間( 

      yyyyMMddHHmm

       )。
  • org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsPersistenceDAO

     ,NodeReference 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceSpanListener

 ,NodeReference 的 SpanListener ,實作了 EntrySpanListener 、ExitSpanListener 、RefsListener 接口,代碼如下:

  • references

     屬性,父 TraceSegment 調用産生的 NodeReference 數組。
  • nodeReferences

     屬性,NodeReference 數組,最終會包含 

    references

     數組。
  • timeBucket

     屬性,時間( 

    yyyyMMddHHmm

     )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId)

     方法,代碼如下:
    • 第 106 至 109 行:使用父 TraceSegment 的應用編号作為服務消費者編号,自己的應用編号作為服務提供者應用編号,建立 NodeReference 對象。
    • 第 111 行:将 NodeReference 對象,添加到 

      references

       。注意,是 

      references

       ,而不是 

      nodeReference

       。
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId)

     方法,代碼如下:
    • 作為服務提供者,接受調用。
    • ——- 父 TraceSegment 存在 ——–
    • 第 79 至 85 行:

      references

       非空,說明被父 TraceSegment 調用。是以,循環 

      references

       數組,設定 

      id

       ,

      timeBucket

       屬性( 因為 

      timeBucket

       需要從 EntrySpan 中擷取,是以 

      #parseRef(...)

       的目的,就是臨時存儲父 TraceSegment 的應用編号到 

      references

       中 )。
    • 第 87 行:調用 

      #buildserviceSum(...)

       方法,設定調用次數,然後添加到 

      nodeReferences

       中。
    • ——- 父 TraceSegment 不存在 ——–
    • 第 91 至 97 行:使用 

      USER_ID

       的應用編号( 特殊,代表 “使用者“ )作為服務消費者編号,自己的應用編号作為服務提供者應用編号,建立 NodeReference 對象。
    • 第 99 行:調用 

      #buildserviceSum(...)

       方法,設定調用次數,然後添加到 

      nodeReferences

       中。
  • #parseExit(SpanDecorator, applicationId, instanceId, segmentId)

     方法,代碼如下:
    • 作為服務消費者,發起調用。
    • 第 64 至 71 行:使用自己的應用編号作為服務消費者編号,

      peerId

       作為服務提供者應用編号,建立 NodeReference 對象。
    • 第 73 行:調用 

      #buildserviceSum(...)

       方法,設定調用次數,然後添加到 

      nodeReferences

       中。
  • #build()

     方法,建構,代碼如下:
    • 第 84 行:擷取 NodeReference 對應的 

      Graph<NodeReference>

       對象。
    • 第 86 至 92 行:循環 

      nodeReferences

       數組,逐個調用 

      Graph#start(nodeReference)

       方法,進行流式處理。在這過程中,會儲存 NodeReference 到存儲器。

在 

TraceStreamGraph#createNodeReferenceGraph()

 方法中,我們可以看到 NodeReference 對應的 

Graph<NodeReference>

 對象的建立。

  • 和 NodeComponent 的 

    Graph<NodeComponent>

     基本一緻,胖友自己看下源碼。
  • org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceAggregationWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceRemoteWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferencePersistenceWorker

9. ServiceEntry

org.skywalking.apm.collector.storage.table.service.ServiceEntry

 ,入口操作。

  • ServiceEntry 隻儲存分布式鍊路的入口操作,不同于 ServiceName 儲存所有操作,即 ServiceEntry 是 ServiceName 的子集。
    • 注意,子 TraceSegment 的入口操作也不記錄。
  • org.skywalking.apm.collector.storage.table.service.ServiceEntryTable

     , ServiceEntry 表( 

    service_entry

     )。字段如下:
    • application_id

       :應用編号。
    • entry_service_id

       :入口操作編号。
    • entry_service_name

       :入口操作名。
    • register_time

       :注冊時間。
    • newest_time

       :最後調用時間。
  • org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO

     ,ServiceEntry 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener

 ,ServiceEntry 的 SpanListener ,實作了 EntrySpanListener 、FirstSpanListener 、RefsListener 接口,代碼如下:

  • hasReference

     屬性, 是否有 TraceSegmentRef 。
  • applicationId

     屬性,應用編号。
  • entryServiceId

     屬性,入口操作編号。
  • entryServiceName

     屬性,入口操作名。
  • hasEntry

     屬性,是否有 EntrySpan 。
  • timeBucket

     屬性,時間( 

    yyyyMMddHHmm

     )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId)

     方法,是否有 TraceSegmentRef 。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從首個 Span 中解析到 

    timeBucket

     。
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從 EntrySpan 中解析到 

    applicationId

     、

    entryServiceId

     、

    entryServiceName

     、

    hasEntry

     。
  • #build()

     方法,建構,代碼如下:
    • 第 96 行:隻儲存分布式鍊路的入口操作。
    • 第 98 至 103 行:建立 ServiceEntry 對象。
    • 第 107 行:擷取 ServiceEntry 對應的 

      Graph<ServiceEntry>

       對象。
    • 第 108 行:調用 

      Graph#start(serviceEntry)

       方法,進行流式處理。在這過程中,會儲存 ServiceEntry 到存儲器。

在 

TraceStreamGraph#createServiceEntryGraph()

 方法中,我們可以看到 ServiceEntry 對應的 

Graph<ServiceEntry>

 對象的建立。

  • 和 NodeComponent 的 

    Graph<NodeComponent>

     基本一緻,胖友自己看下源碼。
  • org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker

10. ServiceReference

org.skywalking.apm.collector.storage.table.serviceref.ServiceReference

 ,入口操作調用統計,用于記錄入口操作的調用,基于入口操作級别的,以分鐘為時間最小粒度的聚合統計。

  • 和 NodeReference 類似。
  • 注意,此處的 “入口操作“ 不同于 ServiceEntry ,包含每一條 TraceSegment 的入口操作。
  • org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable

     , ServiceReference 表( 

    service_reference

     )。字段如下:
    • entry_service_id

       :入口操作編号。
    • front_service_id

       :服務消費者操作編号。
    • behind_service_id

       :服務提供者操作編号。
    • s1_lte

       :( 0, 1000 ms ] 的調用次數。
    • s3_lte

       :( 1000, 3000 ms ] 的調用次數。
    • s5_lte

       :( 3000, 5000ms ] 的調用次數
    • s5_gt

       :( 5000, +∞ ] 的調用次數。
    • error

       :發生異常的調用次數。
    • summary

       :總共的調用次數。
    • cost_summary

       :總共的花費時間。
    • time_bucket

       :時間( 

      yyyyMMddHHmm

       )。
  • org.skywalking.apm.collector.storage.es.dao.ServiceReference

     ,ServiceReference 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

org.skywalking.apm.collector.agent.stream.worker.trace.segment.ServiceReferenceSpanListener

 ,ServiceReference 的 SpanListener ,實作了 EntrySpanListener 、FirstSpanListener 、RefsListener 接口,代碼如下:

  • referenceServices

     屬性,ReferenceDecorator 數組,記錄 TraceSegmentRef 數組。
  • serviceId

     屬性,入口操作編号。
  • startTime

     屬性,開始時間。
  • endTime

     屬性,結束時間。
  • isError

     屬性,是否有錯誤。
  • hasEntry

     屬性,是否有 SpanEntry 。
  • timeBucket

     屬性,時間( 

    yyyyMMddHHmm

     )。
  • #parseRef(SpanDecorator, applicationId, instanceId, segmentId)

     方法,将 TraceSegmentRef 添加到 

    referenceServices

     。
  • #parseFirst(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從首個 Span 中解析到 

    timeBucket

     。
  • #parseEntry(SpanDecorator, applicationId, instanceId, segmentId)

     方法,從 EntrySpan 中解析 

    serviceId

     、

    startTime

     、

    endTime

     、

    isError

     、

    hasEntry

     。
  • #build()

     方法,建構,代碼如下:
    • 第 114 行:判斷 

      hasEntry = true

       ,存在 EntrySpan 。
    • ——— 有 TraceSegmentRef ———
    • 第 117 至 120 行:建立 ServiceReference 對象,其中:
      • entryServiceId

         :TraceSegmentRef 的入口編号。
      • frontServiceId

         :TraceSegmentRef 的操作編号。
      • behindServiceId

         : 自己 EntrySpan 的操作編号。
    • 第 121 行:調用 

      #calculateCost(...)

       方法,設定調用次數。
    • 第 126 行:調用 

      #sendToAggregationWorker(...)

       方法,發送 ServiceReference 給 AggregationWorker ,執行流式處理。
    • ——— 無 TraceSegmentRef ———
    • 第 117 至 120 行:建立 ServiceReference 對象,其中:
      • entryServiceId

         :自己 EntrySpan 的操作編号。
      • frontServiceId

         :

        Const.NONE_SERVICE_ID

         對應的操作編号( 系統内置,代表【空】 )。
      • behindServiceId

         : 自己 EntrySpan 的操作編号。
    • 第 121 行:調用 

      #calculateCost(...)

       方法,設定調用次數。
    • 第 126 行:調用 

      #sendToAggregationWorker(...)

       方法,發送 ServiceReference 給 AggregationWorker ,執行流式處理。

在 

TraceStreamGraph#createServiceReferenceGraph()

 方法中,我們可以看到 ServiceReference 對應的 

Graph<ServiceReference>

 對象的建立。

  • 和 NodeComponent 的 

    Graph<NodeComponent>

     基本一緻,胖友自己看下源碼。
  • org.skywalking.apm.collector.agent.stream.worker.trace.noderef.ServiceEntryAggregationWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.noderef.ServiceEntryRemoteWorker

  • org.skywalking.apm.collector.agent.stream.worker.trace.noderef.ServiceEntryPersistenceWorker

11. Segment

不同于上述所有資料實體,Segment 無需解析,直接使用 TraceSegment 建構,參見如下方法:

  • SegmentParse#parse(UpstreamSegment, Source)

  • SegmentParse#buildSegment(id, dataBinary)

org.skywalking.apm.collector.storage.table.segment.Segment

 ,全局鍊路追蹤,記錄一次分布式鍊路追蹤,包括的 TraceSegment 編号。

  • org.skywalking.apm.collector.storage.table.global.GlobalTraceTable

     , Segment 表( 

    segment

     )。字段如下:
    • _id

       :TraceSegment 編号。
    • data_binary

       :TraceSegment 鍊路編号。
    • time_bucket

       :時間( 

      yyyyMMddHHmm

       )。
  • org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO

     ,GlobalTrace 的 EsDAO 。
  • 在 ES 存儲例子如下圖:
    SkyWalking 源碼分析 —— Collector 存儲 Trace 資料1. 概述2. SpanListener3. GlobalTrace4. InstPerformance5. SegmentCost6. NodeComponent7. NodeMapping8. NodeReference9. ServiceEntry10. ServiceReference11. Segment

在 

TraceStreamGraph#createSegmentGraph()

 方法中,我們可以看到 Segment 對應的 

Graph<Segment>

 對象的建立。

  • org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker

繼續閱讀