微服务全链路跟踪:grpc集成zipkin
微服务全链路跟踪:grpc集成jaeger
微服务全链路跟踪:springcloud集成jaeger
微服务全链路跟踪:jaeger集成istio,并兼容uber-trace-id与b3
微服务全链路跟踪:jaeger集成hystrix
微服务全链路跟踪:jaeger增加tag参数
本章节内容是基于springboot2集成net.devh.grpc的拓展
本章介绍grpc集成jaeger,本文主要参考jaeger官方文档进行扩展
https://github.com/opentracing-contrib/java-spring-cloud
https://github.com/opentracing-contrib/java-spring-jaeger
jaeger部署
这里就不列举jaeger代码或者容器部署了,网上很多
grpc工程目录
grpc-common
grpc-client-starter
grpc-server-starter
grpc-common
pom.xml依赖
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
相关实例
import io.opentracing.Span;
/**
 * Supplies the span that should be treated as the currently active one.
 *
 * <p>Two ready-made sources are provided as constants: {@link #NONE} and
 * {@link #GRPC_CONTEXT}.
 */
public interface ActiveSpanSource {

    /** Source whose {@link #getActiveSpan()} always returns {@code null}. */
    ActiveSpanSource NONE = () -> null;

    /** Source that returns whatever {@link OpenTracingContextKey#activeSpan()} returns. */
    ActiveSpanSource GRPC_CONTEXT = OpenTracingContextKey::activeSpan;

    /**
     * Looks up the active span.
     *
     * @return the active span; may be {@code null} (the {@link #NONE} source always returns
     *         {@code null})
     */
    Span getActiveSpan();
}
import io.grpc.Context;
import io.opentracing.Span;
/**
 * Holds the gRPC {@link Context} key under which the active OpenTracing {@link Span} is stored,
 * plus a convenience accessor for reading it back.
 */
public class OpenTracingContextKey {

    /** Name passed to {@link Context#key(String)} when the key is created. */
    public static final String KEY_NAME = "io.opentracing.active-span";

    // The key is typed as Context.Key<Span>: with a raw Context.Key, key.get() yields Object
    // and activeSpan() would not compile without a cast.
    private static final Context.Key<Span> key = Context.key(KEY_NAME);

    /**
     * Reads the span stored under {@link #getKey()} via {@code Context.Key#get()}.
     *
     * @return the stored span; presumably {@code null} when nothing was stored, since the key is
     *         created without a default value (confirm against the grpc-java version in use)
     */
    public static Span activeSpan() {
        return key.get();
    }

    /**
     * @return the context key used to store and retrieve the active span
     */
    public static Context.Key<Span> getKey() {
        return key;
    }
}
import io.grpc.MethodDescriptor;
/**
 * Derives the operation name of a span from the gRPC method being called.
 */
public interface OperationNameConstructor {

    /** Default strategy: the operation name is {@code method.getFullMethodName()}. */
    OperationNameConstructor DEFAULT = MethodDescriptor::getFullMethodName;

    /**
     * Builds the operation name for a call.
     *
     * <p>The parameter is declared with wildcards instead of as a raw type so that callers can
     * pass any parameterized descriptor without raw-type warnings, while the interface keeps a
     * single non-generic abstract method (and can therefore still be written as a lambda).
     *
     * @param method descriptor of the gRPC method being invoked
     * @return the operation name to use for the span
     */
    String constructOperationName(MethodDescriptor<?, ?> method);
}
grpc-server-starter
pom.xml依赖
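<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>net.devh</groupId>
    <artifactId>grpc-server-spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>com.springcloud.grpc</groupId>
    <artifactId>grpc-common</artifactId>
</dependency>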
自动装配
import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor;
import io.opentracing.Tracer;
import net.devh.boot.grpc.server.interceptor.GlobalServerInterceptorConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
/**
 * Spring configuration for the server side: creates a {@link ServerTracingInterceptor} bean and
 * registers it through a {@link GlobalServerInterceptorConfigurer}.
 *
 * <p>NOTE(review): the client starter declares a configuration class with the same simple name
 * and a bean method that is also called {@code globalInterceptorConfigurerAdapter}; if both
 * starters are loaded into one application context the bean names may clash - confirm.
 */
@Configuration
public class GrpcOpenTracingConfig {

    // Injected with @Lazy - presumably so that the Tracer bean is only resolved on first use;
    // verify against the tracer auto-configuration that provides it.
    @Lazy
    @Autowired
    private Tracer tracer;

    /**
     * Exposes the server-side tracing interceptor, built around the injected tracer, as a bean.
     *
     * @return a new {@link ServerTracingInterceptor}
     */
    @Bean
    ServerInterceptor grpcServerInterceptor() {
        final ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor(tracer);
        return tracingInterceptor;
    }

    /**
     * Adds the tracing interceptor to the global server interceptor registry.
     *
     * @param grpcServerInterceptor the interceptor bean to register
     * @return a configurer that adds {@code grpcServerInterceptor} to the registry it is given
     */
    @Bean
    public GlobalServerInterceptorConfigurer globalInterceptorConfigurerAdapter(
            ServerInterceptor grpcServerInterceptor) {
        return registry -> {
            registry.addServerInterceptors(grpcServerInterceptor);
        };
    }
}
import com.google.common.collect.ImmutableMap;
import com.grpc.common.OpenTracingContextKey;
import com.grpc.common.OperationNameConstructor;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.ForwardingServerCallListener;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapExtractAdapter;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class ServerTracingInterceptor implements ServerInterceptor{
private final Tracer tracer;
private final OperationNameConstructor operationNameConstructor;
private final boolean streaming;
private final boolean verbose;
private final Set tracedAttributes;
public ServerTracingInterceptor(Tracer tracer){
this.tracer = tracer;
this.operationNameConstructor = OperationNameConstructor.DEFAULT;
this.streaming = false;
this.verbose = false;
this.tracedAttributes = new HashSet();
}
private ServerTracingInterceptor(Tracer tracer, OperationNameConstructor operationNameConstructor, boolean streaming,
boolean verbose, Set tracedAttributes){
this.tracer = tracer;
this.operationNameConstructor = operationNameConstructor;
this.streaming = streaming;
this.verbose = verbose;
this.tracedAttributes = tracedAttributes;
}
public ServerServiceDefinition intercept(ServerServiceDefinition serviceDef){
return ServerInterceptors.intercept(serviceDef, this);
}
public ServerServiceDefinition intercept(BindableService bindableService){
return ServerInterceptors.intercept(bindableService, this);
}
@Override
public ServerCall.Listener interceptCall(
ServerCall call,
Metadata headers,
ServerCallHandler next
){
Map headerMap = new HashMap();
for (String key : headers.keys()) {
if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
String value = headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
headerMap.put(key, value);
}
}
final String operationName = operationNameConstructor.constructOperationName(call.getMethodDescriptor());
final Span span = getSpanFromHeaders(headerMap, operationName);
for (ServerRequestAttribute attr : this.tracedAttributes) {
switch (attr) {
case METHOD_TYPE:
span.setTag("grpc.method_type", call.getMethodDescriptor().getType().toString());
break;
case METHOD_NAME:
span.setTag("grpc.method_name", call.getMethodDescriptor().getFullMethodName());
break;
case CALL_ATTRIBUTES:
span.setTag("grpc.call_attributes", call.getAttributes().toString());
break;
case HEADERS:
span.setTag("grpc.headers", headers.toString());
break;
}
}
Context ctxWithSpan = Context.current().withValue(OpenTracingContextKey.getKey(), span);
ServerCall.Listener listenerWithContext = Contexts
.interceptCall(ctxWithSpan, call, headers, next);
ServerCall.Listener tracingListenerWithContext =
new ForwardingServerCallListener.SimpleForwardingServerCallListener(listenerWithContext) {
@Override
public void onMessage(ReqT message){
if (streaming || verbose) { span.log(ImmutableMap.of("Message received", message)); }
delegate().onMessage(message);
}
@Override
public void onHalfClose(){
if (streaming) { span.log("Client finished sending messages"); }
delegate().onHalfClose();
}
@Override
public void onCancel(){
span.log("Call cancelled");
span.finish();
delegate().onCancel();
}
@Override
public void onComplete(){
if (verbose) { span.log("Call completed"); }
span.finish();
delegate().onComplete();
}
};
return tracingListenerWithContext;
}
private Span getSpanFromHeaders(Map headers, String operationName){
Span span;
try {
SpanContext parentSpanCtx = tracer.extract(Format.Builtin.HTTP_HEADERS,
new TextMapExtractAdapter(headers));
if (parentSpanCtx == null) {
span = tracer.buildSpan(operationName).startManual();
} else {
span = tracer.buildSpan(operationName).asChildOf(parentSpanCtx).startManual();
}
} catch (IllegalArgumentException iae){
span = tracer.buildSpan(operationName)
.withTag("Error", "Extract failed and an IllegalArgumentException was thrown")
.startManual();
}
return span;
}
public static class Builder{
private final Tracer tracer;
private OperationNameConstructor operationNameConstructor;
private boolean streaming;
private boolean verbose;
private Set tracedAttributes;
public Builder(Tracer tracer){
this.tracer = tracer;
this.operationNameConstructor = OperationNameConstructor.DEFAULT;
this.streaming = false;
this.verbose = false;
this.tracedAttributes = new HashSet();
}
public Builder withOperationName(OperationNameConstructor operationNameConstructor){
this.operationNameConstructor = operationNameConstructor;
return this;
}
public Builder withTracedAttributes(ServerRequestAttribute... attributes){
this.tracedAttributes = new HashSet(Arrays.asList(attributes));
return this;
}
public Builder withStreaming(){
this.streaming = true;
return this;
}
public Builder withVerbosity(){
this.verbose = true;
return this;
}
public ServerTracingInterceptor build(){
return new ServerTracingInterceptor(this.tracer, this.operationNameConstructor,
this.streaming, this.verbose, this.tracedAttributes);
}
}
public enum ServerRequestAttribute {
HEADERS,
METHOD_TYPE,
METHOD_NAME,
CALL_ATTRIBUTES
}
}
application.yml配置
opentracing:
  jaeger:
    udp-sender:
      host: 169.44.197.59
      port: 6831
    remote-reporter:
      flush-interval: 1000
      max-queue-size: 5000
    log-spans: false
    probabilistic-sampler:
      sampling-rate: 1
集成pom配置
<dependency>
    <groupId>com.springcloud.grpc</groupId>
    <artifactId>grpc-server-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
grpc-client-starter
pom.xml依赖
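<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>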
<dependency>
    <groupId>net.devh</groupId>
    <artifactId>grpc-client-spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>com.springcloud.grpc</groupId>
    <artifactId>grpc-common</artifactId>
</dependency>
自动装配
import io.grpc.ClientInterceptor;
import io.opentracing.Tracer;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.interceptor.GlobalClientInterceptorConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
/**
 * Spring configuration for the client side: creates a {@link ClientTracingInterceptor} bean and
 * registers it through a {@link GlobalClientInterceptorConfigurer}.
 *
 * <p>NOTE(review): the server starter declares a configuration class with the same simple name
 * and a bean method that is also called {@code globalInterceptorConfigurerAdapter}; if both
 * starters are loaded into one application context the bean names may clash - confirm.
 */
@Slf4j
@Configuration
@Order(Ordered.LOWEST_PRECEDENCE)
public class GrpcOpenTracingConfig {

    // Injected with @Lazy - presumably so that the Tracer bean is only resolved on first use;
    // verify against the tracer auto-configuration that provides it.
    @Lazy
    @Autowired
    Tracer tracer;

    /**
     * Exposes the client-side tracing interceptor, built around the injected tracer, as a bean.
     *
     * @return a new {@link ClientTracingInterceptor}
     */
    @Bean
    ClientInterceptor grpcClientInterceptor() {
        final ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor(tracer);
        return tracingInterceptor;
    }

    /**
     * Adds the tracing interceptor to the global client interceptor registry.
     *
     * @param grpcClientInterceptor the interceptor bean to register
     * @return a configurer that adds {@code grpcClientInterceptor} to the registry it is given
     */
    @Bean
    public GlobalClientInterceptorConfigurer globalInterceptorConfigurerAdapter(
            ClientInterceptor grpcClientInterceptor) {
        return registry -> {
            registry.addClientInterceptors(grpcClientInterceptor);
        };
    }
}
import com.google.common.collect.ImmutableMap;
import com.grpc.common.ActiveSpanSource;
import com.grpc.common.OperationNameConstructor;
import io.grpc.*;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ClientTracingInterceptor implements ClientInterceptor{
private final Tracer tracer;
private final OperationNameConstructor operationNameConstructor;
private final boolean streaming;
private final boolean verbose;
private final Set tracedAttributes;
private final ActiveSpanSource activeSpanSource;
public ClientTracingInterceptor(Tracer tracer){
this.tracer=tracer;
this.operationNameConstructor = OperationNameConstructor.DEFAULT;
this.streaming = false;
this.verbose = false;
this.tracedAttributes = new HashSet();
this.activeSpanSource = ActiveSpanSource.GRPC_CONTEXT;
}
private ClientTracingInterceptor(Tracer tracer, OperationNameConstructor operationNameConstructor, boolean streaming,
boolean verbose, Set tracedAttributes, ActiveSpanSource activeSpanSource){
this.tracer = tracer;
this.operationNameConstructor = operationNameConstructor;
this.streaming = streaming;
this.verbose = verbose;
this.tracedAttributes = tracedAttributes;
this.activeSpanSource = activeSpanSource;
}
public Channel intercept(Channel channel){
return ClientInterceptors.intercept(channel, this);
}
@Override
public ClientCall interceptCall(
MethodDescriptor method,
CallOptions callOptions,
Channel next
){
final String operationName = operationNameConstructor.constructOperationName(method);
Span activeSpan = this.activeSpanSource.getActiveSpan();
final Span span = createSpanFromParent(activeSpan, operationName);
for (ClientRequestAttribute attr : this.tracedAttributes) {
switch (attr) {
case ALL_CALL_OPTIONS:
span.setTag("grpc.call_options", callOptions.toString());
break;
case AUTHORITY:
if (callOptions.getAuthority() == null) {
span.setTag("grpc.authority", "null");
} else {
span.setTag("grpc.authority", callOptions.getAuthority());
}
break;
case COMPRESSOR:
if (callOptions.getCompressor() == null) {
span.setTag("grpc.compressor", "null");
} else {
span.setTag("grpc.compressor", callOptions.getCompressor());
}
break;
case DEADLINE:
if (callOptions.getDeadline() == null) {
span.setTag("grpc.deadline_millis", "null");
} else {
span.setTag("grpc.deadline_millis", callOptions.getDeadline().timeRemaining(TimeUnit.MILLISECONDS));
}
break;
case METHOD_NAME:
span.setTag("grpc.method_name", method.getFullMethodName());
break;
case METHOD_TYPE:
if (method.getType() == null) {
span.setTag("grpc.method_type", "null");
} else {
span.setTag("grpc.method_type", method.getType().toString());
}
break;
case HEADERS:
break;
}
}
return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
@Override
public void start(Listener responseListener, Metadata headers){
if (verbose) {
span.log("Started call");
}
if (tracedAttributes.contains(ClientRequestAttribute.HEADERS)) {
span.setTag("grpc.headers", headers.toString());
}
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() {
@Override
public void put(String key, String value){
Metadata.Key headerKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
headers.put(headerKey, value);
}
@Override
public Iterator> iterator() {
throw new UnsupportedOperationException(
"TextMapInjectAdapter should only be used with Tracer.inject()");
}
});
Listener tracingResponseListener = new ForwardingClientCallListener
.SimpleForwardingClientCallListener(responseListener) {
@Override
public void onHeaders(Metadata headers){
if (verbose) { span.log(ImmutableMap.of("Response headers received", headers.toString())); }
delegate().onHeaders(headers);
}
@Override
public void onMessage(RespT message){
if (streaming || verbose) { span.log("Response received"); }
delegate().onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers){
if (verbose) {
if (status.getCode().value() == 0) { span.log("Call closed"); }
else { span.log(ImmutableMap.of("Call failed", status.getDescription())); }
}
span.finish();
delegate().onClose(status, trailers);
}
};
delegate().start(tracingResponseListener, headers);
}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause){
String errorMessage;
if (message == null) {
errorMessage = "Error";
} else {
errorMessage = message;
}
if (cause == null) {
span.log(errorMessage);
} else {
span.log(ImmutableMap.of(errorMessage, cause.getMessage()));
}
delegate().cancel(message, cause);
}
@Override
public void halfClose(){
if (streaming) { span.log("Finished sending messages"); }
delegate().halfClose();
}
@Override
public void sendMessage(ReqT message){
if (streaming || verbose) { span.log("Message sent"); }
delegate().sendMessage(message);
}
};
}
private Span createSpanFromParent(Span parentSpan, String operationName){
if (parentSpan == null) {
return tracer.buildSpan(operationName).startManual();
} else {
return tracer.buildSpan(operationName).asChildOf(parentSpan).startManual();
}
}
public static class Builder{
private Tracer tracer;
private OperationNameConstructor operationNameConstructor;
private boolean streaming;
private boolean verbose;
private Set tracedAttributes;
private ActiveSpanSource activeSpanSource;
public Builder(Tracer tracer){
this.tracer = tracer;
this.operationNameConstructor = OperationNameConstructor.DEFAULT;
this.streaming = false;
this.verbose = false;
this.tracedAttributes = new HashSet();
this.activeSpanSource = ActiveSpanSource.GRPC_CONTEXT;
}
public Builder withOperationName(OperationNameConstructor operationNameConstructor){
this.operationNameConstructor = operationNameConstructor;
return this;
}
public Builder withStreaming(){
this.streaming = true;
return this;
}
public Builder withTracedAttributes(ClientRequestAttribute... tracedAttributes){
this.tracedAttributes = new HashSet(
Arrays.asList(tracedAttributes));
return this;
}
public Builder withVerbosity(){
this.verbose = true;
return this;
}
public Builder withActiveSpanSource(ActiveSpanSource activeSpanSource){
this.activeSpanSource = activeSpanSource;
return this;
}
public ClientTracingInterceptor build(){
return new ClientTracingInterceptor(this.tracer, this.operationNameConstructor,
this.streaming, this.verbose, this.tracedAttributes, this.activeSpanSource);
}
}
public enum ClientRequestAttribute {
METHOD_TYPE,
METHOD_NAME,
DEADLINE,
COMPRESSOR,
AUTHORITY,
ALL_CALL_OPTIONS,
HEADERS
}
}
application.yml配置
opentracing:
  jaeger:
    udp-sender:
      host: 169.44.197.59
      port: 6831
    remote-reporter:
      flush-interval: 1000
      max-queue-size: 5000
    log-spans: false
    probabilistic-sampler:
      sampling-rate: 1
集成pom配置
<dependency>
    <groupId>com.springcloud.grpc</groupId>
    <artifactId>grpc-client-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>