天天看點

gRPC之API詳解異常重試對沖政策重試限流設定逾時反射建立對象

文章目錄

  • 異常重試
  • 對沖政策
  • 重試限流
  • 設定逾時
  • 反射建立對象

異常重試

  1. 調用失敗的情況
    • 第一種:用戶端請求異常,此時未達到服務端
    • 第二種:請求達到服務端,但是服務端并未處理此請求
    • 第三種:服務端處理請求,但是處理結果是失敗
  2. 對于第一種和第二種gRPC用戶端會自動重試,因為服務端并未處理,是以始終可以重試(透明重試transparent retries),第三種需要手動設定重試政策
    //在用戶端設定重試政策( 建立重試配置retry.json)
    1. name 指定需要配置異常重試的RPC方法,service是必填項,method是可選項
    2. retryPolicy 指定重試政策
       2.1 maxAttempts 最大重試次數,指定一次RPC 調用中最多的請求次數,包括第一次請求。必須是大于 1 的整數,對于大于5的值會被視為5。
           如果設定了調用的過期時間,那麼到了過期時間,無論重試情況都會傳回逾時錯誤DeadlineExceeded  
       2.2 retryableStatusCode 重試狀态碼,當 RPC 調用傳回非 OK 響應,會根據 retryableStatusCode 來判斷是否進行重試,
           GRPC并沒有提供自定義CODE的功能,是以隻能用内置的CODE
       2.3 initialBackoff,maxBackoff,backoffMultiplier 指數退避參數,在進行下一次重試請求前,會計算需要等待的時間。
      			必須指定,并且必須具有大于0。第一次重試間隔是random(0, initialBackoff), 
    				第 n 次的重試間隔為random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff)) 
    
    {
      "methodConfig": [
        {
          "name": [
            {
              "service": "cn.jannal.grpc.facade.dto.HelloService",
              "method": "hello"
            }
          ],
    
          "retryPolicy": {
            "maxAttempts": 5, 
            "initialBackoff": "0.5s",
            "maxBackoff": "30s",
            "backoffMultiplier": 2,
            "retryableStatusCodes": [
              "UNAVAILABLE" 
            ]
          }
        }
      ]
    }
               
  3. 用戶端重試配置代碼
    @Slf4j
    public class GrpcConsumerRetry {
        public static final String IP = "127.0.0.1";
        public static final int PORT = 8082;
    
        @Test
        public void testRetry() {
            ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                    .defaultServiceConfig(getRetryingServiceConfig())
                    .enableRetry() // 重要,用戶端是預設關閉了重試的
                    .usePlaintext()// 啟用明文
                    .build();
            // 同步調用
            HelloServiceGrpc.HelloServiceBlockingStub stub
                    = HelloServiceGrpc.newBlockingStub(channel);
    
            HelloResponse helloResponse = stub.hello(HelloRequest.newBuilder()
                    .setFirstName("Jannal")
                    .setLastName("Jan")
                    .build());
            log.info("Response received from server:{}", helloResponse);
            channel.shutdown();
        }
    
        protected static Map<String, ?> getRetryingServiceConfig() {
            return new Gson()
                    .fromJson(new JsonReader(new InputStreamReader(
                                    Objects.requireNonNull(GrpcConsumerRetry.class.getClassLoader().getResourceAsStream(
                                            "retry.json")), StandardCharsets.UTF_8)),
                            Map.class);
        }
    }
               

對沖政策

  1. 對沖是指在不等待響應的情況主動發送單次調用的多個請求。請求的流程
    • 第一次正常的請求正常發出
    • 在等待固定時間間隔後,沒有收到正确的響應,第二個對沖請求會被發出
    • 再等待固定時間間隔後,沒有收到任何前面兩個請求的正确響應,第三個會被發出
    • 一直重複以上流程直到發出的對沖請求數量達到配置的最大次數
    • 一旦收到正确響應,所有對沖請求都會被取消,響應會被傳回給應用層
  2. 注意事項:使用對沖的時候,請求可能會通路到不同的後端(如果設定了負載均衡),是以要求對沖政策應該隻用于幂等的操作
  3. 對沖政策配置
    1. name 指定需要配置對沖政策的RPC方法,service是必填項,method是可選項
    2. hedgingPolicy 指定對沖政策
    	2.1 maxAttempts 最大請求次數,指定一次RPC 調用中最多的請求次數,包括第一次請求。 必須是大于1的整數,對于大于5的值會被視為5。
    		如果設定了調用的過期時間,那麼到了過期時間,無論重試情況都會傳回逾時錯誤DeadlineExceeded
    	2.2 hedgingDelay 等待時間,如果hedgingDelay時間内沒有響應,那麼直接發送第二次請求,如果指定0S,會立即将maxAttempts個請求發出
    	2.3 nonFatalStatusCodes 當對沖請求接收到nonFatalStatusCodes後,會立即發送下一個對沖請求,不管 hedgingDelay。
    			如果收到其他的狀态碼,則所有未完成的對沖請求都将被取消,并且将狀态碼傳回給調用者。
    		  本質上,對沖可以看做是收到FatalStatusCodes 前對RPC調用的重試。
    			可選的字段,因為在上一個請求沒有響應的時候也會發送對沖請求
    {
      "methodConfig": [
        {
          "name": [
            {
              "service": "cn.jannal.grpc.facade.dto.HelloService",
              "method": "hello"
            }
          ],
          "hedgingPolicy": {
            "maxAttempts": 3,
            "hedgingDelay": "1s",
            "nonFatalStatusCodes":[
              
            ]
          }
        }
      ]
    }
               

