天天看点

java全链路追踪,微服务全链路跟踪:jaeger集成grpc

微服务全链路跟踪:grpc集成zipkin

微服务全链路跟踪:grpc集成jaeger

微服务全链路跟踪:springcloud集成jaeger

微服务全链路跟踪:jaeger集成istio,并兼容uber-trace-id与b3

微服务全链路跟踪:jaeger集成hystrix

微服务全链路跟踪:jaeger增加tag参数

本章节内容是基于springboot2集成net.devh.grpc的拓展

本章介绍grpc如何集成jaeger,主要参考以下官方文档并在其基础上进行扩展:

https://github.com/opentracing-contrib/java-spring-cloud

https://github.com/opentracing-contrib/java-spring-jaeger

jaeger部署

这里就不列举jaeger代码或者容器部署了,网上很多

grpc工程目录

grpc-common

grpc-client-starter

grpc-server-starter

grpc-common

pom.xml依赖

org.projectlombok

lombok

true

相关实例

import io.opentracing.Span;

public interface ActiveSpanSource{

public static ActiveSpanSource NONE = new ActiveSpanSource() {

@Override

public Span getActiveSpan(){

return null;

}

};

public static ActiveSpanSource GRPC_CONTEXT = new ActiveSpanSource() {

@Override

public Span getActiveSpan(){

return OpenTracingContextKey.activeSpan();

}

};

public Span getActiveSpan();

}

import io.grpc.Context;

import io.opentracing.Span;

public class OpenTracingContextKey{

public static final String KEY_NAME = "io.opentracing.active-span";

private static final Context.Key key = Context.key(KEY_NAME);

public static Span activeSpan(){

return key.get();

}

public static Context.Key getKey(){

return key;

}

}

import io.grpc.MethodDescriptor;

public interface OperationNameConstructor{

public static OperationNameConstructor DEFAULT = new OperationNameConstructor() {

@Override

public  String constructOperationName(MethodDescriptor method){

return method.getFullMethodName();

}

};

public  String constructOperationName(MethodDescriptor method);

}

grpc-server-starter

pom.xml依赖

org.springframework.boot

spring-boot-configuration-processor

true

org.projectlombok

lombok

true

net.devh

grpc-server-spring-boot-starter

com.springcloud.grpc

grpc-common

自动装配

import io.grpc.ClientInterceptor;

import io.grpc.ServerInterceptor;

import io.opentracing.Tracer;

import net.devh.boot.grpc.server.interceptor.GlobalServerInterceptorConfigurer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Lazy;

@Configuration

public class GrpcOpenTracingConfig{

@Autowired

@Lazy

private Tracer tracer;

//grpc-spring-boot-starter provides @GrpcGlobalInterceptor to allow server-side interceptors to be registered with all

//server stubs, we are just taking advantage of that to install the server-side gRPC tracer.

@Bean

ServerInterceptor grpcServerInterceptor(){

return new ServerTracingInterceptor(tracer);

}

@Bean

public GlobalServerInterceptorConfigurer globalInterceptorConfigurerAdapter(ServerInterceptor grpcServerInterceptor){

return registry -> registry.addServerInterceptors(grpcServerInterceptor);

}

}

import com.google.common.collect.ImmutableMap;

import com.grpc.common.OpenTracingContextKey;

import com.grpc.common.OperationNameConstructor;

import io.grpc.BindableService;

import io.grpc.Context;

import io.grpc.Contexts;

import io.grpc.Metadata;

import io.grpc.ServerCall;

import io.grpc.ServerCallHandler;

import io.grpc.ServerInterceptor;

import io.grpc.ServerInterceptors;

import io.grpc.ServerServiceDefinition;

import io.grpc.ForwardingServerCallListener;

import io.opentracing.propagation.Format;

import io.opentracing.propagation.TextMapExtractAdapter;

import io.opentracing.Span;

import io.opentracing.SpanContext;

import io.opentracing.Tracer;

import java.util.Arrays;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Map;

import java.util.Set;

