天天看點

基于grpc從零開始搭建一個準生産分布式應用(4) - 06 - springGrpc架構

一、方案一:自定義攔截器(請求參數校驗)

1.1、異常定義

public final class ValidationExceptions {
    private ValidationExceptions() {
        // Static helpers only; not meant to be instantiated.
    }

    /**
     * Translates a {@link ValidationException} into a gRPC {@code StatusRuntimeException}.
     *
     * <p>The resulting status uses {@code Code.INVALID_ARGUMENT}, carries the exception's
     * message, and has one {@link Any}-packed {@link BadRequest} detail holding a single
     * field violation built from the exception's field and reason.
     *
     * @param ex the validation failure to translate.
     * @return the equivalent gRPC {@code StatusRuntimeException}
     */
    public static StatusRuntimeException asStatusRuntimeException(ValidationException ex) {
        // Describe which field failed and why.
        BadRequest.FieldViolation violation = BadRequest.FieldViolation.newBuilder()
                .setField(ex.getField())
                .setDescription(ex.getReason())
                .build();
        BadRequest details = BadRequest.newBuilder()
                .addFieldViolations(violation)
                .build();

        // Wrap the violation in a rich status so it travels in the error details.
        Status richStatus = Status.newBuilder()
                .setCode(Code.INVALID_ARGUMENT.getNumber())
                .setMessage(ex.getMessage())
                .addDetails(Any.pack(details))
                .build();

        return StatusProto.toStatusRuntimeException(richStatus);
    }
}

1.2、服務端攔截器

/**
 * Server interceptor that validates every inbound request message before it reaches the
 * service implementation. A message that fails validation closes the call with the status
 * produced by {@link ValidationExceptions#asStatusRuntimeException(ValidationException)}.
 */
public class ValidatingServerInterceptor implements ServerInterceptor {
    private final ValidatorIndex index;

    /**
     * @param index lookup used to obtain the validator for each message class.
     */
    public ValidatingServerInterceptor(ValidatorIndex index) {
        this.index = index;
    }

    /**
     * Starts the call on {@code next} and wraps the returned listener so that each request
     * message is validated before being forwarded.
     *
     * @param call    the call being intercepted; closed here when validation fails.
     * @param headers the request headers, passed through unchanged.
     * @param next    the next handler in the chain.
     * @return a listener that validates messages and suppresses further events once the call
     *         has been aborted by a validation failure.
     */
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {

            // Listener implementations are not required to be thread-safe (per the
            // ServerCall.Listener contract), so a plain field is sufficient here.
            private boolean aborted = false;

            @Override
            public void onMessage(ReqT message) {
                if (aborted) {
                    // The call was already closed by an earlier validation failure. Forwarding
                    // would hand a message to a handler whose call is closed, and a second
                    // invalid message would call close() again, which throws
                    // IllegalStateException.
                    return;
                }
                try {
                    index.validatorFor(message.getClass()).assertValid(message);
                    super.onMessage(message);
                } catch (ValidationException ex) {
                    StatusRuntimeException status = ValidationExceptions.asStatusRuntimeException(ex);
                    aborted = true;
                    call.close(status.getStatus(), status.getTrailers());
                }
            }

            @Override
            public void onHalfClose() {
                // Do not let the handler complete a call that has already been closed.
                if (!aborted) {
                    super.onHalfClose();
                }
            }
        };
    }
}

1.3、用戶端攔截器

/**
 * Client interceptor that validates each outbound request message before it is sent.
 * An invalid message is reported to the caller as the {@code StatusRuntimeException}
 * produced by {@link ValidationExceptions#asStatusRuntimeException(ValidationException)}.
 */
public class ValidatingClientInterceptor implements ClientInterceptor {
    private final ValidatorIndex index;

    /**
     * @param index lookup used to obtain the validator for each message class.
     */
    public ValidatingClientInterceptor(ValidatorIndex index) {
        this.index = index;
    }

    /**
     * Creates the call on {@code next} and wraps it so that {@code sendMessage} validates first.
     */
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        final ClientCall<ReqT, RespT> downstream = next.newCall(method, callOptions);

        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(downstream) {

            /** Runs the validator registered for the message's runtime class. */
            private void requireValid(ReqT outgoing) throws ValidationException {
                index.validatorFor(outgoing.getClass()).assertValid(outgoing);
            }

            @Override
            public void sendMessage(ReqT message) {
                try {
                    requireValid(message);
                    super.sendMessage(message);
                } catch (ValidationException invalid) {
                    // Surface the failure in gRPC terms rather than as a validation exception.
                    throw ValidationExceptions.asStatusRuntimeException(invalid);
                }
            }
        };
    }
}

二、方案二:自定義攔截器(請求日誌與追蹤)

2.1、服務端攔截器

public class DelegateInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        long inTime = System.currentTimeMillis();

        String trackId = metadata.get(Metadata.Key.of(CONST.TRACKID_KEY, Metadata.ASCII_STRING_MARSHALLER));
        if (StringUtils.isEmpty(trackId)){
            trackId = String.valueOf(genLogId(System.nanoTime()));
        }

        StringBuilder delegateLog = new StringBuilder();
        delegateLog.append("tid=").append(trackId)
                .append(CONST.SPLIT_BLANK).append("appid=").append(TokenParser.appId())
                .append(CONST.SPLIT_BLANK).append("ip=").append(serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR))
                .append(CONST.SPLIT_BLANK).append("uri=").append(serverCall.getMethodDescriptor().getFullMethodName())
