天天看點

源碼角度了解Skywalking之Trace可以跨線程嗎源碼角度了解Skywalking之Trace可以跨線程嗎

源碼角度了解Skywalking之Trace可以跨線程嗎

Trace資訊是一個重要的資訊,那麼Skywalking的trace可以跨線程傳播嗎?

我們先給出答案,它是是可以的跨線程傳播的,今天就帶大家看一下Trace跨線程的使用和實作原理

使用

private static final ExecutorService SERVICE = Executors.newSingleThreadExecutor(r -> {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    return thread;
});

public void asyncRunnable(Runnable runnable) {
    SERVICE.submit(RunnableWrapper.of(runnable));
}

public void asyncCallable(Callable<Boolean> callable) {
    SERVICE.submit(CallableWrapper.of(callable));
}
public void asyncSupplier(Supplier<Boolean> supplier) {
    CompletableFuture.supplyAsync(SupplierWrapper.of(supplier));
}

           

定義單個線程池

RunnableWrapper包裝Runnable對象實作Trace的跨線程

CallableWrapper包裝Callable接口對象來實作Trace的跨線程

SupplierWrapper包裝Supplier對象實作Trace的跨線程

這樣Skywalking就可以捕捉到線程的trace資訊了,那麼它是怎麼實作的呢?

實作原理

通過看Skywalking的apm-application-toolkit子產品,可以看到RunnableWrapper、CallableWrapper類和SupplierWrapper類都有一個特點,那就是類上都有一個注解@TraceCrossThread,這個注解顯然是Trace跨線程的意思

誰會掃描@TraceCrossThread注解呢?

apm-toolkit-trace-activation子產品下的CallableOrRunnableActivation會對标注@TraceCrossThread注解的的類和call()方法、run()方法或者get()的方法進行攔截增強,對應攔截器為CallableOrRunnableInvokeInterceptor

CallableOrRunnableConstructInterceptor

CallableOrRunnableActivation還定義了構造方法的切入點,對标注@TraceCrossThread注解的類的任何構造方法進行攔截,攔截器為CallableOrRunnableConstructInterceptor,看一下這個攔截器的增強邏輯

CallableOrRunnableConstructInterceptor類:

public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
            objInst.setSkyWalkingDynamicField(ContextManager.capture());
        }
    }

}
           

代碼比較簡答,也就是将目前 TracingContext 的核心資訊填充到 ContextSnapshot 中,并添加到_$EnhancedClassField_ws 字段中,接下來執行個體方法的增強會用到這個資訊。

CallableOrRunnableInvokeInterceptor

beforeMethod()方法

CallableOrRunnableInvokeInterceptor的beforeMethod()方法:

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
    MethodInterceptResult result) throws Throwable {
    ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
    ContextSnapshot cachedObjects = (ContextSnapshot)objInst.getSkyWalkingDynamicField();
    if (cachedObjects != null) {
        ContextManager.continued(cachedObjects);
    }
}
           
  1. 建立新的TracingContext,建立LocalSpan
  2. 判斷ContextSnapshot對象是否為空,如果不為空,調用ContextManager.continued()方法在此段和跨線程段之間建立引用,填充TracingContext資訊。

afterMethod()方法

CallableOrRunnableInvokeInterceptor的afterMethod()方法:

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
    Object ret) throws Throwable {
    ContextManager.stopSpan();
    // clear ContextSnapshot
    objInst.setSkyWalkingDynamicField(null);
    return ret;
}
           

總結

❤️ 感謝大家

  1. 歡迎關注我❤️,點贊👍🏻,評論🤤,轉發🙏
  2. 有不當之處歡迎批評指正。