天天看点

SparkRPC源码分析之OneWayMessage消息SparkRPC源码分析之OneWayMessage消息

SparkRPC源码分析之OneWayMessage消息

文章目录

  • SparkRPC源码分析之OneWayMessage消息
    • OneWayMessage

OneWayMessage

首先看一下OneWayMessage的定义

什么是OneWayMessage?

英文注释为:A RPC that does not expect a reply, which is handled by a remote。用普通话说,就是仅仅远程处理而不进行回复消息的RPC消息。

入口信息和其他RequestMessage消息一样

@Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
      processStreamUpload((UploadStream) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }
           

直接进入processOneWayMessage方法内部

private void processOneWayMessage(OneWayMessage req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
      req.body().release();
    }
  }

//对比一下processRPCRequest消息
private void processRpcRequest(final RpcRequest req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
        @Override
        public void onFailure(Throwable e) {
          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
      });
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    } finally {
      req.body().release();
    }
  }

           

它仅仅是RPC消息接受了信息相比于其他消息他少了 respond函数返回的内容。

他调用receive方法和RPCRequest请求嗲用的receive方法不同,可以清楚地看到这个receive方法传入了两个参数,而RPCRequest请求处理的receive方法传入了三个参数,第三个参数为RPCReponseCallback,由于在这里消息是不用被回复的,因此仅需两个参数,其实该函数内部调用的时候还是会调用三个参数的receive方法,代码如下

public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

           

只不过传入的消息 参数为ONE_WAY_CALLBACK,这个参数定义为

private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
/**
*此处省略部分代码
*/
private static class OneWayRpcCallback implements RpcResponseCallback {
    private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
    @Override
    public void onSuccess(ByteBuffer response) {
      logger.warn("Response provided for one-way RPC.");
    }
    @Override
    public void onFailure(Throwable e) {
      logger.error("Error response provided for one-way RPC.", e);
    }
  }
           

可以看到只不过在此处覆写的onSuccess和onFailure方法都是只打印日志不做业务处理。

继续阅读