重試限流

  1. 當用戶端的失敗和成功比超過某個門檻值時,gRPC 會通過禁用這些重試政策來防止由于重試導緻伺服器過載
  2. 重試限流是根據伺服器來設定的,而不是針對方法或者服務。對于每個 server,gRPC 的用戶端都維護了一個 token_count 變量,變量初始值為配置的 maxTokens 值,值的範圍是 0 - maxToken,每次 RPC 請求都會影響這個 token_count 變量值
    每次失敗的 RPC 請求都會對 token_count 減 1
    每次成功的 RPC 請求都會對 token_count 增加 tokenRatio 值
               
  3. 如果

    token_count <= (maxTokens / 2)

    ,那麼後續發出的請求即使失敗也不會進行重試了,但是正常的請求還是會發出去,直到這個

    token_count > (maxTokens / 2)

    才又恢複對失敗請求的重試。這種政策可以有效的處理長時間故障。tokenRatio介于0~1之間,支援3位小數

設定逾時

  1. 設定逾時時間,設定整合stub逾時和目前請求的逾時
    @Slf4j
    public class GrpcConsumerTimeout {
        public static final String IP = "127.0.0.1";
        public static final int PORT = 8082;
    
        @Test
        public void testTimeout() {
            ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                    .usePlaintext()// 啟用明文
                    .build();
            HelloServiceGrpc.HelloServiceBlockingStub stub
                    = HelloServiceGrpc.newBlockingStub(channel)
                    //設定逾時時間,這是設定整個stub的deadline
                    .withDeadlineAfter(3, TimeUnit.SECONDS);
    
            HelloResponse helloResponse = stub
                    //設定目前請求的deadline
                    .withDeadlineAfter(3, TimeUnit.SECONDS)
                    .hello(HelloRequest.newBuilder()
                            .setFirstName("Jannal")
                            .setLastName("Jan")
                            .build());
            log.info("Response received from server:{}", helloResponse);
            channel.shutdown();
        }
    
    }
               

反射建立對象

  1. 有時需要将rest動态轉換為gRPC的調用,可以使用反射建立請求對象
  2. 第一種通過内省填充JavaBean對象
    @Test
    public void testReflection() throws ClassNotFoundException,
            NoSuchMethodException,
            InvocationTargetException,
            IllegalAccessException,
            InvalidProtocolBufferException {
        //可以通過參數傳遞過來
        String requestClassName = HelloRequest.class.getName();
    
        //建構請求的Message
        Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
                .loadClass(requestClassName);
        Method method = requestMessageClass.getMethod("newBuilder");
        Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
        //填充setter方法
        Descriptors.Descriptor descriptor = builder.getDescriptorForType();
        Descriptors.FieldDescriptor firstNameField = descriptor.findFieldByName("firstName");
        builder.setField(firstNameField, "Tom");
        Descriptors.FieldDescriptor lastNameField = descriptor.findFieldByName("lastName");
        builder.setField(lastNameField, "Jannal");
        Message requestMessage = builder.build();
    
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                .usePlaintext()// 啟用明文
                .build();
    
        String serviceGrpcName = HelloServiceGrpc.class.getName();
        String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
        Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
        Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
        //調用newBlockingStub擷取一個stub對象
        Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
        AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
        log.info("{}", stub);
    
        String methodName = "hello";
        //使用stub調用遠端方法
        Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
        // HellResponse實作了Message接口
        log.info("請求資料:{}", requestMessage);
        Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
        log.info("服務端響應資料:{}", helloResponse);
    
        String json = JsonFormat.printer().print(helloResponse);
        log.info("服務端響應資料JSON格式:{}", json);
    
        channel.shutdown();
    }
               
  3. 第二種通過請求的JSON直接填充請求對象
    //添加依賴 protobuf與json之間互相轉換  
    compile "com.googlecode.protobuf-java-format:protobuf-java-format"   
      
    @Test
    public void testReflectionJson() throws ClassNotFoundException,
            NoSuchMethodException,
            InvocationTargetException,
            IllegalAccessException,
            InvalidProtocolBufferException {
    
        //json轉protobuf
        String json = "{\n" +
                "\tfirstName:\"zhangsan\",\n" +
                "\tlastName:\"lisi\"\n" +
                "}";
        //可以通過參數傳遞過來
        String requestClassName = HelloRequest.class.getName();
        Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
                .loadClass(requestClassName);
        Method method = requestMessageClass.getMethod("newBuilder");
        Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
        JsonFormat.parser().merge(json, builder);
        Message requestMessage = builder.build();
    
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                .usePlaintext()// 啟用明文
                .build();
    
        String serviceGrpcName = HelloServiceGrpc.class.getName();
        String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
        Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
        Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
        //調用newBlockingStub擷取一個stub對象
        Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
        AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
        log.info("{}", stub);
    
        String methodName = "hello";
        //使用stub調用遠端方法
        Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
        // HellResponse實作了Message接口
        log.info("請求資料:{}", requestMessage);
        Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
        log.info("服務端響應資料:{}", helloResponse);
    
        String responseJson = JsonFormat.printer().print(helloResponse);
        log.info("服務端響應資料JSON格式:{}", responseJson);
    
        channel.shutdown();
    }