天天看點

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

1. 概述

本文主要分享 SkyWalking Collector Remote 遠端通信服務。該服務用于 Collector 叢集内部通信。

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

目前叢集内部通信的目的,跨節點的流式處理。Remote Module 應用在 SkyWalking 架構圖如下位置( 紅框 ) :

FROM https://github.com/apache/incubating-skywalking
SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

下面我們來看看整體的項目結構,如下圖所示 :

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider
  • collector-remote-define

     :定義遠端通信接口。
  • collector-remote-kafka-provider

     :基于 Kafka 的遠端通信實作。目前暫未完成。
  • collector-remote-grpc-provider

     :基于 Google gRPC 的遠端通信實作。生産環境目前使用

下面,我們從接口到實作的順序進行分享。

2. collector-remote-define

collector-remote-define

 :定義遠端通信接口。項目結構如下 :

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

整體流程如下圖:

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

我們按照整個流程的處理順序,逐個解析涉及到的類與接口。

2.1 RemoteModule

org.skywalking.apm.collector.remote.RemoteModule

 ,實作 Module 抽象類,遠端通信 Module 。

#name()

 實作方法,傳回子產品名為 

"remote"

 。

#services()

 實作方法,傳回 Service 類名:RemoteSenderService 、RemoteDataRegisterService 。

2.2 RemoteSenderService

org.skywalking.apm.collector.remote.service.RemoteSenderService

 ,繼承 Service 接口,遠端發送服務接口,定義了 

#send(graphId, nodeId, data, selector)

 接口方法,調用 RemoteClient ,發送資料。

  • graphId

     方法參數,Graph 編号。通過 

    graphId

     ,可以查找到對應的 Graph 對象。
    • Graph 在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「2. apm-collector-core/graph」 有詳細解析。
  • nodeId

     方法參數,Worker 編号。通過 

    workerId

     ,可以查找在 Graph 對象中的 Worker 對象,進而 Graph 中的流式處理。
    • Worker 在 《SkyWalking 源碼分析 —— Collector Streaming Computing 流式處理(一)》「3. apm-collector-stream」 有詳細解析。
  • data

     方法參數,Data 資料對象。例如,流式處理的具體資料對象。
    • Data 在 《SkyWalking 源碼分析 —— Collector Storage 存儲元件》「2. apm-collector-core」 有詳細解析。
  • selector

     方法參數,

    org.skywalking.apm.collector.remote.service.Selector

     選擇器對象。根據 Selector 對象,使用對應的負載均衡政策,選擇叢集内的 Collector 節點,發送資料。
  • RemoteSenderService.Mode 傳回值,發送模式分成 

    Remote

     和 

    Local

     兩種方式。前者,發送資料到遠端的 Collector 節點;後者,發送資料到本地,即本地處理,參見 

    RemoteWorkerRef#in(message)

     方法。

2.3 RemoteClientService

org.skywalking.apm.collector.remote.service.RemoteClientService

 ,繼承 Service 接口,遠端用戶端服務接口,定義了 

#create(host, port, channelSize, bufferSize)

 接口方法,建立 RemoteClient 對象。

2.4 RemoteClient

org.skywalking.apm.collector.remote.service.RemoteClient

 ,繼承 

java.lang.Comparable

 接口,遠端用戶端接口。定義了如下接口方法:

  • #push(graphId, nodeId, data, selector)

     接口方法,發送資料。
  • #getAddress()

     接口方法,傳回用戶端連接配接的遠端 Collector 位址。
  • #equals(address)

     接口方法,判斷 RemoteClient 是否連接配接了指定的位址。

2.5 CommonRemoteDataRegisterService

在說 CommonRemoteDataRegisterService 之前,首先來說下 CommonRemoteDataRegisterService 的意圖。

在上文中,我們可以看到發送給 Collector 是 Data 對象,而 Data 是資料的抽象類,在具體反序列化 Data 對象之前,程式是無法得知它是 Data 的哪個實作對象。這個時候,我們可以給 Data 對象的每個實作類,生成一個對應的資料協定編号。

  • 在發送資料之前,序列化 Data 對象時,增加該 Data 對應的協定編号,一起發送。
  • 在接收資料之後,反序列化資料時,根據協定編号,建立 Data 對應的實作類對象。

org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService

 ,通用遠端資料注冊服務。

  • id

     屬性,資料協定自增編号。
  • dataClassMapping

     屬性,資料類型( Class<? extends Data> )與資料協定編号的映射。
  • dataInstanceCreatorMapping

     屬性,資料協定編号與資料對象建立器( RemoteDataInstanceCreator )的映射。

2.5.1 RemoteDataRegisterService

org.skywalking.apm.collector.remote.service.RemoteDataRegisterService

 ,繼承 Service 接口,遠端用戶端服務接口,定義了 

#register(Class<? extends Data>, RemoteDataInstanceCreator)

 接口方法,注冊資料類型對應的遠端資料建立器( 

RemoteDataRegisterService.RemoteDataInstanceCreator

 )對象。

