天天看点

grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall

grpc分层设计

grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall

按照grpc的分层设计,结合源码,分析grpc客户端启动和调用流程。

grpc的ManagedChannel创建

ManagedChannel 是对 Transport 层 SocketChannel 的抽象,Transport 层负责协议消息的序列化和反序列化,以及协议消息的发送和读取。

ManagedChannel 将处理后的请求和响应传递给与之相关联的 ClientCall 进行上层处理,同时,ManagedChannel 提供了对 Channel 的生命周期管理(链路创建、空闲、关闭等)。

创建代码如下:

ManagedChannel channel = ManagedChannelBuilder
               .forAddress(server.getHost(), server.getPort())
               .defaultLoadBalancingPolicy("round_robin")
               .nameResolverFactory(new DnsNameResolverProvider())
               .idleTimeout(30, TimeUnit.SECONDS)
               .usePlaintext().build();
           

a、 forAddress()

这个方法中主要是设置默认的provider和绑定服务端,这里并没有去连接服务端,只是根据host和port拼接String字符串,并返回NettyChannelBuilder 对象

b、 usePlaintext()

不会再去尝试升级http1

a、 build()

channel是利用反射进行创建的,默认基于epoll,如果没有找到对应的类,则创建nio对应的channel

buildTransportFactory(): 新建NettyChannelBuilder.NettyTransportFactory对象,NettyTransportFactory主要用来创建NettyClientTransport对象。

stub

定义的.proto文件中生成的代码中会生成对应的stub

根据build()中返回的channel和需要创建对应的stub,生成的代码中默认提供了newStub(channel),newBlockingStub(channel),newFutureStub(channel) 三种stub。

newStub(channel): 创建异步stub支持所有类型的service调用

newBlockingStub(channel): 创建一个阻塞的stub,支持一元的和输出流的service调用

newFutureStub(channel): 创建一个listener future的stub,支持一元的和输出流的service调用

简单demo进行测试,分别使用50,100,200并发。持续5分钟,

测试机器为4核8g的windows系统

50线程并发5分钟测试结果:

grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall
grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall
grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall

100线程并发5分钟测试结果:

grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall
grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall
grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall

200线程并发5分钟测试结果:

grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall
grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall
grpc 客户端启动及调用流程简单分析grpc分层设计grpc的ManagedChannel创建stubClientCall

ClientCall

客户端直接调用.proto生成的代码

@java.lang.Override
    public io.grpc.stub.StreamObserver<com.why.grpc.OptionServiceOuterClass.OrdersQuery> query(
        io.grpc.stub.StreamObserver<com.why.grpc.OptionServiceOuterClass.OrdersAck> responseObserver) {
      return asyncBidiStreamingCall(
          getChannel().newCall(METHOD_QUERY, getCallOptions()), responseObserver);
    }
           

getChannel(): 这里会获取创建stub时传入的channel,上面build()创建的channel类型为ManagedChannelOrphanWrapper( extends ForwardingManagedChannel --> extends ManagedChannel -->> extends Channel)

newCall:根据channel的类型,调用ForwardingManagedChannel 中的方法,interceptorChannel的类型为ClientInterceptors类型,下面就会进入ClientInterceptors的newCall方法

ForwardingManagedChannel ------------------->
 this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
  @Override
  public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
      MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
    return delegate.newCall(methodDescriptor, callOptions);
  }
           

在build创建ManagedChannelOrphanWrapper时的代码如下,可知delegate时ManagedChannelImpl的类型,代码会进入到ManagedChannelImpl的方法中

AbstractManagedChannelImplBuilder ---->>
  @Override
  public ManagedChannel build() {
    return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
        this,
        buildTransportFactory(),
        // TODO(carl-mastrangelo): Allow clients to pass this in
        new ExponentialBackoffPolicy.Provider(),
        SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
        GrpcUtil.STOPWATCH_SUPPLIER,
        getEffectiveInterceptors(),
        TimeProvider.SYSTEM_TIME_PROVIDER));
  }
           

在ManagedChannelImpl类中会继续调用ClientInterceptors.InterceptorChannel中的方法

ManagedChannelImpl----->>
 this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
  /*
   * Creates a new outgoing call on the channel.
   */
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
      CallOptions callOptions) {
    return interceptorChannel.newCall(method, callOptions);
  }
           

ClientInterceptors.InterceptorChannel中的newCall根据类型,在build()方法中的getEffectiveInterceptors()默认会设置StatsClientInterceptor和TracingClientInterceptor两个拦截器,在ClientInterceptors.intercept(channel, interceptors)方法调用时,会把这两个拦截器分别创建InterceptorChannel并返回,返回的channel包装了两层连接器,所以调用时先

ClientInterceptors.InterceptorChannel------------------->
private final ClientInterceptor interceptor;

    private InterceptorChannel(Channel channel, ClientInterceptor interceptor) {
      this.channel = channel;
      this.interceptor = Preconditions.checkNotNull(interceptor, "interceptor");
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
      return interceptor.interceptCall(method, callOptions, channel);
    }
           

在build()中新建ManagedChannelImpl中,创建的channel中channel使用了ServiceConfigInterceptor拦截器,在ServiceConfigInterceptor中调用next.newCall(method, callOptions),build()方法中Channel channel = new RealChannel(nameResolver.getServiceAuthority());是创建的RealChannel对象接下来会进入RealChannel的newCall方法,最后创建ClientCallImpl对象。在CensusTracingModule中,创建SimpleForwardingClientCall(call)对象,在CensusStatsModule中创建SimpleForwardingClientCall(call)对象,最后返回到有.proto文件生成的代码getChannel().newCall中。

ClientCall newCall() 创建流程

clientcall组装好之后,在asyncBidiStreamingCall(clientcall,oberserver)中创建连接和stream并启动,

ManagedChannelImpl.ChannelTransportProvider: get 方法第一次调用时,异步进行域名解析和负载均衡,并返回DelayedClientTransport对象

DelayedStream:在transport可用之前的stream,当transport可用的时候会委托给真实的stream

这里的DelayedClientTransport和DelayedStream是把dns解析和发送消息等线程放入syncContext中进行异步执行,在异步执行ManagedChannelImpl的requestConnection方法时调用 InternalSubchannel的obtainActiveTransport方法判断是否存在activeTransport,若没有会新建NettyClientTransport并进行连接,在创建stream时会建立连接,在连接成功后更新InternalSubchannel的activeTransport的值和ManagedChannelImpl的subchannelPicker,下次直接从该transport创建stream。

grpc客户端创建连接和发送消息流程