文章目录
- 异常重试
- 对冲策略
- 重试限流
- 设置超时
- 反射创建对象
异常重试
- 调用失败的情况
- 第一种:客户端请求异常,此时未达到服务端
- 第二种:请求达到服务端,但是服务端并未处理此请求
- 第三种:服务端处理请求,但是处理结果是失败
- 对于第一种和第二种gRPC客户端会自动重试,因为服务端并未处理,所以始终可以重试(透明重试transparent retries),第三种需要手动设置重试策略
//在客户端设置重试策略( 新建重试配置retry.json) 1. name 指定需要配置异常重试的RPC方法,service是必填项,method是可选项 2. retryPolicy 指定重试策略 2.1 maxAttempts 最大重试次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。必须是大于 1 的整数,对于大于5的值会被视为5。 如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded 2.2 retryableStatusCode 重试状态码,当 RPC 调用返回非 OK 响应,会根据 retryableStatusCode 来判断是否进行重试, GRPC并没有提供自定义CODE的功能,所以只能用内置的CODE 2.3 initialBackoff,maxBackoff,backoffMultiplier 指数退避参数,在进行下一次重试请求前,会计算需要等待的时间。 必须指定,并且必须具有大于0。第一次重试间隔是random(0, initialBackoff), 第 n 次的重试间隔为random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff)) { "methodConfig": [ { "name": [ { "service": "cn.jannal.grpc.facade.dto.HelloService", "method": "hello" } ], "retryPolicy": { "maxAttempts": 5, "initialBackoff": "0.5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } } ] }
- 客户端重试配置代码
@Slf4j public class GrpcConsumerRetry { public static final String IP = "127.0.0.1"; public static final int PORT = 8082; @Test public void testRetry() { ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .defaultServiceConfig(getRetryingServiceConfig()) .enableRetry() // 重要,客户端是默认关闭了重试的 .usePlaintext()// 启用明文 .build(); // 同步调用 HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel); HelloResponse helloResponse = stub.hello(HelloRequest.newBuilder() .setFirstName("Jannal") .setLastName("Jan") .build()); log.info("Response received from server:{}", helloResponse); channel.shutdown(); } protected static Map<String, ?> getRetryingServiceConfig() { return new Gson() .fromJson(new JsonReader(new InputStreamReader( Objects.requireNonNull(GrpcConsumerRetry.class.getClassLoader().getResourceAsStream( "retry.json")), StandardCharsets.UTF_8)), Map.class); } }
对冲策略
- 对冲是指在不等待响应的情况主动发送单次调用的多个请求。请求的流程
- 第一次正常的请求正常发出
- 在等待固定时间间隔后,没有收到正确的响应,第二个对冲请求会被发出
- 再等待固定时间间隔后,没有收到任何前面两个请求的正确响应,第三个会被发出
- 一直重复以上流程直到发出的对冲请求数量达到配置的最大次数
- 一旦收到正确响应,所有对冲请求都会被取消,响应会被返回给应用层
- 注意事项:使用对冲的时候,请求可能会访问到不同的后端(如果设置了负载均衡),所以要求对冲策略应该只用于幂等的操作
- 对冲策略配置
1. name 指定需要配置对冲策略的RPC方法,service是必填项,method是可选项 2. hedgingPolicy 指定对冲策略 2.1 maxAttempts 最大请求次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。 必须是大于1的整数,对于大于5的值会被视为5。 如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded 2.2 hedgingDelay 等待时间,如果hedgingDelay时间内没有响应,那么直接发送第二次请求,如果指定0S,会立即将maxAttempts个请求发出 2.3 nonFatalStatusCodes 当对冲请求接收到nonFatalStatusCodes后,会立即发送下一个对冲请求,不管 hedgingDelay。 如果收到其他的状态码,则所有未完成的对冲请求都将被取消,并且将状态码返回给调用者。 本质上,对冲可以看做是收到FatalStatusCodes 前对RPC调用的重试。 可选的字段,因为在上一个请求没有响应的时候也会发送对冲请求 { "methodConfig": [ { "name": [ { "service": "cn.jannal.grpc.facade.dto.HelloService", "method": "hello" } ], "hedgingPolicy": { "maxAttempts": 3, "hedgingDelay": "1s", "nonFatalStatusCodes":[ ] } } ] }
重试限流
- 当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载
- 重试限流是根据服务器来设置的,而不是针对方法或者服务。对于每个 server,gRPC 的客户端都维护了一个 token_count 变量,变量初始值为配置的 maxTokens 值,值的范围是 0 - maxToken,每次 RPC 请求都会影响这个 token_count 变量值
每次失败的 RPC 请求都会对 token_count 减 1 每次成功的 RPC 请求都会对 token_count 增加 tokenRatio 值
- 如果
,那么后续发出的请求即使失败也不会进行重试了,但是正常的请求还是会发出去,直到这个token_count <= (maxTokens / 2)
才又恢复对失败请求的重试。这种策略可以有效的处理长时间故障。tokenRatio介于0~1之间,支持3位小数token_count > (maxTokens / 2)
设置超时
- 设置超时时间,设置整合stub超时和当前请求的超时
@Slf4j public class GrpcConsumerTimeout { public static final String IP = "127.0.0.1"; public static final int PORT = 8082; @Test public void testTimeout() { ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .usePlaintext()// 启用明文 .build(); HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel) //设置超时时间,这是设置整个stub的deadline .withDeadlineAfter(3, TimeUnit.SECONDS); HelloResponse helloResponse = stub //设置当前请求的deadline .withDeadlineAfter(3, TimeUnit.SECONDS) .hello(HelloRequest.newBuilder() .setFirstName("Jannal") .setLastName("Jan") .build()); log.info("Response received from server:{}", helloResponse); channel.shutdown(); } }
反射创建对象
- 有时需要将rest动态转换为gRPC的调用,可以使用反射创建请求对象
- 第一种通过内省填充JavaBean对象
@Test public void testReflection() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InvalidProtocolBufferException { //可以通过参数传递过来 String requestClassName = HelloRequest.class.getName(); //构建请求的Message Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader() .loadClass(requestClassName); Method method = requestMessageClass.getMethod("newBuilder"); Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{}); //填充setter方法 Descriptors.Descriptor descriptor = builder.getDescriptorForType(); Descriptors.FieldDescriptor firstNameField = descriptor.findFieldByName("firstName"); builder.setField(firstNameField, "Tom"); Descriptors.FieldDescriptor lastNameField = descriptor.findFieldByName("lastName"); builder.setField(lastNameField, "Jannal"); Message requestMessage = builder.build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .usePlaintext()// 启用明文 .build(); String serviceGrpcName = HelloServiceGrpc.class.getName(); String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName(); Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName); Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub); //调用newBlockingStub获取一个stub对象 Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class); AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel}); log.info("{}", stub); String methodName = "hello"; //使用stub调用远程方法 Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class); // HellResponse实现了Message接口 log.info("请求数据:{}", requestMessage); Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage); log.info("服务端响应数据:{}", helloResponse); String json = JsonFormat.printer().print(helloResponse); log.info("服务端响应数据JSON格式:{}", json); channel.shutdown(); }
- 第二种通过请求的JSON直接填充请求对象
//添加依赖 protobuf与json之间相互转换 compile "com.googlecode.protobuf-java-format:protobuf-java-format" @Test public void testReflectionJson() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InvalidProtocolBufferException { //json转protobuf String json = "{\n" + "\tfirstName:\"zhangsan\",\n" + "\tlastName:\"lisi\"\n" + "}"; //可以通过参数传递过来 String requestClassName = HelloRequest.class.getName(); Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader() .loadClass(requestClassName); Method method = requestMessageClass.getMethod("newBuilder"); Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{}); JsonFormat.parser().merge(json, builder); Message requestMessage = builder.build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT) .usePlaintext()// 启用明文 .build(); String serviceGrpcName = HelloServiceGrpc.class.getName(); String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName(); Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName); Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub); //调用newBlockingStub获取一个stub对象 Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class); AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel}); log.info("{}", stub); String methodName = "hello"; //使用stub调用远程方法 Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class); // HellResponse实现了Message接口 log.info("请求数据:{}", requestMessage); Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage); log.info("服务端响应数据:{}", helloResponse); String responseJson = JsonFormat.printer().print(helloResponse); log.info("服务端响应数据JSON格式:{}", responseJson); channel.shutdown(); }