CommonRemoteDataRegisterService 實作了 RemoteDataRegisterService 接口,

#register(Class<? extends Data>, RemoteDataInstanceCreator)

 實作方法。

另外,AgentStreamRemoteDataRegister 會調用 

RemoteDataRegisterService#register(Class<? extends Data>, RemoteDataInstanceCreator)

 方法,注冊每個資料類型的 RemoteDataInstanceCreator 對象。注意,例如 

Application::new

 是 RemoteDataInstanceCreator 的匿名實作類。

2.5.2 RemoteDataIDGetter

org.skywalking.apm.collector.remote.service.RemoteDataIDGetter

 ,繼承 Service 接口,遠端資料協定編号擷取器接口,定義了 

#getRemoteDataId(Class<? extends Data>)

 接口方法,根據資料類型擷取資料協定編号。

CommonRemoteDataRegisterService 實作了 RemoteDataIDGetter 接口,

#getRemoteDataId(Class<? extends Data>)

 實作方法。

2.5.3 RemoteDataInstanceCreatorGetter

org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter

 ,繼承 Service 接口,遠端資料建立器的擷取器接口,定義了 

#getInstanceCreator(remoteDataId

 接口方法,根據資料協定編号獲得遠端資料建立器( RemoteDataInstanceCreator )。

CommonRemoteDataRegisterService 實作了 RemoteDataInstanceCreatorGetter 接口,

#getInstanceCreator(remoteDataId)

 實作方法。

2.6 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteSerializeService

 ,遠端通信序列化服務接口,定義了 

#serialize(Data)

 接口方法,序列化資料,生成 Builder 對象。

2.7 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteDeserializeService

 ,遠端通信序反列化服務接口,定義了 

#deserialize(RemoteData, Data)

 接口方法,反序列化傳輸資料。

3. collector-remote-grpc-provider

collector-remote-grpc-provider

 ,基于 Google gRPC 的遠端通信實作。

項目結構如下 :

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

預設配置,在 

application-default.yml

 已經配置如下:

remote:
  gRPC:
    host: localhost
    port: 11800
           

3.1 RemoteModuleGRPCProvider

org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider

 ,實作 ModuleProvider 抽象類,基于 gRPC 的元件服務提供者實作類。

#name()

 實作方法,傳回元件服務提供者名為 

"gRPC"

 。

module()

 實作方法,傳回元件類為 RemoteModule 。

#requiredModules()

 實作方法,傳回依賴元件為 

cluster

 、

gRPC_manager

 。

#prepare(Properties)

 實作方法,執行準備階段邏輯。

  • 第 53 至 56 行 :建立 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 對象,并調用 

    #registerServiceImplementation()

     父類方法,注冊到 

    services

     。

#start()

 實作方法,執行啟動階段邏輯。

  • Server 相關
    • 第 65 行:建立 gRPC Server 對象。
    • 第 67 行:注冊 RemoteCommonServiceHandler 對象到 gRPC Server 上,用于接收 gRPC 請求後的處理。
    • 《SkyWalking 源碼分析 —— Collector Server Component 伺服器元件》「3. gRPC 實作」
    • 《SkyWalking 源碼分析 —— Collector gRPC Server Manager》
  • 注冊發現相關
    • 第 70 至 71 行:建立 

      org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration

       對象,将自己注冊到叢集管理。這樣,自己可以被 Collector 叢集節點發現,進而被調用。
    • 第 73 至 74 行:注冊 GRPCRemoteSenderService 對象到叢集管理。這樣,自己可以監聽到 Collector 叢集節點的加入或離開,進而調用。
    • 《SkyWalking 源碼分析 —— Collector Cluster 叢集管理》

#notifyAfterCompleted()

 實作方法,方法為空。

3.2 GRPCRemoteSenderService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService

 ,繼承 ClusterModuleListener 抽象類,實作 RemoteSenderService 接口,基于 gPRC 的遠端發送服務實作類。

3.2.1 注冊發現

通過繼承 ClusterModuleListener 抽象類,實作了監聽 Collector 叢集節點的加入或離開。

  • remoteClients

     屬性,連接配接 Collector 叢集節點的用戶端數組。每個 Collector 叢集節點,對應一個用戶端。
  • #path()

     實作方法,傳回監聽的目錄 

    "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME

     。Collector 叢集中,每個節點的 Remote Server 都會注冊到該目錄下。
  • #serverJoinNotify(serverAddress)

     實作方法,當新的節點加入,建立新的用戶端連接配接。
  • #serverQuitNotify(serverAddress)

     實作方法,當老的節點離開,移除對應的用戶端連接配接。

3.2.2 負載均衡

