源碼角度了解Skywalking之Trace可以跨線程嗎
Trace資訊是一個重要的資訊,那麼Skywalking的trace可以跨線程傳播嗎?
我們先給出答案,它是是可以的跨線程傳播的,今天就帶大家看一下Trace跨線程的使用和實作原理
使用
private static final ExecutorService SERVICE = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
});
public void asyncRunnable(Runnable runnable) {
SERVICE.submit(RunnableWrapper.of(runnable));
}
public void asyncCallable(Callable<Boolean> callable) {
SERVICE.submit(CallableWrapper.of(callable));
}
public void asyncSupplier(Supplier<Boolean> supplier) {
CompletableFuture.supplyAsync(SupplierWrapper.of(supplier));
}
定義單個線程池
RunnableWrapper包裝Runnable對象實作Trace的跨線程
CallableWrapper包裝Callable接口對象來實作Trace的跨線程
SupplierWrapper包裝Supplier對象實作Trace的跨線程
這樣Skywalking就可以捕捉到線程的trace資訊了,那麼它是怎麼實作的呢?
實作原理
通過看Skywalking的apm-application-toolkit子產品,可以看到RunnableWrapper、CallableWrapper類和SupplierWrapper類都有一個特點,那就是類上都有一個注解@TraceCrossThread,這個注解顯然是Trace跨線程的意思
誰會掃描@TraceCrossThread注解呢?
apm-toolkit-trace-activation子產品下的CallableOrRunnableActivation會對标注@TraceCrossThread注解的的類和call()方法、run()方法或者get()的方法進行攔截增強,對應攔截器為CallableOrRunnableInvokeInterceptor
CallableOrRunnableConstructInterceptor
CallableOrRunnableActivation還定義了構造方法的切入點,對标注@TraceCrossThread注解的類的任何構造方法進行攔截,攔截器為CallableOrRunnableConstructInterceptor,看一下這個攔截器的增強邏輯
CallableOrRunnableConstructInterceptor類:
public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
if (ContextManager.isActive()) {
objInst.setSkyWalkingDynamicField(ContextManager.capture());
}
}
}
代碼比較簡答,也就是将目前 TracingContext 的核心資訊填充到 ContextSnapshot 中,并添加到_$EnhancedClassField_ws 字段中,接下來執行個體方法的增強會用到這個資訊。
CallableOrRunnableInvokeInterceptor
beforeMethod()方法
CallableOrRunnableInvokeInterceptor的beforeMethod()方法:
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
ContextSnapshot cachedObjects = (ContextSnapshot)objInst.getSkyWalkingDynamicField();
if (cachedObjects != null) {
ContextManager.continued(cachedObjects);
}
}
- 建立新的TracingContext,建立LocalSpan
- 判斷ContextSnapshot對象是否為空,如果不為空,調用ContextManager.continued()方法在此段和跨線程段之間建立引用,填充TracingContext資訊。
afterMethod()方法
CallableOrRunnableInvokeInterceptor的afterMethod()方法:
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
// clear ContextSnapshot
objInst.setSkyWalkingDynamicField(null);
return ret;
}
總結
❤️ 感謝大家
- 歡迎關注我❤️,點贊👍🏻,評論🤤,轉發🙏
- 有不當之處歡迎批評指正。