1. 概述
本文主要分享 SkyWalking Collector Remote 遠端通信服務。該服務用于 Collector 叢集内部通信。
目前叢集内部通信的目的,跨節點的流式處理。Remote Module 應用在 SkyWalking 架構圖如下位置( 紅框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我們來看看整體的項目結構,如下圖所示 :
-
:定義遠端通信接口。collector-remote-define
-
:基于 Kafka 的遠端通信實作。目前暫未完成。collector-remote-kafka-provider
-
:基于 Google gRPC 的遠端通信實作。生産環境目前使用collector-remote-grpc-provider
下面,我們從接口到實作的順序進行分享。
2. collector-remote-define
collector-remote-define
:定義遠端通信接口。項目結構如下 :
整體流程如下圖:
我們按照整個流程的處理順序,逐個解析涉及到的類與接口。
2.1 RemoteModule
org.skywalking.apm.collector.remote.RemoteModule
,實作 Module 抽象類,遠端通信 Module 。
#name()
實作方法,傳回子產品名為
"remote"
。
#services()
實作方法,傳回 Service 類名:RemoteSenderService 、RemoteDataRegisterService 。
2.2 RemoteSenderService
org.skywalking.apm.collector.remote.service.RemoteSenderService
,繼承 Service 接口,遠端發送服務接口,定義了
#send(graphId, nodeId, data, selector)
接口方法,調用 RemoteClient ,發送資料。
-
方法參數,Graph 編号。通過graphId
,可以查找到對應的 Graph 對象。graphId
- Graph 在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「2. apm-collector-core/graph」 有詳細解析。
-
方法參數,Worker 編号。通過nodeId
,可以查找在 Graph 對象中的 Worker 對象,進而 Graph 中的流式處理。workerId
- Worker 在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3. apm-collector-stream」 有詳細解析。
-
方法參數,Data 資料對象。例如,流式處理的具體資料對象。data
- Data 在 《SkyWalking 源碼分析 —— Collector Storage 存儲元件》「2. apm-collector-core」 有詳細解析。
-
方法參數,selector
選擇器對象。根據 Selector 對象,使用對應的負載均衡政策,選擇叢集内的 Collector 節點,發送資料。org.skywalking.apm.collector.remote.service.Selector
- RemoteSenderService.Mode 傳回值,發送模式分成
和Remote
兩種方式。前者,發送資料到遠端的 Collector 節點;後者,發送資料到本地,即本地處理,參見Local
方法。RemoteWorkerRef#in(message)
2.3 RemoteClientService
org.skywalking.apm.collector.remote.service.RemoteClientService
,繼承 Service 接口,遠端用戶端服務接口,定義了
#create(host, port, channelSize, bufferSize)
接口方法,建立 RemoteClient 對象。
2.4 RemoteClient
org.skywalking.apm.collector.remote.service.RemoteClient
,繼承
java.lang.Comparable
接口,遠端用戶端接口。定義了如下接口方法:
-
接口方法,發送資料。#push(graphId, nodeId, data, selector)
-
接口方法,傳回用戶端連接配接的遠端 Collector 位址。#getAddress()
-
接口方法,判斷 RemoteClient 是否連接配接了指定的位址。#equals(address)
2.5 CommonRemoteDataRegisterService
在說 CommonRemoteDataRegisterService 之前,首先來說下 CommonRemoteDataRegisterService 的意圖。
在上文中,我們可以看到發送給 Collector 是 Data 對象,而 Data 是資料的抽象類,在具體反序列化 Data 對象之前,程式是無法得知它是 Data 的哪個實作對象。這個時候,我們可以給 Data 對象的每個實作類,生成一個對應的資料協定編号。
- 在發送資料之前,序列化 Data 對象時,增加該 Data 對應的協定編号,一起發送。
- 在接收資料之後,反序列化資料時,根據協定編号,建立 Data 對應的實作類對象。
org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService
,通用遠端資料注冊服務。
-
屬性,資料協定自增編号。id
-
屬性,資料類型( Class<? extends Data> )與資料協定編号的映射。dataClassMapping
-
屬性,資料協定編号與資料對象建立器( RemoteDataInstanceCreator )的映射。dataInstanceCreatorMapping
2.5.1 RemoteDataRegisterService
org.skywalking.apm.collector.remote.service.RemoteDataRegisterService
,繼承 Service 接口,遠端用戶端服務接口,定義了
#register(Class<? extends Data>, RemoteDataInstanceCreator)
接口方法,注冊資料類型對應的遠端資料建立器(
RemoteDataRegisterService.RemoteDataInstanceCreator
)對象。
CommonRemoteDataRegisterService 實作了 RemoteDataRegisterService 接口,
#register(Class<? extends Data>, RemoteDataInstanceCreator)
實作方法。
另外,AgentStreamRemoteDataRegister 會調用
RemoteDataRegisterService#register(Class<? extends Data>, RemoteDataInstanceCreator)
方法,注冊每個資料類型的 RemoteDataInstanceCreator 對象。注意,例如
Application::new
是 RemoteDataInstanceCreator 的匿名實作類。
2.5.2 RemoteDataIDGetter
org.skywalking.apm.collector.remote.service.RemoteDataIDGetter
,繼承 Service 接口,遠端資料協定編号擷取器接口,定義了
#getRemoteDataId(Class<? extends Data>)
接口方法,根據資料類型擷取資料協定編号。
CommonRemoteDataRegisterService 實作了 RemoteDataIDGetter 接口,
#getRemoteDataId(Class<? extends Data>)
實作方法。
2.5.3 RemoteDataInstanceCreatorGetter
org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter
,繼承 Service 接口,遠端資料建立器的擷取器接口,定義了
#getInstanceCreator(remoteDataId
接口方法,根據資料協定編号獲得遠端資料建立器( RemoteDataInstanceCreator )。
CommonRemoteDataRegisterService 實作了 RemoteDataInstanceCreatorGetter 接口,
#getInstanceCreator(remoteDataId)
實作方法。
2.6 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteSerializeService
,遠端通信序列化服務接口,定義了
#serialize(Data)
接口方法,序列化資料,生成 Builder 對象。
2.7 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteDeserializeService
,遠端通信序反列化服務接口,定義了
#deserialize(RemoteData, Data)
接口方法,反序列化傳輸資料。
3. collector-remote-grpc-provider
collector-remote-grpc-provider
,基于 Google gRPC 的遠端通信實作。
項目結構如下 :
預設配置,在
application-default.yml
已經配置如下:
|
3.1 RemoteModuleGRPCProvider
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider
,實作 ModuleProvider 抽象類,基于 gRPC 的元件服務提供者實作類。
#name()
實作方法,傳回元件服務提供者名為
"gRPC"
。
module()
實作方法,傳回元件類為 RemoteModule 。
#requiredModules()
實作方法,傳回依賴元件為
cluster
、
gRPC_manager
。
#prepare(Properties)
實作方法,執行準備階段邏輯。
- 第 53 至 56 行 :建立 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 對象,并調用
父類方法,注冊到#registerServiceImplementation()
。services
#start()
實作方法,執行啟動階段邏輯。
- Server 相關
- 第 65 行:建立 gRPC Server 對象。
- 第 67 行:注冊 RemoteCommonServiceHandler 對象到 gRPC Server 上,用于接收 gRPC 請求後的處理。
- 《SkyWalking 源碼分析 —— Collector Server Component 伺服器元件》「3. gRPC 實作」
- 《SkyWalking 源碼分析 —— Collector gRPC Server Manager》
- 注冊發現相關
- 第 70 至 71 行:建立
對象,将自己注冊到叢集管理。這樣,自己可以被 Collector 叢集節點發現,進而被調用。org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration
- 第 73 至 74 行:注冊 GRPCRemoteSenderService 對象到叢集管理。這樣,自己可以監聽到 Collector 叢集節點的加入或離開,進而調用。
- 《SkyWalking 源碼分析 —— Collector Cluster 叢集管理》
- 第 70 至 71 行:建立
#notifyAfterCompleted()
實作方法,方法為空。
3.2 GRPCRemoteSenderService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService
,繼承 ClusterModuleListener 抽象類,實作 RemoteSenderService 接口,基于 gPRC 的遠端發送服務實作類。
3.2.1 注冊發現
通過繼承 ClusterModuleListener 抽象類,實作了監聽 Collector 叢集節點的加入或離開。
-
屬性,連接配接 Collector 叢集節點的用戶端數組。每個 Collector 叢集節點,對應一個用戶端。remoteClients
-
實作方法,傳回監聽的目錄#path()
。Collector 叢集中,每個節點的 Remote Server 都會注冊到該目錄下。"/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME
-
實作方法,當新的節點加入,建立新的用戶端連接配接。#serverJoinNotify(serverAddress)
-
實作方法,當老的節點離開,移除對應的用戶端連接配接。#serverQuitNotify(serverAddress)
3.2.2 負載均衡
RemoteModuleGRPCProvider 基于不同的選擇器 ( Selector ) ,提供不同的用戶端選擇(
org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector
)實作 :
-
屬性,HashCodeSelector ,基于資料的哈希碼。hashCodeSelector
-
屬性,ForeverFirstSelector ,基于用戶端數組的順序,選擇第一個。foreverFirstSelector
-
屬性,RollingSelector ,基于用戶端數組的順序,順序向下選擇。rollingSelector
-
方法,代碼如下:#send(graphId, nodeId, data, selector)
- 第 63 、66 、69 行:根據選擇器,調用
方法,選擇用戶端。RemoteClientSelector#select(clients, data)
- 第 64 、67 、70 行:調用
方法,發送請求資料。#sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data)
- 第 76 至 77 行:當選擇的用戶端連接配接的是本地時,不發送資料,交給本地處理,參見
方法。RemoteWorkerRef#in(message)
- 第 78 至 81 行:當選擇的用戶端連接配接的是遠端時,調用
方法,發送資料。RemoteClient#push(graphId, nodeId, data)
- 第 76 至 77 行:當選擇的用戶端連接配接的是本地時,不發送資料,交給本地處理,參見
- 第 63 、66 、69 行:根據選擇器,調用
3.3 GRPCRemoteClientService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService
,實作 RemoteClientService 接口,基于 gRPC 的遠端用戶端服務實作類。
#create(host, port, channelSize, bufferSize)
實作方法,建立 GRPCRemoteClient 對象。
3.4 GRPCRemoteClient
友情提示:本小節會涉及較多 gRPC 相關的知識,建議不熟悉的胖友自己 Google ,補充下姿勢。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient
,實作 RemoteClient 接口,基于 gRPC 的遠端用戶端實作類。
-
屬性,GRPCClient 對象。相比來說,GRPCRemoteClient 偏業務的封裝,内部調用 GRPCClient 對象。client
-
屬性,DataCarrier 對象,本地消息隊列。GRPCRemoteClient 在被調用發送資料時,先送出到本地隊列,異步消費進行發送到遠端 Collector 節點。DataCarrier 在 《SkyWalking 源碼分析 —— DataCarrier 異步處理庫》 詳細解析。carrier
- 第 63 行:調用
方法,設定消費者為 RemoteMessageConsumer 對象。DataCarrier#consume(IConsumer, num)
- 第 63 行:調用
#push(graphId, nodeId, data)
實作方法,異步發送消息到遠端 Collector 。
- 第 73 行:調用
方法,獲得資料協定編号。RemoteDataIDGetter#getRemoteDataId(Class<? extends Data>)
- 第 76 至 80 行:建立傳輸資料( RemoteMessage.Builder ) 對象。RemoteMessage 通過 Protobuf 建立定義,如下圖所示:
- 第 83 行:調用
方法,發送資料到本地隊列。DataCarrier#produce(data)
RemoteMessageConsumer ,批量消費本地隊列的資料,逐條發送資料到遠端 Collector 節點。
-
實作方法,代碼如下:#consume(List<RemoteMessage>)
- 第 100 行:建立 StreamObserver 對象。StreamObserver 主要是 gPRC 相關的 API 的調用。
- 第 101 至 103 行:調用
方法,逐條發送資料。io.grpc.stub.StreamObserver#onNext(RemoteMessage)
- 第 106 行:調用
方法,全部請求資料發送完成。io.grpc.stub.StreamObserver#onCompleted()
3.5 RemoteCommonServiceHandler
org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler
,實作
org.skywalking.apm.collector.server.grpc.GRPCHandler
接口,繼承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象類,遠端通信通用邏輯處理器。
其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在
RemoteCommonService.proto
檔案的定義如下圖:
#call(StreamObserver<Empty>)
實作方法,代碼如下:
-
方法,處理每一條消息,代碼如下:#onNext(RemoteMessage)
- 第 65 行:調用
方法,獲得資料協定編号對應的 RemoteDataInstanceCreator 對象。然後,調用RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId)
方法,建立資料協定編号對應的 Data 實作類對應的對象。RemoteDataInstanceCreator#createInstance(id)
- 第 70 行:調用
方法,獲得GraphManager#findGraph(graphId)
對應的 Graph 對象。然後,調動graphId
方法,獲得 Next 對象。GraphNodeFinder#findNext(nodeId)
- 第 71 行:調用
方法,繼續流式處理。Next#execute(Data)
- 第 65 行:調用
3.6 GRPCRemoteSerializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService
,實作 RemoteSerializeService 接口,基于 gRPC 的遠端通信序列化服務實作類。
3.7 GRPCRemoteDeserializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService
,實作 GRPCRemoteDeserializeService 接口,基于 gRPC 的遠端通信反序列化服務實作類。
4. collector-remote-grpc-provider
collector-remote-kafka-provider
:基于 Kafka 的遠端通信實作。