天天看點

spark2.1.0之源碼分析——RPC用戶端TransportClient詳解

版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/beliefer/article/details/82143001

提示:閱讀本文前最好先閱讀:

  1. 《Spark2.1.0之内置RPC架構》
  2. 《spark2.1.0之源碼分析——RPC配置TransportConf》
  3. 《spark2.1.0之源碼分析——RPC用戶端工廠TransportClientFactory》
  4. spark2.1.0之源碼分析——RPC伺服器TransportServer》
  5. 《spark2.1.0之源碼分析——RPC管道初始化》
  6. spark2.1.0之源碼分析——RPC傳輸管道處理器詳解
  7. spark2.1.0之源碼分析——服務端RPC處理器RpcHandler詳解
  8. spark2.1.0之源碼分析——RPC服務端引導程式TransportServerBootstrap

         在《

》一文曾介紹過服務端RpcHandler對請求消息的處理,現在來看看用戶端發送RPC請求的原理。我們在分析

中列出的代碼清單2中的createChannelHandler方法時,看到調用了TransportClient的構造器(見代碼清單1),其中TransportResponseHandler的引用将賦給handler屬性。

代碼清單1         TransportClient的構造器

public TransportClient(Channel channel, TransportResponseHandler handler) {
    this.channel = Preconditions.checkNotNull(channel);
    this.handler = Preconditions.checkNotNull(handler);
    this.timedOut = false;
  }
           

TransportClient一共有五個方法用于發送請求,分别為:

  1. fetchChunk:從遠端協商好的流中請求單個塊;
  2. stream:使用流的ID,從遠端擷取流資料;
  3. sendRpc:向服務端發送RPC的請求,通過At least Once Delivery原則保證請求不會丢失;
  4. sendRpcSync:向服務端發送異步的RPC的請求,并根據指定的逾時時間等待響應;
  5. send:向服務端發送RPC的請求,但是并不期望能擷取響應,因而不能保證投遞的可靠性;

本節隻選擇最常用的sendRpc和fetchChunk進行分析,其餘實作都可以觸類旁通。

發送RPC請求

         sendRpc方法的實作見代碼清單2。

代碼清單2         sendRpc的實作

public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isTraceEnabled()) {
      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    }
    // 使用UUID生成請求主鍵requestId
    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    handler.addRpcRequest(requestId, callback);// 添加requestId與RpcResponseCallback的引用之間的關系
    // 發送RPC請求
    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", requestId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeRpcRequest(requestId);
            channel.close();
            try {
              callback.onFailure(new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });

    return requestId;
  }
           

結合代碼清單2,我們知道sendRpc方法的實作步驟如下:

  1. 使用UUID生成請求主鍵requestId;
  2. 調用addRpcRequest向handler(特别提醒下讀者這裡的handler不是RpcHandler,而是通過TransportClient構造器傳入的TransportResponseHandler)添加requestId與回調類RpcResponseCallback的引用之間的關系。TransportResponseHandler的addRpcRequest方法(見代碼清單3)将更新最後一次請求的時間為目前系統時間,然後将requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中。outstandingRpcs專門用于緩存發出的RPC請求資訊。
  3. 調用Channel的writeAndFlush方法将RPC請求發送出去,這和在《 》一文列出的代碼清單7中服務端調用的respond方法響應用戶端的一樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。如果發送成功,那麼隻會列印requestId、遠端位址及花費時間的日志,如果發送失敗,除了列印錯誤日志外,還要調用TransportResponseHandler的removeRpcRequest方法(見代碼清單4)将此次請求從outstandingRpcs緩存中移除。

代碼清單3        添加RPC請求到緩存

public void addRpcRequest(long requestId, RpcResponseCallback callback) {
    updateTimeOfLastRequest();
    outstandingRpcs.put(requestId, callback);
  }
           

代碼清單4         從緩存中删除RPC請求

public void removeRpcRequest(long requestId) {
    outstandingRpcs.remove(requestId);
  }
           

請求發送成功後,用戶端将等待接收服務端的響應。根據

一文的圖1,傳回的消息也會傳遞給TransportChannelHandler的channelRead方法(見《

》一文的代碼清單1),根據之前的分析,消息的分析将最後交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分别對《

》一文的圖2中的

六種

ResponseMessage進行處理,由于服務端使用processRpcRequest方法(見《

》一文的代碼清單4)處理RpcRequest類型的消息後傳回給用戶端的消息為RpcResponse或RpcFailure,是以我們來看看用戶端的TransportResponseHandler的handle方法是如何處理RpcResponse和RpcFailure,見代碼清單5。