public class ServerTracingInterceptor implements ServerInterceptor{

private final Tracer tracer;

private final OperationNameConstructor operationNameConstructor;

private final boolean streaming;

private final boolean verbose;

private final Set tracedAttributes;

public ServerTracingInterceptor(Tracer tracer){

this.tracer = tracer;

this.operationNameConstructor = OperationNameConstructor.DEFAULT;

this.streaming = false;

this.verbose = false;

this.tracedAttributes = new HashSet();

}

private ServerTracingInterceptor(Tracer tracer, OperationNameConstructor operationNameConstructor, boolean streaming,

boolean verbose, Set tracedAttributes){

this.tracer = tracer;

this.operationNameConstructor = operationNameConstructor;

this.streaming = streaming;

this.verbose = verbose;

this.tracedAttributes = tracedAttributes;

}

public ServerServiceDefinition intercept(ServerServiceDefinition serviceDef){

return ServerInterceptors.intercept(serviceDef, this);

}

public ServerServiceDefinition intercept(BindableService bindableService){

return ServerInterceptors.intercept(bindableService, this);

}

@Override

public  ServerCall.Listener interceptCall(

ServerCall call,

Metadata headers,

ServerCallHandler next

){

Map headerMap = new HashMap();

for (String key : headers.keys()) {

if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {

String value = headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));

headerMap.put(key, value);

}

}

final String operationName = operationNameConstructor.constructOperationName(call.getMethodDescriptor());

final Span span = getSpanFromHeaders(headerMap, operationName);

for (ServerRequestAttribute attr : this.tracedAttributes) {

switch (attr) {

case METHOD_TYPE:

span.setTag("grpc.method_type", call.getMethodDescriptor().getType().toString());

break;

case METHOD_NAME:

span.setTag("grpc.method_name", call.getMethodDescriptor().getFullMethodName());

break;

case CALL_ATTRIBUTES:

span.setTag("grpc.call_attributes", call.getAttributes().toString());

break;

case HEADERS:

span.setTag("grpc.headers", headers.toString());

break;

}

}

Context ctxWithSpan = Context.current().withValue(OpenTracingContextKey.getKey(), span);

ServerCall.Listener listenerWithContext = Contexts

.interceptCall(ctxWithSpan, call, headers, next);

ServerCall.Listener tracingListenerWithContext =

new ForwardingServerCallListener.SimpleForwardingServerCallListener(listenerWithContext) {

@Override

public void onMessage(ReqT message){

if (streaming || verbose) { span.log(ImmutableMap.of("Message received", message)); }

delegate().onMessage(message);

}

@Override

public void onHalfClose(){

if (streaming) { span.log("Client finished sending messages"); }

delegate().onHalfClose();

}

@Override

public void onCancel(){

span.log("Call cancelled");

span.finish();

delegate().onCancel();

}

@Override

public void onComplete(){

if (verbose) { span.log("Call completed"); }

span.finish();

delegate().onComplete();

}

};

return tracingListenerWithContext;

}

private Span getSpanFromHeaders(Map headers, String operationName){

Span span;

try {

SpanContext parentSpanCtx = tracer.extract(Format.Builtin.HTTP_HEADERS,

new TextMapExtractAdapter(headers));

if (parentSpanCtx == null) {

span = tracer.buildSpan(operationName).startManual();

} else {

span = tracer.buildSpan(operationName).asChildOf(parentSpanCtx).startManual();

}

} catch (IllegalArgumentException iae){

span = tracer.buildSpan(operationName)

.withTag("Error", "Extract failed and an IllegalArgumentException was thrown")

.startManual();

}

return span;

}

public static class Builder{

private final Tracer tracer;

private OperationNameConstructor operationNameConstructor;

private boolean streaming;

private boolean verbose;

private Set tracedAttributes;

public Builder(Tracer tracer){

this.tracer = tracer;

this.operationNameConstructor = OperationNameConstructor.DEFAULT;

this.streaming = false;

this.verbose = false;

this.tracedAttributes = new HashSet();

}

public Builder withOperationName(OperationNameConstructor operationNameConstructor){

this.operationNameConstructor = operationNameConstructor;

return this;

}

public Builder withTracedAttributes(ServerRequestAttribute... attributes){

this.tracedAttributes = new HashSet(Arrays.asList(attributes));

return this;

}

public Builder withStreaming(){

this.streaming = true;

return this;

}

public Builder withVerbosity(){

this.verbose = true;

return this;

}

public ServerTracingInterceptor build(){

return new ServerTracingInterceptor(this.tracer, this.operationNameConstructor,

this.streaming, this.verbose, this.tracedAttributes);

}

}

public enum ServerRequestAttribute {

HEADERS,

METHOD_TYPE,

METHOD_NAME,

CALL_ATTRIBUTES

}

}

application.yml配置

