版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/beliefer/article/details/82143001
提示:閱讀本文前最好先閱讀:
- 《Spark2.1.0之内置RPC架構》
- 《spark2.1.0之源碼分析——RPC配置TransportConf》
- 《spark2.1.0之源碼分析——RPC用戶端工廠TransportClientFactory》
- 《 spark2.1.0之源碼分析——RPC伺服器TransportServer》
- 《spark2.1.0之源碼分析——RPC管道初始化》
- spark2.1.0之源碼分析——RPC傳輸管道處理器詳解 》
- spark2.1.0之源碼分析——服務端RPC處理器RpcHandler詳解
- spark2.1.0之源碼分析——RPC服務端引導程式TransportServerBootstrap
在《
》一文曾介紹過服務端RpcHandler對請求消息的處理,現在來看看用戶端發送RPC請求的原理。我們在分析
中列出的代碼清單2中的createChannelHandler方法時,看到調用了TransportClient的構造器(見代碼清單1),其中TransportResponseHandler的引用将賦給handler屬性。
代碼清單1 TransportClient的構造器
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel);
this.handler = Preconditions.checkNotNull(handler);
this.timedOut = false;
}
TransportClient一共有五個方法用于發送請求,分别為:
- fetchChunk:從遠端協商好的流中請求單個塊;
- stream:使用流的ID,從遠端擷取流資料;
- sendRpc:向服務端發送RPC的請求,通過At least Once Delivery原則保證請求不會丢失;
- sendRpcSync:向服務端發送異步的RPC的請求,并根據指定的逾時時間等待響應;
- send:向服務端發送RPC的請求,但是并不期望能擷取響應,因而不能保證投遞的可靠性;
本節隻選擇最常用的sendRpc和fetchChunk進行分析,其餘實作都可以觸類旁通。
發送RPC請求
sendRpc方法的實作見代碼清單2。
代碼清單2 sendRpc的實作
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
// 使用UUID生成請求主鍵requestId
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);// 添加requestId與RpcResponseCallback的引用之間的關系
// 發送RPC請求
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
try {
callback.onFailure(new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
return requestId;
}
結合代碼清單2,我們知道sendRpc方法的實作步驟如下:
- 使用UUID生成請求主鍵requestId;
- 調用addRpcRequest向handler(特别提醒下讀者這裡的handler不是RpcHandler,而是通過TransportClient構造器傳入的TransportResponseHandler)添加requestId與回調類RpcResponseCallback的引用之間的關系。TransportResponseHandler的addRpcRequest方法(見代碼清單3)将更新最後一次請求的時間為目前系統時間,然後将requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中。outstandingRpcs專門用于緩存發出的RPC請求資訊。
- 調用Channel的writeAndFlush方法将RPC請求發送出去,這和在《 》一文列出的代碼清單7中服務端調用的respond方法響應用戶端的一樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。如果發送成功,那麼隻會列印requestId、遠端位址及花費時間的日志,如果發送失敗,除了列印錯誤日志外,還要調用TransportResponseHandler的removeRpcRequest方法(見代碼清單4)将此次請求從outstandingRpcs緩存中移除。
代碼清單3 添加RPC請求到緩存
public void addRpcRequest(long requestId, RpcResponseCallback callback) {
updateTimeOfLastRequest();
outstandingRpcs.put(requestId, callback);
}
代碼清單4 從緩存中删除RPC請求
public void removeRpcRequest(long requestId) {
outstandingRpcs.remove(requestId);
}
請求發送成功後,用戶端将等待接收服務端的響應。根據
一文的圖1,傳回的消息也會傳遞給TransportChannelHandler的channelRead方法(見《
》一文的代碼清單1),根據之前的分析,消息的分析将最後交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分别對《
》一文的圖2中的
六種ResponseMessage進行處理,由于服務端使用processRpcRequest方法(見《
》一文的代碼清單4)處理RpcRequest類型的消息後傳回給用戶端的消息為RpcResponse或RpcFailure,是以我們來看看用戶端的TransportResponseHandler的handle方法是如何處理RpcResponse和RpcFailure,見代碼清單5。
代碼清單5 RpcResponse和RpcFailure消息的處理
} else if (message instanceof RpcResponse) {
RpcResponse resp = (RpcResponse) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 擷取RpcResponseCallback
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
listener.onSuccess(resp.body().nioByteBuffer());
} finally {
resp.body().release();
}
}
} else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 擷取RpcResponseCallback
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
}
從代碼清單5看到,處理RpcResponse的邏輯為:
- 使用RpcResponse對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中擷取注冊的RpcResponseCallback,此處的RpcResponseCallback即為代碼清單2中傳遞給sendRpc方法的RpcResponseCallback;
- 移除outstandingRpcs緩存中requestId和RpcResponseCallback的注冊資訊;
- 調用RpcResponseCallback的onSuccess方法,處理成功響應後的具體邏輯。這裡的RpcResponseCallback需要各個使用TransportClient的sendRpc方法的場景中分别實作;
- 最後釋放RpcResponse的body,回收資源。
處理RpcFailure的邏輯為:
- 使用RpcFailure對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中擷取注冊的RpcResponseCallback,此處的RpcResponseCallback即為代碼清單2中傳遞給sendRpc方法的RpcResponseCallback;
- 調用RpcResponseCallback的onFailure方法,處理失敗響應後的具體邏輯。這裡的RpcResponseCallback需要在使用TransportClient的sendRpc方法時指定或實作。
發送擷取塊請求
fetchChunk的實作見代碼清單6。
代碼清單6 fetchChunk的實作
public void fetchChunk(
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
final long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}
final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 建立StreamChunkId
// 添加StreamChunkId與ChunkReceivedCallback之間的對應關系
handler.addFetchRequest(streamChunkId, callback);
// 發送塊請求
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
}
結合代碼清單6,我們知道fetchChunk方法的實作步驟如下:
- 使用流的标記streamId和塊的索引chunkIndex建立StreamChunkId;
- 調用addFetchRequest向handler(特别提醒下讀者這裡的handler不是RpcHandler,而是通過TransportClient構造器傳入的TransportResponseHandler)添加StreamChunkId與回調類ChunkReceivedCallback的引用之間的關系。TransportResponseHandler的addFetchRequest方法(見代碼清單7)将更新最後一次請求的時間為目前系統時間,然後将StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中。outstandingFetches專門用于緩存發出的塊請求資訊。
- 調用Channel的writeAndFlush方法将塊請求發送出去,這和在《 》一文列出的代碼清單7中服務端調用的respond方法響應用戶端的一樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。如果發送成功,那麼隻會列印StreamChunkId、遠端位址及花費時間的日志,如果發送失敗,除了列印錯誤日志外,還要調用TransportResponseHandler的removeFetchRequest方法(見代碼清單8)将此次請求從outstandingFetches緩存中移除。
代碼清單7 添加塊請求到緩存
public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
updateTimeOfLastRequest();
outstandingFetches.put(streamChunkId, callback);
}
代碼清單8 從緩存中删除塊請求
public void removeFetchRequest(StreamChunkId streamChunkId) {
outstandingFetches.remove(streamChunkId);
}
》一文的圖2中的六種處理結果進行處理,由于服務端使用processFetchRequest方法(見《
》一文的代碼清單3)處理ChunkFetchRequest類型的消息後傳回給用戶端的消息為ChunkFetchSuccess或ChunkFetchFailure,是以我們來看看用戶端的TransportResponseHandler的handle方法是如何處理ChunkFetchSuccess和ChunkFetchFailure,見代碼清單9
代碼清單9 ChunkFetchSuccess和ChunkFetchFailure消息的處理
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
resp.streamChunkId, getRemoteAddress(channel));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
resp.body().release();
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
"Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
}
}
從代碼清單9看到,處理ChunkFetchSuccess的邏輯為:
- 使用ChunkFetchSuccess對應的StreamChunkId,從outstandingFetches緩存中擷取注冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即為代碼清單6中傳遞給fetchChunk方法的ChunkReceivedCallback;
- 移除outstandingFetches緩存中StreamChunkId和ChunkReceivedCallback的注冊資訊;
- 調用ChunkReceivedCallback的onSuccess方法,處理成功響應後的具體邏輯。這裡的ChunkReceivedCallback需要各個使用TransportClient的fetchChunk方法的場景中分别實作;
- 最後釋放ChunkFetchSuccess的body,回收資源。
處理ChunkFetchFailure的邏輯為:
- 使用ChunkFetchFailure對應的StreamChunkId,從outstandingFetches緩存中擷取注冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即為代碼清單6中傳遞給fetchChunk方法的ChunkReceivedCallback;
- 調用ChunkReceivedCallback的onFailure方法,處理失敗響應後的具體邏輯。這裡的ChunkReceivedCallback需要各個使用TransportClient的fetchChunk方法的場景中分别實作。
在詳細介紹了TransportClient和TransportResponseHandler之後,對于用戶端我們就可以擴充
一文的圖1,把TransportResponseHandler及TransportClient的處理流程增加進來,如下圖所示。
用戶端請求、響應流程圖
上圖中的序号①表示調用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)将更新最後一次請求的時間為目前系統時間,然後将requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中(或将StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中)。②表示調用Channel的writeAndFlush方法将RPC請求發送出去。圖中的虛線表示當TransportResponseHandler處理RpcResponse和RpcFailure時将從outstandingRpcs緩存中擷取此請求對應的RpcResponseCallback(或處理ChunkFetchSuccess和ChunkFetchFailure時将從outstandingFetches緩存中擷取StreamChunkId對應的ChunkReceivedCallback),并執行回調。此外,TransportClientBootstrap将可能存在于上圖中任何兩個元件的箭頭連線中間。
關于《Spark核心設計的藝術 架構設計與實作》
經過近一年的準備,《Spark核心設計的藝術 架構設計與實作》一書現已出版發行,圖書如圖:
紙質版售賣連結如下:
京東:
https://item.jd.com/12302500.html