RemoteModuleGRPCProvider 基于不同的選擇器 ( Selector ) ,提供不同的用戶端選擇( 

org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector

 )實作 :

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider
  • hashCodeSelector

     屬性,HashCodeSelector ,基于資料的哈希碼。
  • foreverFirstSelector

     屬性,ForeverFirstSelector ,基于用戶端數組的順序,選擇第一個。
  • rollingSelector

     屬性,RollingSelector ,基于用戶端數組的順序,順序向下選擇。
  • #send(graphId, nodeId, data, selector)

     方法,代碼如下:
    • 第 63 、66 、69 行:根據選擇器,調用 

      RemoteClientSelector#select(clients, data)

       方法,選擇用戶端。
    • 第 64 、67 、70 行:調用 

      #sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data)

       方法,發送請求資料。
      • 第 76 至 77 行:當選擇的用戶端連接配接的是本地時,不發送資料,交給本地處理,參見 

        RemoteWorkerRef#in(message)

         方法。
      • 第 78 至 81 行:當選擇的用戶端連接配接的是遠端時,調用 

        RemoteClient#push(graphId, nodeId, data)

         方法,發送資料。

3.3 GRPCRemoteClientService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService

 ,實作 RemoteClientService 接口,基于 gRPC 的遠端用戶端服務實作類。

#create(host, port, channelSize, bufferSize)

 實作方法,建立 GRPCRemoteClient 對象。

3.4 GRPCRemoteClient

友情提示:本小節會涉及較多 gRPC 相關的知識,建議不熟悉的胖友自己 Google ,補充下姿勢。

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient

 ,實作 RemoteClient 接口,基于 gRPC 的遠端用戶端實作類。

  • client

     屬性,GRPCClient 對象。相比來說,GRPCRemoteClient 偏業務的封裝,内部調用 GRPCClient 對象。
  • carrier

     屬性,DataCarrier 對象,本地消息隊列。GRPCRemoteClient 在被調用發送資料時,先送出到本地隊列,異步消費進行發送到遠端 Collector 節點。DataCarrier 在 《SkyWalking 源碼分析 —— DataCarrier 異步處理庫》 詳細解析。
    • 第 63 行:調用 

      DataCarrier#consume(IConsumer, num)

       方法,設定消費者為 RemoteMessageConsumer 對象。

#push(graphId, nodeId, data)

 實作方法,異步發送消息到遠端 Collector 。

  • 第 73 行:調用 

    RemoteDataIDGetter#getRemoteDataId(Class<? extends Data>)

     方法,獲得資料協定編号。
  • 第 76 至 80 行:建立傳輸資料( RemoteMessage.Builder ) 對象。RemoteMessage 通過 Protobuf 建立定義,如下圖所示:
    SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider
  • 第 83 行:調用 

    DataCarrier#produce(data)

     方法,發送資料到本地隊列。

RemoteMessageConsumer ,批量消費本地隊列的資料,逐條發送資料到遠端 Collector 節點。

  • #consume(List<RemoteMessage>)

     實作方法,代碼如下:
    • 第 100 行:建立 StreamObserver 對象。StreamObserver 主要是 gPRC 相關的 API 的調用。
    • 第 101 至 103 行:調用 

      io.grpc.stub.StreamObserver#onNext(RemoteMessage)

       方法,逐條發送資料。
    • 第 106 行:調用 

      io.grpc.stub.StreamObserver#onCompleted()

       方法,全部請求資料發送完成。

3.5 RemoteCommonServiceHandler

org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler

 ,實作 

org.skywalking.apm.collector.server.grpc.GRPCHandler

 接口,繼承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象類,遠端通信通用邏輯處理器。

其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 

RemoteCommonService.proto

 檔案的定義如下圖:

SkyWalking 源碼分析 —— Collector Remote 遠端通信服務1. 概述2. collector-remote-define3. collector-remote-grpc-provider4. collector-remote-grpc-provider

#call(StreamObserver<Empty>)

 實作方法,代碼如下:

  • #onNext(RemoteMessage)

     方法,處理每一條消息,代碼如下:
    • 第 65 行:調用 

      RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId)

       方法,獲得資料協定編号對應的 RemoteDataInstanceCreator 對象。然後,調用 

      RemoteDataInstanceCreator#createInstance(id)

       方法,建立資料協定編号對應的 Data 實作類對應的對象。
    • 第 70 行:調用 

      GraphManager#findGraph(graphId)

       方法,獲得 

      graphId

       對應的 Graph 對象。然後,調動 

      GraphNodeFinder#findNext(nodeId)

       方法,獲得 Next 對象。
    • 第 71 行:調用 

      Next#execute(Data)

       方法,繼續流式處理。

3.6 GRPCRemoteSerializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService

 ,實作 RemoteSerializeService 接口,基于 gRPC 的遠端通信序列化服務實作類。

3.7 GRPCRemoteDeserializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService

 ,實作 GRPCRemoteDeserializeService 接口,基于 gRPC 的遠端通信反序列化服務實作類。

4. collector-remote-grpc-provider

collector-remote-kafka-provider

 :基于 Kafka 的遠端通信實作。

繼續閱讀