opentracing:
  jaeger:
    udp-sender:
      host: 169.44.197.59
      port: 6831
    remote-reporter:
      flush-interval: 1000
      max-queue-size: 5000
    log-spans: false
    probabilistic-sampler:
      sampling-rate: 1

集成pom配置

com.springcloud.grpc

grpc-server-starter

1.0.0-SNAPSHOT

grpc-client-starter

pom.xml依赖

org.springframework.boot

spring-boot-configuration-processor

true

org.projectlombok

lombok

true

net.devh

grpc-client-spring-boot-starter

com.springcloud.grpc

grpc-common

自动装配

import io.grpc.ClientInterceptor;

import io.opentracing.Tracer;

import lombok.extern.slf4j.Slf4j;

import net.devh.boot.grpc.client.interceptor.GlobalClientInterceptorConfigurer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Lazy;

import org.springframework.core.Ordered;

import org.springframework.core.annotation.Order;

@Order(Ordered.LOWEST_PRECEDENCE)

@Configuration

@Slf4j

public class GrpcOpenTracingConfig{

@Autowired

@Lazy

Tracer tracer;

//We also create a client-side interceptor and put that in the context, this interceptor can then be injected into gRPC clients and

//then applied to the managed channel.

@Bean

ClientInterceptor grpcClientInterceptor(){

return new ClientTracingInterceptor(tracer);

}

@Bean

public GlobalClientInterceptorConfigurer globalInterceptorConfigurerAdapter(ClientInterceptor grpcClientInterceptor){

return registry -> registry.addClientInterceptors(grpcClientInterceptor);

}

}

import com.google.common.collect.ImmutableMap;

import com.grpc.common.ActiveSpanSource;

import com.grpc.common.OperationNameConstructor;

import io.grpc.*;

import io.opentracing.Span;

import io.opentracing.Tracer;

import io.opentracing.propagation.Format;

import io.opentracing.propagation.TextMap;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Nullable;

import java.util.Arrays;

import java.util.HashSet;

import java.util.Iterator;

import java.util.Map.Entry;

import java.util.Set;

import java.util.concurrent.TimeUnit;

@Slf4j

public class ClientTracingInterceptor implements ClientInterceptor{

private final Tracer tracer;

private final OperationNameConstructor operationNameConstructor;

private final boolean streaming;

private final boolean verbose;

private final Set tracedAttributes;

private final ActiveSpanSource activeSpanSource;

public ClientTracingInterceptor(Tracer tracer){

this.tracer=tracer;

this.operationNameConstructor = OperationNameConstructor.DEFAULT;

this.streaming = false;

this.verbose = false;

this.tracedAttributes = new HashSet();

this.activeSpanSource = ActiveSpanSource.GRPC_CONTEXT;

}

private ClientTracingInterceptor(Tracer tracer, OperationNameConstructor operationNameConstructor, boolean streaming,

boolean verbose, Set tracedAttributes, ActiveSpanSource activeSpanSource){

this.tracer = tracer;

this.operationNameConstructor = operationNameConstructor;

this.streaming = streaming;

this.verbose = verbose;

this.tracedAttributes = tracedAttributes;

this.activeSpanSource = activeSpanSource;

}

public Channel intercept(Channel channel){

return ClientInterceptors.intercept(channel, this);

}

@Override

public  ClientCall interceptCall(

MethodDescriptor method,

CallOptions callOptions,

Channel next

){

final String operationName = operationNameConstructor.constructOperationName(method);

Span activeSpan = this.activeSpanSource.getActiveSpan();

final Span span = createSpanFromParent(activeSpan, operationName);

for (ClientRequestAttribute attr : this.tracedAttributes) {

switch (attr) {

case ALL_CALL_OPTIONS:

span.setTag("grpc.call_options", callOptions.toString());

break;

case AUTHORITY:

if (callOptions.getAuthority() == null) {

span.setTag("grpc.authority", "null");

} else {

span.setTag("grpc.authority", callOptions.getAuthority());

}

break;

case COMPRESSOR:

if (callOptions.getCompressor() == null) {

span.setTag("grpc.compressor", "null");

} else {

span.setTag("grpc.compressor", callOptions.getCompressor());

}

break;

case DEADLINE:

if (callOptions.getDeadline() == null) {

span.setTag("grpc.deadline_millis", "null");

} else {

span.setTag("grpc.deadline_millis", callOptions.getDeadline().timeRemaining(TimeUnit.MILLISECONDS));

}

break;

case METHOD_NAME:

span.setTag("grpc.method_name", method.getFullMethodName());

break;

case METHOD_TYPE:

if (method.getType() == null) {

span.setTag("grpc.method_type", "null");

} else {

span.setTag("grpc.method_type", method.getType().toString());

}

break;

case HEADERS:

break;

}

}

return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {

@Override

public void start(Listener responseListener, Metadata headers){

if (verbose) {

span.log("Started call");

}

if (tracedAttributes.contains(ClientRequestAttribute.HEADERS)) {

span.setTag("grpc.headers", headers.toString());

}

tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() {

@Override

public void put(String key, String value){

Metadata.Key headerKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);

headers.put(headerKey, value);

}

@Override

public Iterator> iterator() {

throw new UnsupportedOperationException(

"TextMapInjectAdapter should only be used with Tracer.inject()");

}

});

Listener tracingResponseListener = new ForwardingClientCallListener

.SimpleForwardingClientCallListener(responseListener) {

@Override

public void onHeaders(Metadata headers){

if (verbose) { span.log(ImmutableMap.of("Response headers received", headers.toString())); }

delegate().onHeaders(headers);

}

@Override

public void onMessage(RespT message){

if (streaming || verbose) { span.log("Response received"); }

delegate().onMessage(message);

}

@Override

public void onClose(Status status, Metadata trailers){

if (verbose) {

if (status.getCode().value() == 0) { span.log("Call closed"); }

else { span.log(ImmutableMap.of("Call failed", status.getDescription())); }

}

span.finish();

delegate().onClose(status, trailers);

}

};

delegate().start(tracingResponseListener, headers);

}

@Override

public void cancel(@Nullable String message, @Nullable Throwable cause){

String errorMessage;

if (message == null) {

errorMessage = "Error";

} else {

errorMessage = message;

}

if (cause == null) {

span.log(errorMessage);

} else {

span.log(ImmutableMap.of(errorMessage, cause.getMessage()));

}

delegate().cancel(message, cause);

}

@Override

public void halfClose(){

if (streaming) { span.log("Finished sending messages"); }

delegate().halfClose();

}

@Override

public void sendMessage(ReqT message){

if (streaming || verbose) { span.log("Message sent"); }

delegate().sendMessage(message);

}

};

}

