1. 概述
分布式鍊路追蹤系統,鍊路的追蹤大體流程如下:
- Agent 收集 Trace 資料。
- Agent 發送 Trace 資料給 Collector 。
- Collector 接收 Trace 資料。
- Collector 存儲 Trace 資料到存儲器,例如,資料庫。
本文主要分享【第四部分】 SkyWalking Collector 存儲 Trace 資料。
友情提示:Collector 接收到 TraceSegment 的資料,對應的類是 Protobuf 生成的。考慮到更加易讀易懂,本文使用 TraceSegment 相關的原始類。
Collector 在接收到 Trace 資料後,經過流式處理,最終存儲到存儲器。如下圖,紅圈部分,為本文分享的内容:
2. SpanListener
在 《SkyWalking 源碼分析 —— Collector 接收 Trace 資料》 一文中,我們看到
SegmentParse#parse(UpstreamSegment, Source)
方法中:
- 在
方法中,預建構的過程中,使用 Span 監聽器們,從 TraceSegment 解析出不同的資料。#preBuild(List<UniqueId>, SegmentDecorator)
- 在預建構成功後,通知 Span 監聽器們,去建構各自的資料,經過流式處理,最終存儲到存儲器。
org.skywalking.apm.collector.agent.stream.parser.SpanListener
,Span 監聽器接口。
- 定義了
方法,建構資料,執行流式處理,最終存儲到存儲器。#build()
SpanListener 的子類如下圖:
- 第一層,通用接口層,定義了從 TraceSegment 解析資料的方法。
- ① GlobalTraceSpanListener :解析鍊路追蹤全局編号數組(
)。TraceSegment.relatedGlobalTraces
- ② RefsListener :解析父 Segment 指向數組(
)。TraceSegment.refs
- ③ FirstSpanListener :解析第一個 Span (
) 。TraceSegment.spans[0]
- ③ EntrySpanListener :解析 EntrySpan (
)。TraceSegment.spans
- ③ LocalSpanListener :解析 LocalSpan (
)。TraceSegment.spans
- ③ ExitSpanListener :解析 ExitSpan (
)。TraceSegment.spans
- ① GlobalTraceSpanListener :解析鍊路追蹤全局編号數組(
- 第二層,業務實作層,每個實作類對應一個資料實體類,一個 Graph 對象。如下圖所示:
下面,我們以每個資料實體類為中心,逐個分享。
3. GlobalTrace
org.skywalking.apm.collector.storage.table.global.GlobalTrace
,全局鍊路追蹤,記錄一次分布式鍊路追蹤,包括的 TraceSegment 編号。
- GlobalTrace : TraceSegment = N : M ,一個 GlobalTrace 可以有多個 TraceSegment ,一個 TraceSegment 可以關聯多個 GlobalTrace 。參見 《SkyWalking 源碼分析 —— Agent 收集 Trace 資料》「2. Trace」 。
-
, GlobalTrace 表(org.skywalking.apm.collector.storage.table.global.GlobalTraceTable
)。字段如下:global_trace
-
:全局鍊路追蹤編号。global_trace_id
-
:TraceSegment 鍊路編号。segment_id
-
:時間。time_bucket
-
-
,GlobalTrace 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTraceSpanListener
,GlobalTrace 的 SpanListener ,實作了 FirstSpanListener 、GlobalTraceIdsListener 接口,代碼如下:
-
屬性,全局鍊路追蹤編号數組。globalTraceIds
-
屬性,TraceSegment 鍊路編号。segmentId
-
屬性,時間。timeBucket
-
方法,從 Span 中解析到#parseFirst(SpanDecorator, applicationId, instanceId, segmentId)
,segmentId
。timeBucket
-
方法,解析全局鍊路追蹤編号,添加到#parseGlobalTraceId(UniqueId)
數組。globalTraceIds
-
方法,建構,代碼如下:#build()
- 第 84 行:擷取 GlobalTrace 對應的
對象。Graph<GlobalTrace>
- 第 86 至 92 行:循環
數組,建立 GlobalTrace 對象,逐個調用globalTraceIds
方法,進行流式處理。在這過程中,會儲存 GlobalTrace 到存儲器。Graph#start(application)
- 第 84 行:擷取 GlobalTrace 對應的
在
TraceStreamGraph#createGlobalTraceGraph()
方法中,我們可以看到 GlobalTrace 對應的
Graph<GlobalTrace>
對象的建立。
-
,繼承 PersistenceWorker 抽象類,GlobalTrace 批量儲存 Worker 。org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker
- Factory 内部類,實作 AbstractLocalAsyncWorkerProvider 抽象類,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.2 AbstractLocalAsyncWorker」 有詳細解析。
- PersistenceWorker ,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(二)》「4. PersistenceWorker」 有詳細解析。
-
實作方法,傳回 120 。#id()
-
實作方法,傳回#needMergeDBData()
,存儲時,不需要合并資料。GlobalTrace 隻有新增操作,沒有更新操作,是以無需合并資料。false
4. InstPerformance
旁白君:InstPerformance 和 GlobalTrace 整體比較相似,分享的會比較簡潔一些。
org.skywalking.apm.collector.storage.table.instance.InstPerformance
,應用執行個體性能,記錄應用執行個體每秒的請求總次數,請求總時長。
-
, GlobalTrace 表(org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable
)。字段如下:global_trace
-
:應用編号。application_id
-
:應用執行個體編号。instance_id
-
:調用總次數。calls
-
:消耗總時長。cost_total
-
:時間。time_bucket
-
-
,InstPerformance 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformanceSpanListener
,InstPerformance 的 SpanListener ,實作了 FirstSpanListener 、EntrySpanListener 接口。
在
TraceStreamGraph#createInstPerformanceGraph()
方法中,我們可以看到 InstPerformance 對應的
Graph<InstPerformance>
對象的建立。
-
,繼承 PersistenceWorker 抽象類,InstPerformance 批量儲存 Worker 。org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker
- 類似 GlobalTracePersistenceWorker ,… 省略其它類和方法。
-
實作方法,傳回#needMergeDBData()
,存儲時,需要合并資料。true
、calls
需要累加合并。cost_total
5. SegmentCost
旁白君:SegmentCost 和 GlobalTrace 整體比較相似,分享的會比較簡潔一些。
org.skywalking.apm.collector.storage.table.segment.SegmentCost
,TraceSegment 消耗時長,記錄 TraceSegment 開始時間,結束時間,花費時長等等。
- SegmentCost : TraceSegment = 1 : 1 。
-
, SegmentCostTable 表(org.skywalking.apm.collector.storage.table.instance.SegmentCostTable
)。字段如下:segment_cost
-
:TraceSegment 編号。segment_id
-
:應用編号。application_id
-
:開始時間。start_time
-
:結束時間。end_time
-
:操作名。service_name
-
:消耗時長。cost
-
:時間(time_bucket
)。yyyyMMddHHmm
-
-
,SegmentCost 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.SegmentCostEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener
,SegmentCost 的 SpanListener ,實作了 FirstSpanListener 、EntrySpanListener 、ExitSpanListener 、LocalSpanListener 接口。
在
TraceStreamGraph#createSegmentCostGraph()
方法中,我們可以看到 SegmentCost 對應的
Graph<SegmentCost>
對象的建立。
-
,繼承 PersistenceWorker 抽象類,InstPerformance 批量儲存 Worker 。org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker
- 類似 GlobalTracePersistenceWorker ,… 省略其它類和方法。
6. NodeComponent
org.skywalking.apm.collector.storage.table.node.NodeComponent
,節點元件。
-
, NodeComponentTable 表(org.skywalking.apm.collector.storage.table.node.NodeComponentTable
)。字段如下:node_component
-
:元件編号,參見 ComponentsDefine 的枚舉。component_id
-
:對等編号。每個元件,或是服務提供者,有服務位址;又或是服務消費者,有調用服務位址。這兩者都脫離不開服務位址。SkyWalking 将服務位址作為peer_id
,注冊到 Application 。是以,此處的applicationCode
實際上是,服務位址對應的應用編号。peer_id
-
:時間(time_bucket
)。yyyyMMddHHmm
-
-
,NodeComponent 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.NodeComponentEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentSpanListener
,NodeComponent 的 SpanListener ,實作了 FirstSpanListener 、EntrySpanListener 、ExitSpanListener 接口,代碼如下:
-
屬性,節點元件數組,一次 TraceSegment 可以經過個節點元件,例如 SpringMVC => MongoDB 。nodeComponents
-
屬性,TraceSegment 鍊路編号。segmentId
-
屬性,時間(timeBucket
)。yyyyMMddHHmm
-
方法,從 EntrySpan 中解析到#parseEntry(SpanDecorator, applicationId, instanceId, segmentId)
,segmentId
,建立 NodeComponent 對象,添加到applicationId
。注意,EntrySpan 使用nodeComponents
作為applicationId
。peerId
-
方法,從 ExitSpan 中解析到#parseExit(SpanDecorator, applicationId, instanceId, segmentId)
,segmentId
,建立 NodeComponent 對象,添加到peerId
。注意,ExitSpan 使用nodeComponents
作為peerId
。peerId
-
方法,從首個 Span 中解析到#parseFirst(SpanDecorator, applicationId, instanceId, segmentId)
。timeBucket
-
方法,建構,代碼如下:#build()
- 第 84 行:擷取 NodeComponent 對應的
對象。Graph<NodeComponent>
- 第 86 至 92 行:循環
數組,逐個調用nodeComponents
方法,進行流式處理。在這過程中,會儲存 NodeComponent 到存儲器。Graph#start(nodeComponent)
- 第 84 行:擷取 NodeComponent 對應的
在
TraceStreamGraph#createNodeComponentGraph()
方法中,我們可以看到 NodeComponent 對應的
Graph<NodeComponent>
對象的建立。
-
,繼承 AggregationWorker 抽象類,NodeComponent 聚合 Worker 。org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentAggregationWorker
- NodeComponent 的編号生成規則為
,并且${timeBucket}_${componentId}_${peerId}
是分鐘級 ,可以使用 AggregationWorker 進行聚合,合并相同操作,減小 Collector 和 ES 的壓力。timeBucket
- Factory 内部類,實作 AbstractLocalAsyncWorkerProvider 抽象類,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.2 AbstractLocalAsyncWorker」 有詳細解析。
- AggregationWorker ,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 有詳細解析。
-
實作方法,傳回 106 。#id()
- NodeComponent 的編号生成規則為
-
,繼承 AbstractRemoteWorker 抽象類,應用注冊遠端 Worker 。org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker
- Factory 内部類,實作 AbstractRemoteWorkerProvider 抽象類,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.3 AbstractRemoteWorker」 有詳細解析。
- AbstractRemoteWorker ,在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3.2.3 AbstractRemoteWorker」 有詳細解析。
-
實作方法,傳回 10002 。#id()
-
實作方法,傳回#selector
。将相同編号的 NodeComponent 發給同一個 Collector 節點,統一處理。在 《SkyWalking 源碼分析 —— Collector Remote 遠端通信服務》 有詳細解析。Selector.HashCode
-
,繼承 PersistenceWorker 抽象類,NodeComponent 批量儲存 Worker 。org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker
- 類似 GlobalTracePersistenceWorker ,… 省略其它類和方法。
-
實作方法,傳回#needMergeDBData()
,存儲時,需要合并資料。true
7. NodeMapping
org.skywalking.apm.collector.storage.table.node.NodeComponent
,節點比對,用于比對服務消費者與提供者。
-
, NodeMappingTable 表(org.skywalking.apm.collector.storage.table.node.NodeMappingTable
)。字段如下:node_mapping
-
:服務消費者應用編号。application_id
-
:服務提供者應用編号。address_id
-
:時間(time_bucket
)。yyyyMMddHHmm
-
-
,NodeMapping 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.NodeMappingEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingSpanListener
,NodeMapping 的 SpanListener ,實作了 FirstSpanListener 、RefsListener 接口,代碼如下:
-
屬性,節點比對數組,一次 TraceSegment 可以經過個節點元件,例如調用多次遠端服務,或者資料庫。nodeMappings
-
屬性,時間(timeBucket
)。yyyyMMddHHmm
-
方法,從 TraceSegmentRef 中解析到#parseRef(SpanDecorator, applicationId, instanceId, segmentId)
,applicationId
,建立 NodeMapping 對象,添加到peerId
。nodeMappings
-
方法,從首個 Span 中解析到#parseFirst(SpanDecorator, applicationId, instanceId, segmentId)
。timeBucket
-
方法,建構,代碼如下:#build()
- 第 84 行:擷取 NodeMapping 對應的
對象。Graph<NodeMapping>
- 第 86 至 92 行:循環
數組,逐個調用nodeMappings
方法,進行流式處理。在這過程中,會儲存 NodeMapping 到存儲器。Graph#start(nodeMapping)
- 第 84 行:擷取 NodeMapping 對應的
在
TraceStreamGraph#createNodeMappingGraph()
方法中,我們可以看到 NodeMapping 對應的
Graph<NodeMapping>
對象的建立。
- 和 NodeComponent 的
基本一緻,胖友自己看下源碼。Graph<NodeComponent>
-
org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingAggregationWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingRemoteWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingPersistenceWorker
8. NodeReference
org.skywalking.apm.collector.storage.table.noderef.NodeReference
,節點調用統計,用于記錄服務消費者對服務提供者的調用,基于應用級别的,以分鐘為時間最小粒度的聚合統計。
-
, NodeReference 表(org.skywalking.apm.collector.storage.table.noderef.NodeReference
)。字段如下:node_reference
-
:服務消費者應用編号。front_application_id
-
:服務提供者應用編号。behind_application_id
-
:( 0, 1000 ms ] 的調用次數。s1_lte
-
:( 1000, 3000 ms ] 的調用次數。s3_lte
-
:( 3000, 5000ms ] 的調用次數s5_lte
-
:( 5000, +∞ ] 的調用次數。s5_gt
-
:發生異常的調用次數。error
-
:總共的調用次數。summary
-
:時間(time_bucket
)。yyyyMMddHHmm
-
-
,NodeReference 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceSpanListener
,NodeReference 的 SpanListener ,實作了 EntrySpanListener 、ExitSpanListener 、RefsListener 接口,代碼如下:
-
屬性,父 TraceSegment 調用産生的 NodeReference 數組。references
-
屬性,NodeReference 數組,最終會包含nodeReferences
數組。references
-
屬性,時間(timeBucket
)。yyyyMMddHHmm
-
方法,代碼如下:#parseRef(SpanDecorator, applicationId, instanceId, segmentId)
- 第 106 至 109 行:使用父 TraceSegment 的應用編号作為服務消費者編号,自己的應用編号作為服務提供者應用編号,建立 NodeReference 對象。
- 第 111 行:将 NodeReference 對象,添加到
。注意,是references
,而不是references
。nodeReference
-
方法,代碼如下:#parseEntry(SpanDecorator, applicationId, instanceId, segmentId)
- 作為服務提供者,接受調用。
- ——- 父 TraceSegment 存在 ——–
- 第 79 至 85 行:
非空,說明被父 TraceSegment 調用。是以,循環references
數組,設定references
,id
屬性( 因為timeBucket
需要從 EntrySpan 中擷取,是以timeBucket
的目的,就是臨時存儲父 TraceSegment 的應用編号到#parseRef(...)
中 )。references
- 第 87 行:調用
方法,設定調用次數,然後添加到#buildserviceSum(...)
中。nodeReferences
- ——- 父 TraceSegment 不存在 ——–
- 第 91 至 97 行:使用
的應用編号( 特殊,代表 “使用者“ )作為服務消費者編号,自己的應用編号作為服務提供者應用編号,建立 NodeReference 對象。USER_ID
- 第 99 行:調用
方法,設定調用次數,然後添加到#buildserviceSum(...)
中。nodeReferences
-
方法,代碼如下:#parseExit(SpanDecorator, applicationId, instanceId, segmentId)
- 作為服務消費者,發起調用。
- 第 64 至 71 行:使用自己的應用編号作為服務消費者編号,
作為服務提供者應用編号,建立 NodeReference 對象。peerId
- 第 73 行:調用
方法,設定調用次數,然後添加到#buildserviceSum(...)
中。nodeReferences
-
方法,建構,代碼如下:#build()
- 第 84 行:擷取 NodeReference 對應的
對象。Graph<NodeReference>
- 第 86 至 92 行:循環
數組,逐個調用nodeReferences
方法,進行流式處理。在這過程中,會儲存 NodeReference 到存儲器。Graph#start(nodeReference)
- 第 84 行:擷取 NodeReference 對應的
在
TraceStreamGraph#createNodeReferenceGraph()
方法中,我們可以看到 NodeReference 對應的
Graph<NodeReference>
對象的建立。
- 和 NodeComponent 的
基本一緻,胖友自己看下源碼。Graph<NodeComponent>
-
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceAggregationWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceRemoteWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferencePersistenceWorker
9. ServiceEntry
org.skywalking.apm.collector.storage.table.service.ServiceEntry
,入口操作。
- ServiceEntry 隻儲存分布式鍊路的入口操作,不同于 ServiceName 儲存所有操作,即 ServiceEntry 是 ServiceName 的子集。
- 注意,子 TraceSegment 的入口操作也不記錄。
-
, ServiceEntry 表(org.skywalking.apm.collector.storage.table.service.ServiceEntryTable
)。字段如下:service_entry
-
:應用編号。application_id
-
:入口操作編号。entry_service_id
-
:入口操作名。entry_service_name
-
:注冊時間。register_time
-
:最後調用時間。newest_time
-
-
,ServiceEntry 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener
,ServiceEntry 的 SpanListener ,實作了 EntrySpanListener 、FirstSpanListener 、RefsListener 接口,代碼如下:
-
屬性, 是否有 TraceSegmentRef 。hasReference
-
屬性,應用編号。applicationId
-
屬性,入口操作編号。entryServiceId
-
屬性,入口操作名。entryServiceName
-
屬性,是否有 EntrySpan 。hasEntry
-
屬性,時間(timeBucket
)。yyyyMMddHHmm
-
方法,是否有 TraceSegmentRef 。#parseRef(SpanDecorator, applicationId, instanceId, segmentId)
-
方法,從首個 Span 中解析到#parseFirst(SpanDecorator, applicationId, instanceId, segmentId)
。timeBucket
-
方法,從 EntrySpan 中解析到#parseEntry(SpanDecorator, applicationId, instanceId, segmentId)
、applicationId
、entryServiceId
、entryServiceName
。hasEntry
-
方法,建構,代碼如下:#build()
- 第 96 行:隻儲存分布式鍊路的入口操作。
- 第 98 至 103 行:建立 ServiceEntry 對象。
- 第 107 行:擷取 ServiceEntry 對應的
對象。Graph<ServiceEntry>
- 第 108 行:調用
方法,進行流式處理。在這過程中,會儲存 ServiceEntry 到存儲器。Graph#start(serviceEntry)
在
TraceStreamGraph#createServiceEntryGraph()
方法中,我們可以看到 ServiceEntry 對應的
Graph<ServiceEntry>
對象的建立。
- 和 NodeComponent 的
基本一緻,胖友自己看下源碼。Graph<NodeComponent>
-
org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker
10. ServiceReference
org.skywalking.apm.collector.storage.table.serviceref.ServiceReference
,入口操作調用統計,用于記錄入口操作的調用,基于入口操作級别的,以分鐘為時間最小粒度的聚合統計。
- 和 NodeReference 類似。
- 注意,此處的 “入口操作“ 不同于 ServiceEntry ,包含每一條 TraceSegment 的入口操作。
-
, ServiceReference 表(org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable
)。字段如下:service_reference
-
:入口操作編号。entry_service_id
-
:服務消費者操作編号。front_service_id
-
:服務提供者操作編号。behind_service_id
-
:( 0, 1000 ms ] 的調用次數。s1_lte
-
:( 1000, 3000 ms ] 的調用次數。s3_lte
-
:( 3000, 5000ms ] 的調用次數s5_lte
-
:( 5000, +∞ ] 的調用次數。s5_gt
-
:發生異常的調用次數。error
-
:總共的調用次數。summary
-
:總共的花費時間。cost_summary
-
:時間(time_bucket
)。yyyyMMddHHmm
-
-
,ServiceReference 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.ServiceReference
- 在 ES 存儲例子如下圖:
org.skywalking.apm.collector.agent.stream.worker.trace.segment.ServiceReferenceSpanListener
,ServiceReference 的 SpanListener ,實作了 EntrySpanListener 、FirstSpanListener 、RefsListener 接口,代碼如下:
-
屬性,ReferenceDecorator 數組,記錄 TraceSegmentRef 數組。referenceServices
-
屬性,入口操作編号。serviceId
-
屬性,開始時間。startTime
-
屬性,結束時間。endTime
-
屬性,是否有錯誤。isError
-
屬性,是否有 SpanEntry 。hasEntry
-
屬性,時間(timeBucket
)。yyyyMMddHHmm
-
方法,将 TraceSegmentRef 添加到#parseRef(SpanDecorator, applicationId, instanceId, segmentId)
。referenceServices
-
方法,從首個 Span 中解析到#parseFirst(SpanDecorator, applicationId, instanceId, segmentId)
。timeBucket
-
方法,從 EntrySpan 中解析#parseEntry(SpanDecorator, applicationId, instanceId, segmentId)
、serviceId
、startTime
、endTime
、isError
。hasEntry
-
方法,建構,代碼如下:#build()
- 第 114 行:判斷
,存在 EntrySpan 。hasEntry = true
- ——— 有 TraceSegmentRef ———
- 第 117 至 120 行:建立 ServiceReference 對象,其中:
-
:TraceSegmentRef 的入口編号。entryServiceId
-
:TraceSegmentRef 的操作編号。frontServiceId
-
: 自己 EntrySpan 的操作編号。behindServiceId
-
- 第 121 行:調用
方法,設定調用次數。#calculateCost(...)
- 第 126 行:調用
方法,發送 ServiceReference 給 AggregationWorker ,執行流式處理。#sendToAggregationWorker(...)
- ——— 無 TraceSegmentRef ———
- 第 117 至 120 行:建立 ServiceReference 對象,其中:
-
:自己 EntrySpan 的操作編号。entryServiceId
-
:frontServiceId
對應的操作編号( 系統内置,代表【空】 )。Const.NONE_SERVICE_ID
-
: 自己 EntrySpan 的操作編号。behindServiceId
-
- 第 121 行:調用
方法,設定調用次數。#calculateCost(...)
- 第 126 行:調用
方法,發送 ServiceReference 給 AggregationWorker ,執行流式處理。#sendToAggregationWorker(...)
- 第 114 行:判斷
在
TraceStreamGraph#createServiceReferenceGraph()
方法中,我們可以看到 ServiceReference 對應的
Graph<ServiceReference>
對象的建立。
- 和 NodeComponent 的
基本一緻,胖友自己看下源碼。Graph<NodeComponent>
-
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.ServiceEntryAggregationWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.ServiceEntryRemoteWorker
-
org.skywalking.apm.collector.agent.stream.worker.trace.noderef.ServiceEntryPersistenceWorker
11. Segment
不同于上述所有資料實體,Segment 無需解析,直接使用 TraceSegment 建構,參見如下方法:
-
SegmentParse#parse(UpstreamSegment, Source)
-
SegmentParse#buildSegment(id, dataBinary)
org.skywalking.apm.collector.storage.table.segment.Segment
,全局鍊路追蹤,記錄一次分布式鍊路追蹤,包括的 TraceSegment 編号。
-
, Segment 表(org.skywalking.apm.collector.storage.table.global.GlobalTraceTable
)。字段如下:segment
-
:TraceSegment 編号。_id
-
:TraceSegment 鍊路編号。data_binary
-
:時間(time_bucket
)。yyyyMMddHHmm
-
-
,GlobalTrace 的 EsDAO 。org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO
- 在 ES 存儲例子如下圖:
在
TraceStreamGraph#createSegmentGraph()
方法中,我們可以看到 Segment 對應的
Graph<Segment>
對象的建立。
-
org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker