天天看点

gRPC之API详解异常重试对冲策略重试限流设置超时反射创建对象

文章目录

  • 异常重试
  • 对冲策略
  • 重试限流
  • 设置超时
  • 反射创建对象

异常重试

  1. 调用失败的情况
    • 第一种:客户端请求异常,此时未达到服务端
    • 第二种:请求达到服务端,但是服务端并未处理此请求
    • 第三种:服务端处理请求,但是处理结果是失败
  2. 对于第一种和第二种gRPC客户端会自动重试,因为服务端并未处理,所以始终可以重试(透明重试transparent retries),第三种需要手动设置重试策略
    //在客户端设置重试策略( 新建重试配置retry.json)
    1. name 指定需要配置异常重试的RPC方法,service是必填项,method是可选项
    2. retryPolicy 指定重试策略
       2.1 maxAttempts 最大重试次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。必须是大于 1 的整数,对于大于5的值会被视为5。
           如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded  
       2.2 retryableStatusCode 重试状态码,当 RPC 调用返回非 OK 响应,会根据 retryableStatusCode 来判断是否进行重试,
           GRPC并没有提供自定义CODE的功能,所以只能用内置的CODE
       2.3 initialBackoff,maxBackoff,backoffMultiplier 指数退避参数,在进行下一次重试请求前,会计算需要等待的时间。
      			必须指定,并且必须具有大于0。第一次重试间隔是random(0, initialBackoff), 
    				第 n 次的重试间隔为random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff)) 
    
    {
      "methodConfig": [
        {
          "name": [
            {
              "service": "cn.jannal.grpc.facade.dto.HelloService",
              "method": "hello"
            }
          ],
    
          "retryPolicy": {
            "maxAttempts": 5, 
            "initialBackoff": "0.5s",
            "maxBackoff": "30s",
            "backoffMultiplier": 2,
            "retryableStatusCodes": [
              "UNAVAILABLE" 
            ]
          }
        }
      ]
    }
               
  3. 客户端重试配置代码
    @Slf4j
    public class GrpcConsumerRetry {
        public static final String IP = "127.0.0.1";
        public static final int PORT = 8082;
    
        @Test
        public void testRetry() {
            ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                    .defaultServiceConfig(getRetryingServiceConfig())
                    .enableRetry() // 重要,客户端是默认关闭了重试的
                    .usePlaintext()// 启用明文
                    .build();
            // 同步调用
            HelloServiceGrpc.HelloServiceBlockingStub stub
                    = HelloServiceGrpc.newBlockingStub(channel);
    
            HelloResponse helloResponse = stub.hello(HelloRequest.newBuilder()
                    .setFirstName("Jannal")
                    .setLastName("Jan")
                    .build());
            log.info("Response received from server:{}", helloResponse);
            channel.shutdown();
        }
    
        protected static Map<String, ?> getRetryingServiceConfig() {
            return new Gson()
                    .fromJson(new JsonReader(new InputStreamReader(
                                    Objects.requireNonNull(GrpcConsumerRetry.class.getClassLoader().getResourceAsStream(
                                            "retry.json")), StandardCharsets.UTF_8)),
                            Map.class);
        }
    }
               

对冲策略

  1. 对冲是指在不等待响应的情况主动发送单次调用的多个请求。请求的流程
    • 第一次正常的请求正常发出
    • 在等待固定时间间隔后,没有收到正确的响应,第二个对冲请求会被发出
    • 再等待固定时间间隔后,没有收到任何前面两个请求的正确响应,第三个会被发出
    • 一直重复以上流程直到发出的对冲请求数量达到配置的最大次数
    • 一旦收到正确响应,所有对冲请求都会被取消,响应会被返回给应用层
  2. 注意事项:使用对冲的时候,请求可能会访问到不同的后端(如果设置了负载均衡),所以要求对冲策略应该只用于幂等的操作
  3. 对冲策略配置
    1. name 指定需要配置对冲策略的RPC方法,service是必填项,method是可选项
    2. hedgingPolicy 指定对冲策略
    	2.1 maxAttempts 最大请求次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。 必须是大于1的整数,对于大于5的值会被视为5。
    		如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded
    	2.2 hedgingDelay 等待时间,如果hedgingDelay时间内没有响应,那么直接发送第二次请求,如果指定0S,会立即将maxAttempts个请求发出
    	2.3 nonFatalStatusCodes 当对冲请求接收到nonFatalStatusCodes后,会立即发送下一个对冲请求,不管 hedgingDelay。
    			如果收到其他的状态码,则所有未完成的对冲请求都将被取消,并且将状态码返回给调用者。
    		  本质上,对冲可以看做是收到FatalStatusCodes 前对RPC调用的重试。
    			可选的字段,因为在上一个请求没有响应的时候也会发送对冲请求
    {
      "methodConfig": [
        {
          "name": [
            {
              "service": "cn.jannal.grpc.facade.dto.HelloService",
              "method": "hello"
            }
          ],
          "hedgingPolicy": {
            "maxAttempts": 3,
            "hedgingDelay": "1s",
            "nonFatalStatusCodes":[
              
            ]
          }
        }
      ]
    }
               

重试限流

  1. 当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载
  2. 重试限流是根据服务器来设置的,而不是针对方法或者服务。对于每个 server,gRPC 的客户端都维护了一个 token_count 变量,变量初始值为配置的 maxTokens 值,值的范围是 0 - maxToken,每次 RPC 请求都会影响这个 token_count 变量值
    每次失败的 RPC 请求都会对 token_count 减 1
    每次成功的 RPC 请求都会对 token_count 增加 tokenRatio 值
               
  3. 如果

    token_count <= (maxTokens / 2)

    ,那么后续发出的请求即使失败也不会进行重试了,但是正常的请求还是会发出去,直到这个

    token_count > (maxTokens / 2)

    才又恢复对失败请求的重试。这种策略可以有效的处理长时间故障。tokenRatio介于0~1之间,支持3位小数

设置超时

  1. 设置超时时间,设置整合stub超时和当前请求的超时
    @Slf4j
    public class GrpcConsumerTimeout {
        public static final String IP = "127.0.0.1";
        public static final int PORT = 8082;
    
        @Test
        public void testTimeout() {
            ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                    .usePlaintext()// 启用明文
                    .build();
            HelloServiceGrpc.HelloServiceBlockingStub stub
                    = HelloServiceGrpc.newBlockingStub(channel)
                    //设置超时时间,这是设置整个stub的deadline
                    .withDeadlineAfter(3, TimeUnit.SECONDS);
    
            HelloResponse helloResponse = stub
                    //设置当前请求的deadline
                    .withDeadlineAfter(3, TimeUnit.SECONDS)
                    .hello(HelloRequest.newBuilder()
                            .setFirstName("Jannal")
                            .setLastName("Jan")
                            .build());
            log.info("Response received from server:{}", helloResponse);
            channel.shutdown();
        }
    
    }
               

反射创建对象

  1. 有时需要将rest动态转换为gRPC的调用,可以使用反射创建请求对象
  2. 第一种通过内省填充JavaBean对象
    @Test
    public void testReflection() throws ClassNotFoundException,
            NoSuchMethodException,
            InvocationTargetException,
            IllegalAccessException,
            InvalidProtocolBufferException {
        //可以通过参数传递过来
        String requestClassName = HelloRequest.class.getName();
    
        //构建请求的Message
        Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
                .loadClass(requestClassName);
        Method method = requestMessageClass.getMethod("newBuilder");
        Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
        //填充setter方法
        Descriptors.Descriptor descriptor = builder.getDescriptorForType();
        Descriptors.FieldDescriptor firstNameField = descriptor.findFieldByName("firstName");
        builder.setField(firstNameField, "Tom");
        Descriptors.FieldDescriptor lastNameField = descriptor.findFieldByName("lastName");
        builder.setField(lastNameField, "Jannal");
        Message requestMessage = builder.build();
    
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                .usePlaintext()// 启用明文
                .build();
    
        String serviceGrpcName = HelloServiceGrpc.class.getName();
        String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
        Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
        Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
        //调用newBlockingStub获取一个stub对象
        Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
        AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
        log.info("{}", stub);
    
        String methodName = "hello";
        //使用stub调用远程方法
        Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
        // HellResponse实现了Message接口
        log.info("请求数据:{}", requestMessage);
        Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
        log.info("服务端响应数据:{}", helloResponse);
    
        String json = JsonFormat.printer().print(helloResponse);
        log.info("服务端响应数据JSON格式:{}", json);
    
        channel.shutdown();
    }
               
  3. 第二种通过请求的JSON直接填充请求对象
    //添加依赖 protobuf与json之间相互转换  
    compile "com.googlecode.protobuf-java-format:protobuf-java-format"   
      
    @Test
    public void testReflectionJson() throws ClassNotFoundException,
            NoSuchMethodException,
            InvocationTargetException,
            IllegalAccessException,
            InvalidProtocolBufferException {
    
        //json转protobuf
        String json = "{\n" +
                "\tfirstName:\"zhangsan\",\n" +
                "\tlastName:\"lisi\"\n" +
                "}";
        //可以通过参数传递过来
        String requestClassName = HelloRequest.class.getName();
        Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
                .loadClass(requestClassName);
        Method method = requestMessageClass.getMethod("newBuilder");
        Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
        JsonFormat.parser().merge(json, builder);
        Message requestMessage = builder.build();
    
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                .usePlaintext()// 启用明文
                .build();
    
        String serviceGrpcName = HelloServiceGrpc.class.getName();
        String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
        Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
        Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
        //调用newBlockingStub获取一个stub对象
        Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
        AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
        log.info("{}", stub);
    
        String methodName = "hello";
        //使用stub调用远程方法
        Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
        // HellResponse实现了Message接口
        log.info("请求数据:{}", requestMessage);
        Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
        log.info("服务端响应数据:{}", helloResponse);
    
        String responseJson = JsonFormat.printer().print(helloResponse);
        log.info("服务端响应数据JSON格式:{}", responseJson);
    
        channel.shutdown();
    }