天天看點

源碼角度了解Skywalking之建立連接配接與服務注冊源碼角度了解Skywalking之建立連接配接與服務注冊

源碼角度了解Skywalking之建立連接配接與服務注冊

這篇文章主要講一下Agent與OAP建立連接配接并進行服務注冊。

從SkyWalking的啟動流程SkyWalkingAgent的premain()中我們了解到調用

ServiceManager.INSTANCE.boot()

來啟動插件服務,并分析了利用SPI機制加載配置的各種插件的BootService,然後依次周遊BootService執行個體,調用他們的prepare()準備方法、startup();啟動方法和onComplete()完成方法。

ServiceManager.INSTANCE.boot()方法:

public void boot() {
        bootedServices = loadAllServices();

        prepare();
        startup();
        onComplete();
    }
           

我們先看一下prepare()方法,方法中周遊配置的BootService實作類,依次調用他們的prepare()方法,其中就有一個GRPCChannelManager類

GRPCChannelManager

GRPCChannelManager從名字就可以看出來,它是對gprc協定管道的管理類,它隻實作了boot()方法

boot()方法

boot()方法的關鍵代碼:

public static long GRPC_CHANNEL_CHECK_INTERVAL = 30;
connectCheckFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
           

也就是開啟了通過線程池開啟了開啟,每隔30s執行一次,那麼這個線程主要是做什麼工作的呢?

我們看一下GRPCChannelManager類的run()方法

run()方法

源碼角度了解Skywalking之建立連接配接與服務注冊源碼角度了解Skywalking之建立連接配接與服務注冊

第一步:根據ip和端口與指定OAP執行個體建立連接配接

第二步:連接配接成功後調用notify()方法通知所有的GRPCChannelListener連接配接成功的狀态,GRPCChannelListener會調用statusChanged()方法來改變連接配接狀态的值

第三步:設定重新連接配接為false

也就是它建立網絡連接配接并且通知其他監聽者的類

ServiceAndEndpointRegisterClient

ServiceAndEndpointRegisterClient既實作了BootService接口,又實作了GRPCChannelListener接口,主要的工作就是完成服務和端點的注冊

prepare()方法

@Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);

        INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()
            .replaceAll("-", "") : Config.Agent.INSTANCE_UUID;

        SERVICE_INSTANCE_PROPERTIES = new ArrayList<KeyStringValuePair>();

        for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
            SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                .setKey(key).setValue(Config.Agent.INSTANCE_PROPERTIES.get(key)).build());
        }
    }
           
  1. 查找GRPCChannelManager執行個體添加ServiceAndEndpointRegisterClient執行個體作為監聽
  2. 生成執行個體UUID,先從agent.config檔案中讀取,如果檔案中沒有定義生成UUID
  3. 讀取agent.config檔案中的配置資訊添加到SERVICE_INSTANCE_PROPERTIES集合中

boot()方法

public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 3;
@Override
    public void boot() throws Throwable {
        applicationRegisterFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ServiceAndEndpointRegisterClient"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
    }
           

boot()方法中會每3s執行一次run()

run()方法

源碼角度了解Skywalking之建立連接配接與服務注冊源碼角度了解Skywalking之建立連接配接與服務注冊

run()方法的代碼同樣比較長,主要分為三個部分,

第一部分:如果Agent還沒有完成了注冊,也就是服務id為空

通過doServiceRegister()方法進行服務注冊,傳回服務注冊映射關系,周遊關系集合得到服務id

第二部分:已經完成了注冊,判斷服務執行個體id是否為空,如果為空

通過ServiceId、執行個體uuid和時間戳等參數來調用doServiceInstanceRegister()方法,得到ServiceInstanceRegisterMapping映射對象,周遊映射資訊得到服務執行個體id

第三部分:已經完成了注冊,并且服務執行個體id不為空

總結

❤️ 感謝大家

  1. 歡迎關注我❤️,點贊👍🏻,評論🤤,轉發🙏
  2. 有不當之處歡迎批評指正。