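Contents
RPC pipeline handling: TransportChannelHandler
RPC server-side handling: RpcHandler
Bootstraps: Bootstrap
RPC client: TransportClient
Summary
This post continues the analysis from "[Spark Core Source Code] The Built-in RPC Framework, Spark's Signal Corps (Part 1)".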
RPC pipeline handling: TransportChannelHandler
The last job of TransportContext is to set up the Netty channel pipeline using org.apache.spark.network.server.TransportChannelHandler.
Both the createClient method of TransportClientFactory and the init method of TransportServer initialize the pipeline by calling TransportContext's initializePipeline method.
Figure: TransportClientFactory calling initializePipeline
Figure: TransportServer calling initializePipeline
The code of initializePipeline is as follows:
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
/**
* Pipeline setup.
* Requests (inbound) pass through the handlers in the order they were added: TransportFrameDecoder -> MessageDecoder -> IdleStateHandler -> TransportChannelHandler
* Responses (outbound) pass through in reverse order: IdleStateHandler -> MessageEncoder
* */
channel.pipeline()
.addLast("encoder", encoder) // add the encoder to the pipeline
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) // add the frame decoder to the pipeline
.addLast("decoder", decoder) // add the decoder to the pipeline
.addLast("idleStateHandler", // add the IdleStateHandler, a built-in Netty handler
new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
}
First, createChannelHandler is called to create the TransportChannelHandler object. The code of createChannelHandler is as follows:
/**
* TransportChannelHandler
* On the server side, it delegates to TransportRequestHandler to handle request messages.
* On the client side, it delegates to TransportResponseHandler to handle response messages.
*
* */
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
// Creating the TransportChannelHandler also creates a TransportResponseHandler, a TransportRequestHandler, and a TransportClient
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
// This is where the TransportClient is actually created
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
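Before the TransportChannelHandler is created, a TransportResponseHandler, a TransportClient, and a TransportRequestHandler are created first. This is where the TransportClient object is actually created. It corresponds one-to-one with the channel, so everyone who uses the channel gets the same TransportClient object. On the server side, TransportChannelHandler delegates to TransportRequestHandler to handle request messages; on the client side, it delegates to TransportResponseHandler to handle response messages. Once the TransportChannelHandler has been created, the pipeline is configured, as follows:
Figure: pipeline setup
TransportFrameDecoder, MessageDecoder, and TransportChannelHandler are all, in essence, ChannelInboundHandler implementations; MessageEncoder is a ChannelOutboundHandler; IdleStateHandler is both a ChannelInboundHandler and a ChannelOutboundHandler (through inheritance and interface implementation). Given the order in which Netty invokes handlers, this yields the following:
A request passes through the inbound handlers in the order they were added: TransportFrameDecoder -> MessageDecoder -> IdleStateHandler -> TransportChannelHandler. A response passes through the outbound handlers in reverse order: IdleStateHandler -> MessageEncoder. The structure is as follows:
Figure: how the pipeline processes requests and responses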
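The ordering rule can be checked with a toy model. The sketch below does not use Netty: the Handler descriptor and the method names are hypothetical, and the handler names mirror the addLast calls in initializePipeline (assuming TransportFrameDecoder.HANDLER_NAME is "frameDecoder"). It only encodes the rule that inbound events walk the pipeline from head to tail and outbound events from tail to head, each skipping handlers of the other direction.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of handler ordering in a Netty-style pipeline. It does not use Netty.
class PipelineOrderSketch {
    // Hypothetical descriptor: a handler name plus the directions it takes part in.
    static final class Handler {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Handler(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Handlers in the order initializePipeline adds them with addLast.
    static List<Handler> sparkPipeline() {
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(new Handler("encoder", false, true));          // MessageEncoder
        pipeline.add(new Handler("frameDecoder", true, false));     // TransportFrameDecoder
        pipeline.add(new Handler("decoder", true, false));          // MessageDecoder
        pipeline.add(new Handler("idleStateHandler", true, true));  // IdleStateHandler
        pipeline.add(new Handler("handler", true, false));          // TransportChannelHandler
        return pipeline;
    }

    // Inbound events (reads) travel head to tail and visit inbound handlers only.
    static List<String> inboundOrder(List<Handler> pipeline) {
        List<String> order = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.inbound) {
                order.add(h.name);
            }
        }
        return order;
    }

    // Outbound events (writes) travel tail to head and visit outbound handlers only.
    static List<String> outboundOrder(List<Handler> pipeline) {
        List<String> order = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Handler h = pipeline.get(i);
            if (h.outbound) {
                order.add(h.name);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("request:  " + inboundOrder(sparkPipeline()));
        System.out.println("response: " + outboundOrder(sparkPipeline()));
    }
}
```

This also shows why the encoder can be added first even though it runs last on the way out: position only matters relative to other handlers of the same direction.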
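RPC server-side handling: RpcHandler
The code below is from TransportRequestHandler. It clearly shows that TransportRequestHandler hands request messages to rpcHandler for further processing.
Figure: handing request messages to rpcHandler
RpcHandler is an abstract class with the following main methods:
The receive method receives a single RPC message. The RpcResponseCallback is the callback invoked when processing finishes; it is called exactly once, whether processing succeeds or fails. There is also an overloaded receive method that uses the OneWayRpcCallback by default; that callback only logs a message on success or on failure.
/**
* Abstract method that receives a single RPC message.
* The RpcResponseCallback is invoked exactly once when processing finishes, on success or on failure.
* */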
public abstract void receive(
TransportClient client,
ByteBuffer message,
RpcResponseCallback callback);
/**
* Overloaded receive method; uses the ONE_WAY_CALLBACK callback by default.
* */
public void receive(TransportClient client, ByteBuffer message) {
receive(client, message, ONE_WAY_CALLBACK);
}
The getStreamManager method is abstract and returns the StreamManager.
/**
* Abstract method that returns the StreamManager.
* */
public abstract StreamManager getStreamManager();
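The channelActive, channelInactive, and exceptionCaught methods are called when the channel associated with the client becomes active, becomes inactive, or raises an exception, respectively.
/**
* Called when the channel associated with the client becomes active.
* */
public void channelActive(TransportClient client) { }
/**
* Called when the channel associated with the client becomes inactive.
* */
public void channelInactive(TransportClient client) { }
/**
* Called when the channel associated with the client raises an exception.
* */
public void exceptionCaught(Throwable cause, TransportClient client) { }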
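To make the shape of the two receive overloads concrete, here is a minimal sketch with simplified stand-in types. Callback, Handler, UpperCaseHandler, and call are hypothetical names, the TransportClient parameter is omitted, and the one-way callback records into a list instead of writing to a logger. It is an illustration of the pattern, not Spark's class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class RpcHandlerSketch {
    // Simplified stand-in for RpcResponseCallback.
    interface Callback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    // Simplified stand-in for RpcHandler (no TransportClient, no StreamManager).
    abstract static class Handler {
        // What the one-way callback observed; a real handler would log instead.
        final List<String> oneWayLog = new ArrayList<>();

        private final Callback oneWayCallback = new Callback() {
            @Override public void onSuccess(ByteBuffer response) {
                oneWayLog.add("response provided for one-way RPC");
            }
            @Override public void onFailure(Throwable e) {
                oneWayLog.add("error provided for one-way RPC: " + e);
            }
        };

        // Implementations are expected to invoke the callback exactly once.
        abstract void receive(ByteBuffer message, Callback callback);

        // One-way overload: same processing, but the outcome is only recorded.
        void receive(ByteBuffer message) {
            receive(message, oneWayCallback);
        }
    }

    // Example implementation: replies with the upper-cased UTF-8 payload.
    static final class UpperCaseHandler extends Handler {
        @Override
        void receive(ByteBuffer message, Callback callback) {
            String text = StandardCharsets.UTF_8.decode(message).toString();
            callback.onSuccess(StandardCharsets.UTF_8.encode(text.toUpperCase(Locale.ROOT)));
        }
    }

    // Helper that sends a request and captures whatever the callback receives.
    static String call(Handler handler, String request) {
        String[] result = new String[1];
        handler.receive(StandardCharsets.UTF_8.encode(request), new Callback() {
            @Override public void onSuccess(ByteBuffer response) {
                result[0] = StandardCharsets.UTF_8.decode(response).toString();
            }
            @Override public void onFailure(Throwable e) {
                result[0] = "failed: " + e.getMessage();
            }
        });
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(call(new UpperCaseHandler(), "ping"));
    }
}
```

The point of the overload is that an implementation writes its logic once against the callback version; whether a reply is actually sent is decided by which callback the caller supplies.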
With the structure of RpcHandler in mind, look at the handle(RequestMessage request) method of TransportRequestHandler:
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
TransportRequestHandler handles four kinds of RequestMessage:
1. Handling chunk fetch requests
private void processFetchRequest(final ChunkFetchRequest req) {
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
req.streamChunkId);
}
ManagedBuffer buf;
try {
// this.streamManager = rpcHandler.getStreamManager();
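// Check that the client is authorized to read from the stream
streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
// Associate the stream with this client's TCP connection; a single stream is read by only one client
streamManager.registerChannel(channel, req.streamChunkId.streamId);
// Fetch the chunk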
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format("Error opening block %s for request from %s",
req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
// Wrap the ManagedBuffer and the stream chunk ID in a ChunkFetchSuccess and send it back to the client through respond
respond(new ChunkFetchSuccess(req.streamChunkId, buf));
}
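processFetchRequest does the following:
- Checks that the client is authorized to read from the stream
- Associates the stream with the client's TCP connection; a single stream is read by only one client
- Fetches the chunk
- Wraps the ManagedBuffer and the stream chunk ID in a ChunkFetchSuccess and sends it back to the client through respond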
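The steps above can be sketched with a much-simplified stream manager. Everything in this block is hypothetical: Streams stands in for StreamManager, clients and channels are plain strings, chunks are strings, and the returned text stands in for ChunkFetchSuccess and ChunkFetchFailure. The sketch only shows the control flow: any failure in the three preparatory steps becomes a failure response, otherwise the chunk is returned.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChunkFetchSketch {
    // Hypothetical stand-in for StreamManager: each stream is a list of chunks.
    static final class Streams {
        private final Map<Long, List<String>> chunks = new HashMap<>();
        private final Map<Long, String> owners = new HashMap<>();    // streamId -> allowed client
        private final Map<Long, String> channels = new HashMap<>();  // streamId -> bound channel

        void register(long streamId, String owner, List<String> data) {
            chunks.put(streamId, data);
            owners.put(streamId, owner);
        }

        void checkAuthorization(String clientId, long streamId) {
            if (!clientId.equals(owners.get(streamId))) {
                throw new SecurityException(
                    "client " + clientId + " may not read stream " + streamId);
            }
        }

        void registerChannel(String channel, long streamId) {
            channels.put(streamId, channel);
        }

        String getChunk(long streamId, int chunkIndex) {
            return chunks.get(streamId).get(chunkIndex);
        }
    }

    // Returns a textual stand-in for ChunkFetchSuccess or ChunkFetchFailure.
    static String processFetch(Streams streams, String clientId, String channel,
                               long streamId, int chunkIndex) {
        String chunk;
        try {
            streams.checkAuthorization(clientId, streamId);  // step 1: authorization
            streams.registerChannel(channel, streamId);      // step 2: bind stream to channel
            chunk = streams.getChunk(streamId, chunkIndex);  // step 3: fetch the chunk
        } catch (RuntimeException e) {
            return "ChunkFetchFailure(" + streamId + "," + chunkIndex + "): " + e.getMessage();
        }
        // step 4: wrap the chunk and its ID in a success response
        return "ChunkFetchSuccess(" + streamId + "," + chunkIndex + "): " + chunk;
    }
}
```

Note that, as in processFetchRequest, a failure is still answered: the client always receives either a success or a failure message for its chunk ID.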
2. Handling RPC requests
The code is as follows:
private void processRpcRequest(final RpcRequest req) {
try {
/**
* Pass the sending client, the body of the RpcRequest, and an RpcResponseCallback to RpcHandler's receive method.
* In other words, the message is really processed by RpcHandler, not by TransportRequestHandler.
* */
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
}
@Override
public void onFailure(Throwable e) {
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
});
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
} finally {
req.body().release();
}
}
The sending client, the body of the RpcRequest, and an RpcResponseCallback are passed to RpcHandler's receive method. In other words, the message is really processed by RpcHandler, not by TransportRequestHandler.
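The division of labor can be reduced to a few lines. In the sketch below (hypothetical types; strings stand in for buffers and for the RpcResponse and RpcFailure messages), the request handler contributes only two things: it correlates the outcome with the requestId, and it converts an exception thrown by the handler into a failure response.

```java
import java.util.ArrayList;
import java.util.List;

class RpcRequestSketch {
    // Simplified stand-in for RpcResponseCallback.
    interface Callback {
        void onSuccess(String response);
        void onFailure(Throwable e);
    }

    // Simplified stand-in for RpcHandler.receive (client parameter omitted).
    interface Handler {
        void receive(String message, Callback callback);
    }

    // Returns the messages that would be written back to the channel.
    static List<String> process(Handler handler, long requestId, String body) {
        List<String> sent = new ArrayList<>();
        try {
            handler.receive(body, new Callback() {
                @Override public void onSuccess(String response) {
                    sent.add("RpcResponse(" + requestId + "): " + response);
                }
                @Override public void onFailure(Throwable e) {
                    sent.add("RpcFailure(" + requestId + "): " + e.getMessage());
                }
            });
        } catch (RuntimeException e) {
            // The handler itself threw: still answer the client with a failure.
            sent.add("RpcFailure(" + requestId + "): " + e.getMessage());
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(process((msg, cb) -> cb.onSuccess("pong"), 1L, "ping"));
    }
}
```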
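3. Handling RPC requests that need no reply
For RPC requests that need no reply, the callback is OneWayRpcCallback, and no response is sent to the client after the request has been processed.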
private void processOneWayMessage(OneWayMessage req) {
try {
/**
* Handle an RPC request that needs no reply: the callback is OneWayRpcCallback, and no response is sent to the client after processing.
* */
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
req.body().release();
}
}
4. Handling stream requests
The streamManager.openStream method wraps the stream data in a ManagedBuffer.
private void processStreamRequest(final StreamRequest req) {
ManagedBuffer buf;
try {
// Wrap the retrieved stream data in a ManagedBuffer
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
logger.error(String.format(
"Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
// Respond to the client whether the stream was found or not
if (buf != null) {
respond(new StreamResponse(req.streamId, buf.size(), buf));
} else {
respond(new StreamFailure(req.streamId, String.format(
"Stream '%s' was not found.", req.streamId)));
}
}
Of these four request-handling methods, all except processOneWayMessage call respond to answer the client. Internally, respond calls channel.writeAndFlush to send the response.
private void respond(final Encodable result) {
final SocketAddress remoteAddress = channel.remoteAddress();
// Respond to the client
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
}
}
);
}
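Bootstraps: Bootstrap
TransportServer has a member variable of type List<TransportServerBootstrap>, the list of server bootstraps. When the pipeline is initialized, the doBootstrap method of each bootstrap is called.
Figure: running the bootstraps
TransportServerBootstrap defines the contract for server-side bootstraps: once a client has connected to the server, the bootstraps are run on the client channel held by the server.
The TransportServerBootstrap interface is defined as follows: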
public interface TransportServerBootstrap {
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
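Because doBootstrap takes an RpcHandler and returns an RpcHandler, running a list of bootstraps amounts to wrapping the application's handler layer by layer. The sketch below illustrates that chaining with hypothetical, simplified types (Handler and ServerBootstrap work on strings, and applyBootstraps is an assumed helper name, not a Spark method).

```java
import java.util.Arrays;
import java.util.List;

class BootstrapChainSketch {
    // Simplified stand-in for RpcHandler.
    interface Handler {
        String receive(String message);
    }

    // Simplified stand-in for TransportServerBootstrap.
    interface ServerBootstrap {
        Handler doBootstrap(String channel, Handler handler);
    }

    // Runs every bootstrap in order; each one may wrap the handler it is given.
    static Handler applyBootstraps(String channel, Handler appHandler,
                                   List<ServerBootstrap> bootstraps) {
        Handler handler = appHandler;
        for (ServerBootstrap bootstrap : bootstraps) {
            handler = bootstrap.doBootstrap(channel, handler);
        }
        return handler;
    }

    public static void main(String[] args) {
        // Two illustrative bootstraps that tag the reply so the nesting is visible.
        ServerBootstrap first = (channel, h) -> msg -> "first(" + h.receive(msg) + ")";
        ServerBootstrap second = (channel, h) -> msg -> "second(" + h.receive(msg) + ")";
        Handler wrapped = applyBootstraps("channel-1", msg -> msg, Arrays.asList(first, second));
        System.out.println(wrapped.receive("x"));
    }
}
```

The last bootstrap in the list ends up outermost, so it sees each incoming message first.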
TransportServerBootstrap has two implementations, SaslServerBootstrap and EncryptionCheckerBootstrap. SaslServerBootstrap is used here to illustrate what a bootstrap does.
Look directly at the doBootstrap method of SaslServerBootstrap:
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
}
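The doBootstrap method simply creates a SaslRpcHandler, a concrete implementation of RpcHandler. SaslRpcHandler is responsible for SASL authentication and, when negotiated, encryption on the channel. It extends RpcHandler, so its core logic is in receive: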
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
if (isComplete) {
// Authentication complete, delegate to base handler.
// Pass the message on to the downstream RpcHandler
delegate.receive(client, message, callback);
return;
}
ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
SaslMessage saslMessage;
try {
// Decode the incoming bytes as a SaslMessage
saslMessage = SaslMessage.decode(nettyBuf);
} finally {
nettyBuf.release();
}
if (saslServer == null) {
// First message in the handshake, setup the necessary state.
client.setClientId(saslMessage.appId);
// If saslServer is null, create a SparkSaslServer
saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
conf.saslServerAlwaysEncrypt());
}
byte[] response;
try {
// Let saslServer process the decoded message body
response = saslServer.response(JavaUtils.bufferToArray(
saslMessage.body().nioByteBuffer()));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
callback.onSuccess(ByteBuffer.wrap(response));
// Setup encryption after the SASL response is sent, otherwise the client can't parse the
// response. It's ok to change the channel pipeline here since we are processing an incoming
// message, so the pipeline is busy and no new incoming messages will be fed to it before this
// method returns. This assumes that the code ensures, through other means, that no outbound
// messages are being written to the channel while negotiation is still going on.
if (saslServer.isComplete()) {
logger.debug("SASL authentication successful for channel {}", client);
isComplete = true; // authentication finished
if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
logger.debug("Enabling encryption for channel {}", client);
// Enable encryption on the channel
SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
saslServer = null;
} else {
saslServer.dispose();
saslServer = null;
}
}
}
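receive does the following:
- If authentication has completed (isComplete=true), it passes the message to the downstream RpcHandler.
- If authentication has not completed (isComplete=false), it decodes the message sent by the client as a SaslMessage.
- If saslServer is null, it creates a SparkSaslServer; this happens when SaslRpcHandler receives the client's first message.
- It uses saslServer to process the decoded message and returns the result to the client through the callback.
- If the SASL exchange has completed, it sets isComplete=true.
- If the negotiated QOP equals SparkSaslServer.QOP_AUTH_CONF, it enables SASL encryption on the channel.
As this shows, a bootstrap mainly guides, wraps, forwards, and delegates. TransportClientBootstrap plays a similar role.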
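The gate-then-delegate structure of such a handler can be shown without any SASL machinery. The sketch below is a deliberate simplification and an assumption throughout: the handshake is a single shared-secret comparison (real SASL is a multi-step challenge and response), AuthGate and Handler are hypothetical names, and no encryption is set up. It only demonstrates the isComplete flag deciding between handshake processing and delegation.

```java
class AuthGateSketch {
    // Simplified stand-in for RpcHandler.
    interface Handler {
        String receive(String message);
    }

    // Wraps a delegate and keeps it unreachable until the handshake has succeeded.
    static final class AuthGate implements Handler {
        private final Handler delegate;
        private final String secret;
        private boolean isComplete = false;

        AuthGate(Handler delegate, String secret) {
            this.delegate = delegate;
            this.secret = secret;
        }

        @Override
        public String receive(String message) {
            if (isComplete) {
                // Authentication complete: pass the message to the wrapped handler.
                return delegate.receive(message);
            }
            // Not yet authenticated: treat the message as a handshake message.
            if (!secret.equals(message)) {
                throw new SecurityException("authentication failed");
            }
            isComplete = true;
            return "auth-ok";
        }
    }

    public static void main(String[] args) {
        Handler gate = new AuthGate(msg -> "echo:" + msg, "s3cret");
        System.out.println(gate.receive("s3cret"));
        System.out.println(gate.receive("hello"));
    }
}
```

The wrapped handler never learns that a handshake took place, which is what lets a bootstrap add authentication without touching application code.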
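RPC client: TransportClient
Having seen how the RPC server uses RpcHandler to process messages, let's look at how the RPC client handles them. The TransportClient is created in TransportContext's createChannelHandler. TransportClient has five methods for sending requests:
- fetchChunk: requests a single chunk from a stream negotiated with the remote end
- stream: fetches stream data from the remote end by stream ID
- sendRpc: sends an RPC request to the server, relying on the at-least-once delivery principle so that the request is not lost
- sendRpcSync: sends a synchronous RPC request to the server, blocking until the response arrives or the timeout expires
- send: sends an RPC request to the server without expecting a response; delivery is not guaranteed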
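The relationship between sendRpc and sendRpcSync is the usual one between a callback API and a blocking wrapper around it. A minimal sketch of that wrapper follows, assuming a hypothetical AsyncSender interface in place of the real client and using CompletableFuture as the synchronization primitive (a choice made for this sketch, not a statement about what Spark uses).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncOverAsyncSketch {
    // Simplified stand-in for RpcResponseCallback.
    interface Callback {
        void onSuccess(String response);
        void onFailure(Throwable e);
    }

    // Hypothetical asynchronous sender standing in for sendRpc.
    interface AsyncSender {
        void sendRpc(String message, Callback callback);
    }

    // Blocks until the callback fires or the timeout expires.
    static String sendRpcSync(AsyncSender sender, String message, long timeoutMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        sender.sendRpc(message, new Callback() {
            @Override public void onSuccess(String response) {
                result.complete(response);
            }
            @Override public void onFailure(Throwable e) {
                result.completeExceptionally(e);
            }
        });
        try {
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The callback reported a failure: surface its cause.
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A sender that answers from another thread, as a network reply would.
        AsyncSender sender = (msg, cb) -> new Thread(() -> cb.onSuccess("re:" + msg)).start();
        System.out.println(sendRpcSync(sender, "ping", 1000));
    }
}
```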
The sendRpc method is analyzed in detail here; its code is as follows:
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
// Generate the requestId from a UUID
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
/**
* The handler here is the TransportResponseHandler.
* It updates the time of the last request.
* addRpcRequest uses a Map to associate the requestId with the callback: requestId is the key, callback is the value.
* */
handler.addRpcRequest(requestId, callback);
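/**
* channel.writeAndFlush sends the request; whether it succeeds or fails, the operationComplete method of the ChannelFutureListener is called back.
* On success, a log message is written.
* On failure, besides logging, handler.removeRpcRequest(requestId) is called to remove this request.
* */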
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
try {
callback.onFailure(new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
return requestId;
}
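As the code above shows, sendRpc does the following:
- Generates the requestId from a UUID
- Updates the time of the last request and uses a Map to associate the requestId with the callback
- Sends the request with channel.writeAndFlush; whether it succeeds or fails, the operationComplete method of the ChannelFutureListener is called back. On success it writes a log message; on failure it logs the error and also calls handler.removeRpcRequest(requestId) to remove the request
- Returns the requestId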
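The bookkeeping in these steps can be isolated from Netty. In the sketch below the Wire interface is a hypothetical stand-in for channel.writeAndFlush plus its listener (it simply returns whether the write succeeded), and the class plays the combined role of TransportClient and the request map inside TransportResponseHandler. The essential ordering is that the callback is registered before the write, and removed again if the write fails.

```java
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class OutstandingRpcSketch {
    // Simplified stand-in for RpcResponseCallback.
    interface Callback {
        void onSuccess(String response);
        void onFailure(Throwable e);
    }

    // Hypothetical transport: returns true if the write succeeded.
    interface Wire {
        boolean write(long requestId, String message);
    }

    private final Map<Long, Callback> outstandingRpcs = new ConcurrentHashMap<>();
    private final Wire wire;
    private volatile long timeOfLastRequestNs;

    OutstandingRpcSketch(Wire wire) {
        this.wire = wire;
    }

    long sendRpc(String message, Callback callback) {
        // Step 1: derive the requestId from a random UUID.
        long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
        // Step 2: record the request time and register the callback before writing,
        // so that a fast response always finds its callback.
        timeOfLastRequestNs = System.nanoTime();
        outstandingRpcs.put(requestId, callback);
        // Step 3: write; on failure, unregister and notify the caller.
        if (!wire.write(requestId, message)) {
            outstandingRpcs.remove(requestId);
            callback.onFailure(new IOException("Failed to send RPC " + requestId));
        }
        // Step 4: hand the requestId back to the caller.
        return requestId;
    }

    int outstandingCount() {
        return outstandingRpcs.size();
    }
}
```

Removing the entry on a failed write matters because no response will ever arrive for that requestId; without the removal the map would grow with every failed send.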
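After the request has been sent successfully, the client waits for the server's response. The returned message is passed to the channelRead method of TransportChannelHandler.
Figure: returned message passed to channelRead
Execution then enters responseHandler.handle((ResponseMessage) request), which checks for six message types; the ones relevant to RPC are RpcResponse and RpcFailure.
RpcResponse is handled as follows:
RpcFailure is handled as follows: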
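A minimal sketch of both branches, written from the description in this post rather than copied from Spark: it assumes that the response handler looks up the callback registered under the requestId, removes it, and invokes onSuccess or onFailure, and that a response with no matching entry is ignored with a warning. The message classes are simplified stand-ins that carry strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ResponseDispatchSketch {
    // Simplified stand-in for RpcResponseCallback.
    interface Callback {
        void onSuccess(String response);
        void onFailure(Throwable e);
    }

    // Simplified stand-ins for the two RPC-related response messages.
    abstract static class ResponseMessage {
        final long requestId;
        ResponseMessage(long requestId) { this.requestId = requestId; }
    }

    static final class RpcResponse extends ResponseMessage {
        final String body;
        RpcResponse(long requestId, String body) { super(requestId); this.body = body; }
    }

    static final class RpcFailure extends ResponseMessage {
        final String errorString;
        RpcFailure(long requestId, String errorString) { super(requestId); this.errorString = errorString; }
    }

    private final Map<Long, Callback> outstandingRpcs = new ConcurrentHashMap<>();
    // Collected instead of logged so the behavior is observable.
    final List<String> warnings = new ArrayList<>();

    void addRpcRequest(long requestId, Callback callback) {
        outstandingRpcs.put(requestId, callback);
    }

    void handle(ResponseMessage message) {
        if (message instanceof RpcResponse) {
            RpcResponse resp = (RpcResponse) message;
            Callback listener = outstandingRpcs.remove(resp.requestId);
            if (listener == null) {
                warnings.add("Ignoring response for RPC " + resp.requestId + ": not outstanding");
            } else {
                listener.onSuccess(resp.body);
            }
        } else if (message instanceof RpcFailure) {
            RpcFailure resp = (RpcFailure) message;
            Callback listener = outstandingRpcs.remove(resp.requestId);
            if (listener == null) {
                warnings.add("Ignoring failure for RPC " + resp.requestId + ": not outstanding");
            } else {
                listener.onFailure(new RuntimeException(resp.errorString));
            }
        } else {
            throw new IllegalStateException("Unknown response type");
        }
    }
}
```

Removing the entry as part of the lookup means each callback fires at most once, even if a duplicate response arrives for the same requestId.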
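Summary
From the analysis of the Spark RPC components above, we can derive the request/response flow between the RPC client and server, shown in the figure below:
Figure: request and response flow
The client sends a request by calling channel.writeAndFlush in the corresponding TransportClient method, registering listeners for success and failure. The server responds by calling channel.writeAndFlush in the respond method of TransportRequestHandler. Whether the server is receiving a request or the client is receiving a response, the channelRead method of TransportChannelHandler inspects the message type: requests received by the server are handed to TransportRequestHandler, and responses received by the client are handed to TransportResponseHandler. Bootstraps are involved throughout these steps.
Finally, the structure of the Spark RPC framework is summarized in the figure below:
Figure: structure of the Spark RPC framework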