SparkRPC源码分析之OneWayMessage消息
文章目录
- SparkRPC源码分析之OneWayMessage消息
-
- OneWayMessage
OneWayMessage
首先看一下OneWayMessage的定义
什么是OneWayMessage?
英文注释为:A RPC that does not expect a reply, which is handled by a remote。用普通话说,就是仅仅远程处理而不进行回复消息的RPC消息。
入口信息和其他RequestMessage消息一样
@Override
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else if (request instanceof UploadStream) {
processStreamUpload((UploadStream) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
直接进入processOneWayMessage方法内部
private void processOneWayMessage(OneWayMessage req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
req.body().release();
}
}
//对比一下processRPCRequest消息
private void processRpcRequest(final RpcRequest req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
}
@Override
public void onFailure(Throwable e) {
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
});
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
} finally {
req.body().release();
}
}
它仅仅是RPC消息接受了信息相比于其他消息他少了 respond函数返回的内容。
他调用receive方法和RPCRequest请求嗲用的receive方法不同,可以清楚地看到这个receive方法传入了两个参数,而RPCRequest请求处理的receive方法传入了三个参数,第三个参数为RPCReponseCallback,由于在这里消息是不用被回复的,因此仅需两个参数,其实该函数内部调用的时候还是会调用三个参数的receive方法,代码如下
public void receive(TransportClient client, ByteBuffer message) {
receive(client, message, ONE_WAY_CALLBACK);
}
只不过传入的消息 参数为ONE_WAY_CALLBACK,这个参数定义为
private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
/**
*此处省略部分代码
*/
private static class OneWayRpcCallback implements RpcResponseCallback {
private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
@Override
public void onSuccess(ByteBuffer response) {
logger.warn("Response provided for one-way RPC.");
}
@Override
public void onFailure(Throwable e) {
logger.error("Error response provided for one-way RPC.", e);
}
}
可以看到只不过在此处覆写的onSuccess和onFailure方法都是只打印日志不做业务处理。