文章目錄
- 異常重試
- 對沖政策
- 重試限流
- 設定逾時
- 反射建立對象
異常重試
- 調用失敗的情況
- 第一種:用戶端請求異常,此時未達到服務端
- 第二種:請求達到服務端,但是服務端并未處理此請求
- 第三種:服務端處理請求,但是處理結果是失敗
- 對于第一種和第二種gRPC用戶端會自動重試,因為服務端并未處理,是以始終可以重試(透明重試transparent retries),第三種需要手動設定重試政策
//在用戶端設定重試政策( 建立重試配置retry.json) 1. name 指定需要配置異常重試的RPC方法,service是必填項,method是可選項 2. retryPolicy 指定重試政策 2.1 maxAttempts 最大重試次數,指定一次RPC 調用中最多的請求次數,包括第一次請求。必須是大于 1 的整數,對于大于5的值會被視為5。 如果設定了調用的過期時間,那麼到了過期時間,無論重試情況都會傳回逾時錯誤DeadlineExceeded 2.2 retryableStatusCode 重試狀态碼,當 RPC 調用傳回非 OK 響應,會根據 retryableStatusCode 來判斷是否進行重試, GRPC并沒有提供自定義CODE的功能,是以隻能用内置的CODE 2.3 initialBackoff,maxBackoff,backoffMultiplier 指數退避參數,在進行下一次重試請求前,會計算需要等待的時間。 必須指定,并且必須具有大于0。第一次重試間隔是random(0, initialBackoff), 第 n 次的重試間隔為random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff)) { "methodConfig": [ { "name": [ { "service": "cn.jannal.grpc.facade.dto.HelloService", "method": "hello" } ], "retryPolicy": { "maxAttempts": 5, "initialBackoff": "0.5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } } ] }
- 用戶端重試配置代碼
@Slf4j public class GrpcConsumerRetry { public static final String IP = "127.0.0.1"; public static final int PORT = 8082; @Test public void testRetry() { ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .defaultServiceConfig(getRetryingServiceConfig()) .enableRetry() // 重要,用戶端是預設關閉了重試的 .usePlaintext()// 啟用明文 .build(); // 同步調用 HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel); HelloResponse helloResponse = stub.hello(HelloRequest.newBuilder() .setFirstName("Jannal") .setLastName("Jan") .build()); log.info("Response received from server:{}", helloResponse); channel.shutdown(); } protected static Map<String, ?> getRetryingServiceConfig() { return new Gson() .fromJson(new JsonReader(new InputStreamReader( Objects.requireNonNull(GrpcConsumerRetry.class.getClassLoader().getResourceAsStream( "retry.json")), StandardCharsets.UTF_8)), Map.class); } }
對沖政策
- 對沖是指在不等待響應的情況主動發送單次調用的多個請求。請求的流程
- 第一次正常的請求正常發出
- 在等待固定時間間隔後,沒有收到正确的響應,第二個對沖請求會被發出
- 再等待固定時間間隔後,沒有收到任何前面兩個請求的正确響應,第三個會被發出
- 一直重複以上流程直到發出的對沖請求數量達到配置的最大次數
- 一旦收到正确響應,所有對沖請求都會被取消,響應會被傳回給應用層
- 注意事項:使用對沖的時候,請求可能會通路到不同的後端(如果設定了負載均衡),是以要求對沖政策應該隻用于幂等的操作
- 對沖政策配置
1. name 指定需要配置對沖政策的RPC方法,service是必填項,method是可選項 2. hedgingPolicy 指定對沖政策 2.1 maxAttempts 最大請求次數,指定一次RPC 調用中最多的請求次數,包括第一次請求。 必須是大于1的整數,對于大于5的值會被視為5。 如果設定了調用的過期時間,那麼到了過期時間,無論重試情況都會傳回逾時錯誤DeadlineExceeded 2.2 hedgingDelay 等待時間,如果hedgingDelay時間内沒有響應,那麼直接發送第二次請求,如果指定0S,會立即将maxAttempts個請求發出 2.3 nonFatalStatusCodes 當對沖請求接收到nonFatalStatusCodes後,會立即發送下一個對沖請求,不管 hedgingDelay。 如果收到其他的狀态碼,則所有未完成的對沖請求都将被取消,并且将狀态碼傳回給調用者。 本質上,對沖可以看做是收到FatalStatusCodes 前對RPC調用的重試。 可選的字段,因為在上一個請求沒有響應的時候也會發送對沖請求 { "methodConfig": [ { "name": [ { "service": "cn.jannal.grpc.facade.dto.HelloService", "method": "hello" } ], "hedgingPolicy": { "maxAttempts": 3, "hedgingDelay": "1s", "nonFatalStatusCodes":[ ] } } ] }
重試限流
- 當用戶端的失敗和成功比超過某個門檻值時,gRPC 會通過禁用這些重試政策來防止由于重試導緻伺服器過載
- 重試限流是根據伺服器來設定的,而不是針對方法或者服務。對于每個 server,gRPC 的用戶端都維護了一個 token_count 變量,變量初始值為配置的 maxTokens 值,值的範圍是 0 - maxToken,每次 RPC 請求都會影響這個 token_count 變量值
每次失敗的 RPC 請求都會對 token_count 減 1 每次成功的 RPC 請求都會對 token_count 增加 tokenRatio 值
- 如果
,那麼後續發出的請求即使失敗也不會進行重試了,但是正常的請求還是會發出去,直到這個token_count <= (maxTokens / 2)
才又恢複對失敗請求的重試。這種政策可以有效的處理長時間故障。tokenRatio介于0~1之間,支援3位小數token_count > (maxTokens / 2)
設定逾時
- 設定逾時時間,設定整合stub逾時和目前請求的逾時
@Slf4j public class GrpcConsumerTimeout { public static final String IP = "127.0.0.1"; public static final int PORT = 8082; @Test public void testTimeout() { ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .usePlaintext()// 啟用明文 .build(); HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel) //設定逾時時間,這是設定整個stub的deadline .withDeadlineAfter(3, TimeUnit.SECONDS); HelloResponse helloResponse = stub //設定目前請求的deadline .withDeadlineAfter(3, TimeUnit.SECONDS) .hello(HelloRequest.newBuilder() .setFirstName("Jannal") .setLastName("Jan") .build()); log.info("Response received from server:{}", helloResponse); channel.shutdown(); } }
反射建立對象
- 有時需要将rest動态轉換為gRPC的調用,可以使用反射建立請求對象
- 第一種通過内省填充JavaBean對象
@Test public void testReflection() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InvalidProtocolBufferException { //可以通過參數傳遞過來 String requestClassName = HelloRequest.class.getName(); //建構請求的Message Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader() .loadClass(requestClassName); Method method = requestMessageClass.getMethod("newBuilder"); Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{}); //填充setter方法 Descriptors.Descriptor descriptor = builder.getDescriptorForType(); Descriptors.FieldDescriptor firstNameField = descriptor.findFieldByName("firstName"); builder.setField(firstNameField, "Tom"); Descriptors.FieldDescriptor lastNameField = descriptor.findFieldByName("lastName"); builder.setField(lastNameField, "Jannal"); Message requestMessage = builder.build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .usePlaintext()// 啟用明文 .build(); String serviceGrpcName = HelloServiceGrpc.class.getName(); String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName(); Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName); Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub); //調用newBlockingStub擷取一個stub對象 Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class); AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel}); log.info("{}", stub); String methodName = "hello"; //使用stub調用遠端方法 Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class); // HellResponse實作了Message接口 log.info("請求資料:{}", requestMessage); Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage); log.info("服務端響應資料:{}", helloResponse); String json = JsonFormat.printer().print(helloResponse); log.info("服務端響應資料JSON格式:{}", json); channel.shutdown(); }
- 第二種通過請求的JSON直接填充請求對象
//添加依賴 protobuf與json之間互相轉換 compile "com.googlecode.protobuf-java-format:protobuf-java-format" @Test public void testReflectionJson() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InvalidProtocolBufferException { //json轉protobuf String json = "{\n" + "\tfirstName:\"zhangsan\",\n" + "\tlastName:\"lisi\"\n" + "}"; //可以通過參數傳遞過來 String requestClassName = HelloRequest.class.getName(); Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader() .loadClass(requestClassName); Method method = requestMessageClass.getMethod("newBuilder"); Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{}); JsonFormat.parser().merge(json, builder); Message requestMessage = builder.build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .usePlaintext()// 啟用明文 .build(); String serviceGrpcName = HelloServiceGrpc.class.getName(); String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName(); Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName); Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub); //調用newBlockingStub擷取一個stub對象 Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class); AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel}); log.info("{}", stub); String methodName = "hello"; //使用stub調用遠端方法 Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class); // HellResponse實作了Message接口 log.info("請求資料:{}", requestMessage); Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage); log.info("服務端響應資料:{}", helloResponse); String responseJson = JsonFormat.printer().print(helloResponse); log.info("服務端響應資料JSON格式:{}", responseJson); channel.shutdown(); }