private Span createSpanFromParent(Span parentSpan, String operationName){

if (parentSpan == null) {

return tracer.buildSpan(operationName).startManual();

} else {

return tracer.buildSpan(operationName).asChildOf(parentSpan).startManual();

}

}

public static class Builder{

private Tracer tracer;

private OperationNameConstructor operationNameConstructor;

private boolean streaming;

private boolean verbose;

private Set tracedAttributes;

private ActiveSpanSource activeSpanSource;

public Builder(Tracer tracer){

this.tracer = tracer;

this.operationNameConstructor = OperationNameConstructor.DEFAULT;

this.streaming = false;

this.verbose = false;

this.tracedAttributes = new HashSet();

this.activeSpanSource = ActiveSpanSource.GRPC_CONTEXT;

}

public Builder withOperationName(OperationNameConstructor operationNameConstructor){

this.operationNameConstructor = operationNameConstructor;

return this;

}

public Builder withStreaming(){

this.streaming = true;

return this;

}

public Builder withTracedAttributes(ClientRequestAttribute... tracedAttributes){

this.tracedAttributes = new HashSet(

Arrays.asList(tracedAttributes));

return this;

}

public Builder withVerbosity(){

this.verbose = true;

return this;

}

public Builder withActiveSpanSource(ActiveSpanSource activeSpanSource){

this.activeSpanSource = activeSpanSource;

return this;

}

public ClientTracingInterceptor build(){

return new ClientTracingInterceptor(this.tracer, this.operationNameConstructor,

this.streaming, this.verbose, this.tracedAttributes, this.activeSpanSource);

}

}

public enum ClientRequestAttribute {

METHOD_TYPE,

METHOD_NAME,

DEADLINE,

COMPRESSOR,

AUTHORITY,

ALL_CALL_OPTIONS,

HEADERS

}

}

application.yml配置

opentracing:
  jaeger:
    udp-sender:
      host: 169.44.197.59
      port: 6831
    remote-reporter:
      flush-interval: 1000
      max-queue-size: 5000
    log-spans: false
    probabilistic-sampler:
      sampling-rate: 1

集成pom配置

com.springcloud.grpc

grpc-client-starter

1.0.0-SNAPSHOT