代碼清單5         RpcResponse和RpcFailure消息的處理

} else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 擷取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.body().size());
      } else {
        outstandingRpcs.remove(resp.requestId);
        try {
          listener.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof RpcFailure) {
      RpcFailure resp = (RpcFailure) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 擷取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingRpcs.remove(resp.requestId);
        listener.onFailure(new RuntimeException(resp.errorString));
      }
           

從代碼清單5看到,處理RpcResponse的邏輯為:

  1. 使用RpcResponse對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中擷取注冊的RpcResponseCallback,此處的RpcResponseCallback即為代碼清單2中傳遞給sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs緩存中requestId和RpcResponseCallback的注冊資訊;
  3. 調用RpcResponseCallback的onSuccess方法,處理成功響應後的具體邏輯。這裡的RpcResponseCallback需要各個使用TransportClient的sendRpc方法的場景中分别實作;
  4. 最後釋放RpcResponse的body,回收資源。

處理RpcFailure的邏輯為:

  1. 使用RpcFailure對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中擷取注冊的RpcResponseCallback,此處的RpcResponseCallback即為代碼清單2中傳遞給sendRpc方法的RpcResponseCallback;
  2. 調用RpcResponseCallback的onFailure方法,處理失敗響應後的具體邏輯。這裡的RpcResponseCallback需要在使用TransportClient的sendRpc方法時指定或實作。

發送擷取塊請求

         fetchChunk的實作見代碼清單6。

代碼清單6         fetchChunk的實作

public void fetchChunk(
      long streamId,
      final int chunkIndex,
      final ChunkReceivedCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isDebugEnabled()) {
      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
    }

    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 建立StreamChunkId
    // 添加StreamChunkId與ChunkReceivedCallback之間的對應關系
    handler.addFetchRequest(streamChunkId, callback);
    // 發送塊請求
    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", streamChunkId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeFetchRequest(streamChunkId);
            channel.close();
            try {
              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });
  }
           

結合代碼清單6,我們知道fetchChunk方法的實作步驟如下:

  1. 使用流的标記streamId和塊的索引chunkIndex建立StreamChunkId;
  2. 調用addFetchRequest向handler(特别提醒下讀者這裡的handler不是RpcHandler,而是通過TransportClient構造器傳入的TransportResponseHandler)添加StreamChunkId與回調類ChunkReceivedCallback的引用之間的關系。TransportResponseHandler的addFetchRequest方法(見代碼清單7)将更新最後一次請求的時間為目前系統時間,然後将StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中。outstandingFetches專門用于緩存發出的塊請求資訊。
  3. 調用Channel的writeAndFlush方法将塊請求發送出去,這和在《 》一文列出的代碼清單7中服務端調用的respond方法響應用戶端的一樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。如果發送成功,那麼隻會列印StreamChunkId、遠端位址及花費時間的日志,如果發送失敗,除了列印錯誤日志外,還要調用TransportResponseHandler的removeFetchRequest方法(見代碼清單8)将此次請求從outstandingFetches緩存中移除。

代碼清單7         添加塊請求到緩存

public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
    updateTimeOfLastRequest();
    outstandingFetches.put(streamChunkId, callback);
  }
           

代碼清單8         從緩存中删除塊請求

public void removeFetchRequest(StreamChunkId streamChunkId) {
    outstandingFetches.remove(streamChunkId);
  }           

》一文的圖2中的六種處理結果進行處理,由于服務端使用processFetchRequest方法(見《

》一文的代碼清單3)處理ChunkFetchRequest類型的消息後傳回給用戶端的消息為ChunkFetchSuccess或ChunkFetchFailure,是以我們來看看用戶端的TransportResponseHandler的handle方法是如何處理ChunkFetchSuccess和ChunkFetchFailure,見代碼清單9

代碼清單9         ChunkFetchSuccess和ChunkFetchFailure消息的處理

if (message instanceof ChunkFetchSuccess) {
      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel));
        resp.body().release();
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
        resp.body().release();
      }
    } else if (message instanceof ChunkFetchFailure) {
      ChunkFetchFailure resp = (ChunkFetchFailure) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
          "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
      }
    }
           

從代碼清單9看到,處理ChunkFetchSuccess的邏輯為:

  1. 使用ChunkFetchSuccess對應的StreamChunkId,從outstandingFetches緩存中擷取注冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即為代碼清單6中傳遞給fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches緩存中StreamChunkId和ChunkReceivedCallback的注冊資訊;
  3. 調用ChunkReceivedCallback的onSuccess方法,處理成功響應後的具體邏輯。這裡的ChunkReceivedCallback需要各個使用TransportClient的fetchChunk方法的場景中分别實作;
  4. 最後釋放ChunkFetchSuccess的body,回收資源。

處理ChunkFetchFailure的邏輯為:

  1. 使用ChunkFetchFailure對應的StreamChunkId,從outstandingFetches緩存中擷取注冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即為代碼清單6中傳遞給fetchChunk方法的ChunkReceivedCallback;
  2. 調用ChunkReceivedCallback的onFailure方法,處理失敗響應後的具體邏輯。這裡的ChunkReceivedCallback需要各個使用TransportClient的fetchChunk方法的場景中分别實作。

在詳細介紹了TransportClient和TransportResponseHandler之後,對于用戶端我們就可以擴充

一文的圖1,把TransportResponseHandler及TransportClient的處理流程增加進來,如下圖所示。

用戶端請求、響應流程圖

上圖中的序号①表示調用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)将更新最後一次請求的時間為目前系統時間,然後将requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中(或将StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中)。②表示調用Channel的writeAndFlush方法将RPC請求發送出去。圖中的虛線表示當TransportResponseHandler處理RpcResponse和RpcFailure時将從outstandingRpcs緩存中擷取此請求對應的RpcResponseCallback(或處理ChunkFetchSuccess和ChunkFetchFailure時将從outstandingFetches緩存中擷取StreamChunkId對應的ChunkReceivedCallback),并執行回調。此外,TransportClientBootstrap将可能存在于上圖中任何兩個元件的箭頭連線中間。

關于《Spark核心設計的藝術 架構設計與實作》

經過近一年的準備,《Spark核心設計的藝術 架構設計與實作》一書現已出版發行,圖書如圖:

紙質版售賣連結如下:

京東:

https://item.jd.com/12302500.html

繼續閱讀