//                .append(LogConst.SPLIT_BLANK).append("param=").append(serverCall.getAttributes()) //TODO 列印原始入參,暫時還沒取出來
                .append(CONST.SPLIT_BLANK).append("inTime=").append(inTime);

        //儲存請求時間和相關日志到請求線程中,供後面攔截器列印用
        metadata.put(Metadata.Key.of(CONST.DELEGATE_LOG_KEY, Metadata.ASCII_STRING_MARSHALLER), delegateLog.toString());
        metadata.put(Metadata.Key.of(CONST.DELEGATE_INTIME_KEY, Metadata.ASCII_STRING_MARSHALLER), String.valueOf(inTime));

        log.info(delegateLog.toString());

        //下面設定的值必須為原始值,不能自定義的變量,保持參數的純淨
        DelegateCall<ReqT, RespT> serverCallDelegate = new DelegateCall<>(serverCall);
        serverCallDelegate.setMetadata(metadata);
        DelegateCallListener<ReqT, RespT> delegateCallListener = new DelegateCallListener<>(serverCallHandler.startCall(serverCallDelegate, metadata));
        delegateCallListener.setServerCall(serverCall);

        return delegateCallListener;
    }      

2.2、用戶端攔截器

public class LogGrpcInterceptor implements ClientInterceptor {

    private static final Logger log = LoggerFactory.getLogger(LogGrpcInterceptor.class);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions,
            Channel next) {

        log.info("Received call to {}", method.getFullMethodName());
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

            @Override
            public void sendMessage(ReqT message) {
                log.debug("Request message: {}", message);
                super.sendMessage(message);
            }

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                super.start(
                        new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                            @Override
                            public void onMessage(RespT message) {
                                log.debug("Response message: {}", message);
                                super.onMessage(message);
                            }

                            @Override
                            public void onHeaders(Metadata headers) {
                                log.debug("gRPC headers: {}", headers);
                                super.onHeaders(headers);
                            }

                            @Override
                            public void onClose(Status status, Metadata trailers) {
                                log.info("Interaction ends with status: {}", status);
                                log.info("Trailers: {}", trailers);
                                super.onClose(status, trailers);
                            }
                        }, headers);
            }
        };
    }
}      

2.3、利用反射取得相關的值

/**
 * Reflection helper that reads the private {@code call} field of gRPC's
 * {@code ServerCalls$ServerCallStreamObserverImpl} to recover the {@code DelegateCall}
 * behind a response observer.
 *
 * <p>The target class and field are resolved once at class-load time. If either cannot be
 * resolved, the failure is logged and {@link #delegateCall(StreamObserver)} returns
 * {@code null} instead of throwing.
 *
 * @param <T> the response message type of the observer.
 */
public class ServerCallStreamObserverUtil<T> {
    private static final String TARGET_CLASS_NAME = "io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl";
    private static final String CALL_FIELD_NAME = "call";

    // java.util.logging is referenced by fully-qualified name so no new import is needed.
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(ServerCallStreamObserverUtil.class.getName());

    private static final Class<?> targetClazz = findTargetClass();
    private static final Field targetField = findCallField(targetClazz);

    /** Finds the nested observer implementation class inside {@code ServerCalls}, or {@code null}. */
    private static Class<?> findTargetClass() {
        for (Class<?> clazz : ServerCalls.class.getDeclaredClasses()) {
            if (TARGET_CLASS_NAME.equals(clazz.getName())) {
                return clazz;
            }
        }
        LOG.warning("Nested class not found: " + TARGET_CLASS_NAME);
        return null;
    }

    /** Resolves the {@code call} field on {@code owner} and makes it accessible, or returns {@code null}. */
    private static Field findCallField(Class<?> owner) {
        if (owner == null) {
            return null;
        }
        try {
            Field field = owner.getDeclaredField(CALL_FIELD_NAME);
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException | SecurityException e) {
            LOG.log(java.util.logging.Level.WARNING,
                    "Cannot access field '" + CALL_FIELD_NAME + "' on " + owner.getName(), e);
            return null;
        }
    }

    /**
     * Extracts the {@code DelegateCall} held by the given response observer.
     *
     * @param responseObserver the observer handed to the service method.
     * @return the underlying {@code DelegateCall}, or {@code null} when the reflective lookup
     *         is unavailable, the observer is not the expected implementation, the field does
     *         not hold a {@code DelegateCall}, or the field cannot be read.
     */
    public DelegateCall delegateCall(StreamObserver<T> responseObserver) {
        // isInstance also rejects null, so a null observer yields null rather than an NPE.
        if (targetField == null || !targetClazz.isInstance(responseObserver)) {
            return null;
        }
        try {
            Object call = targetField.get(responseObserver);
            return call instanceof DelegateCall ? (DelegateCall) call : null;
        } catch (IllegalAccessException e) {
            LOG.log(java.util.logging.Level.WARNING,
                    "Cannot read field '" + CALL_FIELD_NAME + "' from response observer", e);
            return null;
        }
    }
}

繼續閱讀