
[Source Code Analysis] What Exactly Is a Slot in Flink? (2)

Table of Contents

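  • [Source Code Analysis] What Exactly Is a Slot in Flink? (2)
    • 0x00 Summary
    • 0x01 Recap
    • 0x02 Registering/Updating Slots
      • 2.1 TaskExecutor Registration Succeeds
      • 2.2 Updating Slot State via Heartbeats
    • 0x03 Generating the ExecutionGraph
    • 0x04 Scheduling
    • 0x05 Allocating Resources
      • 5.1 CompletableFuture
        • 5.1.1 Future 3
        • 5.1.2 Future 2
        • 5.1.3 Future 1
      • 5.2 Flowchart
      • 5.3 Detailed Execution Path
    • 0x06 Deploy
    • 0x07 RM Allocates Resources
    • 0x08 Offering Resources
    • 0x09 Slots at Work
      • 9.1 Deployment
      • 9.2 Runtime
        • 9.2.1 Where FileInputSplit Comes From
        • 9.2.2 File Split
        • 9.2.3 Using the Slot
    • 0xFF References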

0x00 Summary

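Most readers have probably heard of Flink's Slot concept, but many may not know the details: what exactly does a Slot represent? How is it implemented in code? What role does it play in each phase: building the execution graph, scheduling, allocating resources, deploying, and executing? This article and the previous one walk through the source code together to uncover the mechanisms behind Slots.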

0x01 Recap

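Picking up where we left off. In the previous article, [Source Code Analysis] What Exactly Is a Slot in Flink? (1), we analyzed Slots from the perspective of system architecture and data structures; this article analyzes them from the perspective of the business flow. Here is the system architecture diagram again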

[Figure: system architecture diagram]

and the diagram of the logical relationships between the data structures:

[Figure: logical relationships between the data structures]

Below we analyze several of these flows one by one.

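0x02 Registering/Updating Slots

Slots are registered, or their state updated, through two paths:

  • After a TaskExecutor registers successfully, it registers its Slots along with it while interacting with the RM;
  • On each periodic heartbeat, Slot status information is attached to the heartbeat payload.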

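2.1 TaskExecutor Registration Succeeds

After a TaskExecutor registers successfully, it interacts with the RM to register itself, and registers its Slots with the ResourceManager (SlotManagerImpl) through the call path below. On receiving the message, SlotManagerImpl updates the Slot state: if there is already a pendingSlotRequest, the slot is allocated to it directly; otherwise the slot is added to the freeSlots variable.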

  • TaskExecutor#establishResourceManagerConnection;
  • TaskSlotTableImpl#createSlotReport; creates the report
    • At this point the report looks like this:
      slotReport = {SlotReport@9633} 
      
        0 = {SlotStatus@8969} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
         slotID = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
         resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
         allocationID = null
         jobID = null
          
        1 = {SlotStatus@9638} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
         slotID = {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1"
         resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
         allocationID = null
         jobID = null
                 
  • ResourceManager#sendSlotReport; reached in the RM via RPC (resourceManagerGateway.sendSlotReport)
  • SlotManagerImpl#registerTaskManager; registers the TaskManager with the SlotManager
  • SlotManagerImpl#registerSlot;
  • SlotManagerImpl#createAndRegisterTaskManagerSlot; creates and registers a TaskManagerSlot
    • The code and variables at this point are shown below. As we can see, this simply registers the TM's Slot information in the SlotManager:
      private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
         final TaskManagerSlot slot = new TaskManagerSlot(
                                          slotId, resourceProfile, taskManagerConnection);
         slots.put(slotId, slot);
         return slot;
      }
      
      slot = {TaskManagerSlot@13322} 
       slotId = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
       resourceProfile = {ResourceProfile@4194} 
        cpuCores = {CPUResource@11616} "Resource(CPU: 89884656743115785...0)"
        taskHeapMemory = {MemorySize@11617} "4611686018427387903 bytes"
        taskOffHeapMemory = {MemorySize@11618} "4611686018427387903 bytes"
        managedMemory = {MemorySize@11619} "64 mb"
        networkMemory = {MemorySize@11620} "32 mb"
        extendedResources = {HashMap@11621}  size = 0
       taskManagerConnection = {WorkerRegistration@11121} 
       allocationId = null
       jobId = null
       assignedSlotRequest = null
       state = {TaskManagerSlot$State@13328} "FREE"
                 
  • SlotManagerImpl#updateSlot
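  • SlotManagerImpl#updateSlotState; if there is a pendingSlotRequest, the slot is allocated to it directly
  • SlotManagerImpl#handleFreeSlot; otherwise the slot is added to freeSlots

When the flow finishes, the SlotManager looks like this. There are two slots and freeSlots also holds two, which means both slots are idle: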

this = {SlotManagerImpl@11120} 
 scheduledExecutor = {ActorSystemScheduledExecutorAdapter@11125} 
 slotRequestTimeout = {Time@11127} "300000 ms"
 taskManagerTimeout = {Time@11128} "30000 ms"
 slots = {HashMap@11122}  size = 2
  {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206} 
  {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322} 
 freeSlots = {LinkedHashMap@11129}  size = 2
  {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322} 
  {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206} 
 taskManagerRegistrations = {HashMap@11130}  size = 1
 fulfilledSlotRequests = {HashMap@11131}  size = 0
 pendingSlotRequests = {HashMap@11132}  size = 0
 pendingSlots = {HashMap@11133}  size = 0
 slotMatchingStrategy = {AnyMatchingSlotMatchingStrategy@11134} "INSTANCE"
 slotRequestTimeoutCheck = {ActorSystemScheduledExecutorAdapter$ScheduledFutureTask@11139} 
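To make the bookkeeping above concrete, here is a minimal sketch of what slot registration does to the SlotManager's maps. It is not SlotManagerImpl: MiniSlotManager, its String slot ids and the simple queue of pending request ids are hypothetical stand-ins, assuming only the behavior described above (a newly registered slot goes straight to a waiting request if there is one, otherwise into freeSlots).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the SlotManager bookkeeping described above.
class MiniSlotManager {
    enum State { FREE, ALLOCATED }

    static final class Slot {
        final String slotId;
        State state = State.FREE;
        String allocatedTo; // id of the request that received this slot, null while free

        Slot(String slotId) { this.slotId = slotId; }
    }

    final Map<String, Slot> slots = new HashMap<>();
    final Map<String, Slot> freeSlots = new LinkedHashMap<>();
    final Deque<String> pendingSlotRequests = new ArrayDeque<>();

    // Loosely mirrors registerSlot -> updateSlotState -> handleFreeSlot.
    void registerSlot(String slotId) {
        Slot slot = new Slot(slotId);
        slots.put(slotId, slot);
        String pending = pendingSlotRequests.poll();
        if (pending != null) {
            // a request is already waiting: hand the slot over directly
            slot.state = State.ALLOCATED;
            slot.allocatedTo = pending;
        } else {
            // nobody is waiting: remember the slot as free
            freeSlots.put(slotId, slot);
        }
    }

    public static void main(String[] args) {
        MiniSlotManager sm = new MiniSlotManager();
        sm.registerSlot("tm-1_0");
        sm.registerSlot("tm-1_1");
        System.out.println(sm.slots.size() + " slots, " + sm.freeSlots.size() + " free");
    }
}
```

Running main prints `2 slots, 2 free`, which matches the dump above: both slots are registered and both are idle.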
           

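2.2 Updating Slot State via Heartbeats

Flink's heartbeat mechanism is also used to report Slot information: a Slot Report is included in the heartbeat payload.

First, the Slot Report is created in the TE (TaskExecutor):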

  • TaskExecutor#heartbeatFromResourceManager
  • HeartbeatManagerImpl#requestHeartbeat
  • TaskExecutor$ResourceManagerHeartbeatListener # retrievePayload
  • TaskSlotTableImpl # createSlotReport

Execution then moves to the RM, where SlotManagerImpl calls reportSlotStatus to update the Slot state.

  • ResourceManager#heartbeatFromTaskManager
  • HeartbeatManagerImpl#receiveHeartbeat
  • ResourceManager$TaskManagerHeartbeatListener#reportPayload
  • SlotManagerImpl#reportSlotStatus; the SlotReport at this point is:
    • slotReport = {SlotReport@8718} 
       slotsStatus = {ArrayList@8717}  size = 2
        0 = {SlotStatus@9025} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
         slotID = {SlotID@9032} "d99e16d7-a30c-4e21-b270-f82884b1813f_0"
         resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
         allocationID = null
         jobID = null
        1 = {SlotStatus@9026} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
         slotID = {SlotID@9029} "d99e16d7-a30c-4e21-b270-f82884b1813f_1"
         resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
         allocationID = null
         jobID = null
                 
  • SlotManagerImpl#updateSlot
  • SlotManagerImpl#updateSlotState; if there is a pendingSlotRequest, the slot is allocated to it directly
  • SlotManagerImpl#handleFreeSlot; otherwise the slot is added to freeSlots
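The heartbeat path can be pictured as a round trip between a payload producer on the TaskExecutor and a payload consumer on the ResourceManager. The sketch below models that with plain Java; TaskExecutorSide, ResourceManagerSide and SlotStatus are hypothetical simplifications (a slot status is just a slot id plus an optional job id), not Flink's HeartbeatListener or SlotReport classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a slot report piggybacked on heartbeats.
class HeartbeatSlotReportDemo {
    static final class SlotStatus {
        final String slotId;
        final String jobId; // null means the slot is free
        SlotStatus(String slotId, String jobId) { this.slotId = slotId; this.jobId = jobId; }
    }

    // TaskExecutor side: owns the slot table and builds a report on demand.
    static final class TaskExecutorSide {
        final Map<String, String> slotTable = new LinkedHashMap<>();

        List<SlotStatus> retrievePayload() { // plays the role of createSlotReport
            List<SlotStatus> report = new ArrayList<>();
            slotTable.forEach((slotId, jobId) -> report.add(new SlotStatus(slotId, jobId)));
            return report;
        }
    }

    // ResourceManager side: keeps its own view and refreshes it from every report.
    static final class ResourceManagerSide {
        final Map<String, String> knownSlots = new LinkedHashMap<>();

        void reportPayload(List<SlotStatus> report) { // plays the role of reportSlotStatus
            for (SlotStatus s : report) {
                knownSlots.put(s.slotId, s.jobId);
            }
        }

        long freeSlotCount() {
            return knownSlots.values().stream().filter(jobId -> jobId == null).count();
        }
    }

    // One simulated heartbeat round trip.
    static void heartbeat(TaskExecutorSide te, ResourceManagerSide rm) {
        rm.reportPayload(te.retrievePayload());
    }

    public static void main(String[] args) {
        TaskExecutorSide te = new TaskExecutorSide();
        te.slotTable.put("tm_0", null);
        te.slotTable.put("tm_1", null);
        ResourceManagerSide rm = new ResourceManagerSide();
        heartbeat(te, rm);
        System.out.println("free after 1st heartbeat: " + rm.freeSlotCount());
        te.slotTable.put("tm_0", "job-A"); // the slot gets allocated on the TaskExecutor
        heartbeat(te, rm);
        System.out.println("free after 2nd heartbeat: " + rm.freeSlotCount());
    }
}
```

The reason for reporting on every heartbeat shows up in main: after the TaskExecutor allocates `tm_0`, the ResourceManager's view is only corrected by the next heartbeat, which then reports one free slot instead of two.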

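0x03 Generating the ExecutionGraph

After a Job is submitted and goes through a series of processing steps, the Scheduler creates the ExecutionGraph. The ExecutionGraph is the parallel version of the JobGraph, and only after this analysis can the tasks finally be dispatched to the corresponding task slots. The number of slots is fixed in advance (usually according to the number of CPU cores) so that the CPUs' compute resources are used as fully as possible. If the slots are exhausted, newly dispatched tasks cannot be executed.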

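The ExecutionGraph is the distributed execution graph that the JobManager generates from the JobGraph; it is the core data structure of the scheduling layer.

A JobVertex / ExecutionJobVertex represents one operator (after operator chaining, possibly a chain of operators), while each concrete ExecutionVertex represents one Task, i.e. one parallel subtask.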

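When the StreamGraph is generated, the StreamGraph.addOperator method already determines the operator's task type, for example OneInputStreamTask or SourceStreamTask. Suppose OneInputStreamTask.class is the vertexClass of the generated StreamNode. This value is passed along the whole way: when the StreamGraph is converted into the JobGraph, it becomes the JobVertex's invokableClass; when the JobGraph is converted into the ExecutionGraph, it is passed into ExecutionJobVertex.TaskInformation.invokableClassName, and from there all the way down to the Task.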
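The end of this chain can be sketched with plain reflection: assuming, as the field name invokableClassName suggests, that only the class *name* travels with TaskInformation, the class is loaded and instantiated where the task runs. Invokable and DemoOneInputTask are hypothetical stand-ins, and real Flink invokables take constructor arguments that this sketch omits.

```java
// Hypothetical sketch: carry a task type by class name, instantiate it reflectively at the end.
class InvokableByNameDemo {
    // Hypothetical stand-in for an invokable task base type.
    public interface Invokable {
        String invoke();
    }

    // Hypothetical stand-in for a task type such as OneInputStreamTask.
    public static class DemoOneInputTask implements Invokable {
        public DemoOneInputTask() {}

        @Override
        public String invoke() { return "running " + getClass().getSimpleName(); }
    }

    // Loads the class by name, checks that it is an Invokable, and creates an instance.
    static Invokable instantiate(String invokableClassName) throws ReflectiveOperationException {
        Class<? extends Invokable> clazz = Class.forName(invokableClassName).asSubclass(Invokable.class);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Only the name is passed along: StreamNode -> JobVertex -> TaskInformation -> Task
        String name = DemoOneInputTask.class.getName();
        System.out.println(instantiate(name).invoke());
    }
}
```

main prints `running DemoOneInputTask`. A class that does not implement Invokable is rejected by asSubclass with a ClassCastException before anything is instantiated.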

The code executes in the following sequence:

  • JobMaster#createScheduler
  • DefaultSchedulerFactory#createInstance
  • DefaultScheduler#init
  • SchedulerBase#init
  • SchedulerBase#createAndRestoreExecutionGraph
  • SchedulerBase#createExecutionGraph
  • ExecutionGraphBuilder#buildGraph
  • ExecutionGraph#attachJobGraph
  • ExecutionJobVertex#init; here the parallelism determines how many Tasks, i.e. how many ExecutionVertex instances, are created.
    • int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
      this.taskVertices = new ExecutionVertex[numTaskVertices];
                 
  • ExecutionVertex#init; this is where the Execution is created.
    • this.currentExecution = new Execution( 
      			getExecutionGraph().getFutureExecutor(),
      			this,	0, initialGlobalModVersion,	createTimestamp, timeout);
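The parallelism rule in ExecutionJobVertex#init can be checked by hand with a small sketch. It assumes, as the ternary above implies, that an unset parallelism is non-positive (e.g. -1); the vertex names and the label format are hypothetical, and each label stands for one ExecutionVertex with its initial Execution (the `0` passed to the Execution constructor above, presumably the attempt number).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of expanding job vertices into parallel subtasks.
class ExpandParallelismDemo {
    // Same rule as in ExecutionJobVertex#init: a non-positive parallelism falls back to the default.
    static int numTaskVertices(int vertexParallelism, int defaultParallelism) {
        return vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
    }

    // Expands each (vertex name -> parallelism) into one label per subtask.
    static List<String> expand(Map<String, Integer> parallelismByVertex, int defaultParallelism) {
        List<String> subtasks = new ArrayList<>();
        for (Map.Entry<String, Integer> e : parallelismByVertex.entrySet()) {
            int n = numTaskVertices(e.getValue(), defaultParallelism);
            for (int i = 1; i <= n; i++) {
                subtasks.add(e.getKey() + " (" + i + "/" + n + ") attempt #0");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("Source", 2);
        job.put("Map", -1); // not set: falls back to the default parallelism
        expand(job, 3).forEach(System.out::println);
    }
}
```

With a default parallelism of 3, main prints two Source subtasks followed by three Map subtasks.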
                 

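0x04 Scheduling

A task's journey is: the job is dispatched to a TaskManager, and then to a specific Slot for execution.

The scheduling code in this phase only uses CompletableFuture to build up the execution framework; you can think of it as working top-down.

Once the Job starts being scheduled, the code executes in this order: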

  • JobMaster#startJobExecution
  • JobMaster#resetAndStartScheduler
  • Future operations
  • JobMaster#startScheduling
  • SchedulerBase#startScheduling
  • DefaultScheduler#startSchedulingInternal
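  • LazyFromSourcesSchedulingStrategy#startScheduling; resource allocation and deployment for the vertices starts here
  • LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices; iterates over the ExecutionVertex instances and selects those that are in CREATED state and whose inputs are ready.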
    • private void allocateSlotsAndDeployExecutionVertices(
            final Iterable<? extends SchedulingExecutionVertex<?, ?>> vertices) {
         // keep only ExecutionVertex instances that are in CREATED state and whose inputs are ready
      	 final Set<ExecutionVertexID> verticesToDeploy = IterableUtils.toStream(vertices)
      			.filter(IS_IN_CREATED_EXECUTION_STATE.and(isInputConstraintSatisfied()))
      			.map(SchedulingExecutionVertex::getId)
      			.collect(Collectors.toSet());
         // build an ExecutionVertexDeploymentOption for each selected ExecutionVertex
         final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions = ...;
         // allocate resources and deploy
         schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
      }
      
                 
  • DefaultScheduler#allocateSlotsAndDeploy
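The vertex selection in allocateSlotsAndDeployExecutionVertices is an ordinary stream filter with two predicates combined by Predicate.and. Here is a self-contained sketch with a hypothetical Vertex type; the real code works on SchedulingExecutionVertex and an input-constraint check.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch of "CREATED and inputs ready" selection.
class SelectDeployableDemo {
    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING }

    static final class Vertex {
        final String id;
        final ExecutionState state;
        final boolean inputsReady;

        Vertex(String id, ExecutionState state, boolean inputsReady) {
            this.id = id;
            this.state = state;
            this.inputsReady = inputsReady;
        }
    }

    static final Predicate<Vertex> IS_IN_CREATED_STATE = v -> v.state == ExecutionState.CREATED;
    static final Predicate<Vertex> INPUTS_READY = v -> v.inputsReady;

    // Keeps vertices that are CREATED *and* have ready inputs, then collects their ids (sorted).
    static Set<String> verticesToDeploy(List<Vertex> vertices) {
        return vertices.stream()
                .filter(IS_IN_CREATED_STATE.and(INPUTS_READY))
                .map(v -> v.id)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<Vertex> vs = List.of(
                new Vertex("source-0", ExecutionState.CREATED, true),  // sources have no inputs to wait for
                new Vertex("map-0", ExecutionState.CREATED, false),    // upstream not ready yet
                new Vertex("source-1", ExecutionState.RUNNING, true)); // already past CREATED
        System.out.println(verticesToDeploy(vs));
    }
}
```

main prints `[source-0]`: `map-0` waits until its upstream is ready, and `source-1` is no longer in CREATED state.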

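This brings us to the first key function of this article, allocateSlotsAndDeploy. Its main responsibilities are:

  1. allocateSlots "allocates" Slots. In fact nothing is allocated yet at this point: it creates a series of Futures and returns a list of SlotExecutionVertexAssignment based on them.
  2. Create DeploymentHandles from the SlotExecutionVertexAssignments.
  3. Deploy via the deploymentHandles. In fact this only wires the deployment onto the Futures; the actual deployment runs after slot allocation succeeds.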
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
   validateDeploymentOptions(executionVertexDeploymentOptions);

   final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
      groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);

   final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
      .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
      .collect(Collectors.toList());

   final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
      executionVertexVersioner.recordVertexModifications(verticesToDeploy);

   transitionToScheduled(verticesToDeploy);

   // "allocate" Slots: nothing is actually allocated yet; a series of Futures is created and a list of SlotExecutionVertexAssignment is returned based on them
   final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
      allocateSlots(executionVertexDeploymentOptions);

   // create DeploymentHandles from the SlotExecutionVertexAssignments
   final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
      requiredVersionByVertex,
      deploymentOptionsByVertex,
      slotExecutionVertexAssignments);
  
   // deploy via the deploymentHandles: this only wires deployment onto the Futures; the actual deployment runs after slot allocation succeeds
   if (isDeployIndividually()) {
      deployIndividually(deploymentHandles);
   } else {
      waitForAllSlotsAndDeploy(deploymentHandles);
   }
}
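The two deployment branches at the end of allocateSlotsAndDeploy can be illustrated with plain CompletableFutures. This is a hypothetical simplification (slots are strings, and "deploying" appends to a log): individual deployment chains a step onto each slot future on its own, while the wait-for-all variant first joins the futures, here with CompletableFuture.allOf, as an approximation of what waitForAllSlotsAndDeploy does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch contrasting per-slot deployment with all-slots-first deployment.
class DeployStrategiesDemo {
    // Individually: each task is deployed as soon as its own slot future completes.
    static void deployIndividually(List<CompletableFuture<String>> slotFutures, List<String> log) {
        for (CompletableFuture<String> f : slotFutures) {
            f.thenAccept(slot -> log.add("deploy on " + slot));
        }
    }

    // All-or-nothing: nothing is deployed until every slot future has completed.
    static void waitForAllSlotsAndDeploy(List<CompletableFuture<String>> slotFutures, List<String> log) {
        CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture[0]))
                .thenRun(() -> slotFutures.forEach(f -> log.add("deploy on " + f.join())));
    }

    public static void main(String[] args) {
        List<String> individual = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<String>> a =
                List.of(new CompletableFuture<String>(), new CompletableFuture<String>());
        deployIndividually(a, individual);
        a.get(0).complete("slot-0");
        System.out.println("individually, after 1 of 2 slots: " + individual);

        List<String> batched = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<String>> b =
                List.of(new CompletableFuture<String>(), new CompletableFuture<String>());
        waitForAllSlotsAndDeploy(b, batched);
        b.get(0).complete("slot-0");
        System.out.println("wait-for-all, after 1 of 2 slots: " + batched);
        b.get(1).complete("slot-1");
        System.out.println("wait-for-all, after 2 of 2 slots: " + batched);
    }
}
```

With only one of two slots available, the individual strategy has already deployed one task while the wait-for-all strategy has deployed nothing; once the second slot arrives, both tasks are deployed together.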


           

In the next two sections we analyze allocateSlots and deployIndividually / waitForAllSlotsAndDeploy respectively.

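0x05 Allocating Resources

Note that the entry point here is the allocateSlots call inside allocateSlotsAndDeploy.

When a slot is allocated, allocation is first attempted in the JobMaster's SlotPool: it takes all slots from the SlotPool and tries to pick the most suitable one. There are two selection strategies: location preference, and preference for previously allocated slots. If the SlotPool cannot satisfy the request, the slot is requested from the ResourceManager via RPC; if the ResourceManager is not connected yet, the request is cached and sent once the connection to the ResourceManager is established.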
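The decision order above (pool first, then the ResourceManager, stashing the request while no RM is connected) can be sketched as follows. Everything here is hypothetical: MiniSlotPool, its fields, and the scoring rule (a slot from the previous allocation beats a slot on the preferred host, which beats any other slot) only illustrate the two preference strategies and are not Flink's slot selection strategy classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the pool-first, RM-second allocation order.
class MiniSlotPool {
    static final class AvailableSlot {
        final String allocationId;
        final String host;
        AvailableSlot(String allocationId, String host) { this.allocationId = allocationId; this.host = host; }
    }

    final List<AvailableSlot> available = new ArrayList<>();
    final List<String> waitingForResourceManager = new ArrayList<>();
    final List<String> sentToResourceManager = new ArrayList<>();
    boolean resourceManagerConnected = false;

    // Higher score = better match: previous allocation first, then preferred location.
    static int score(AvailableSlot s, String priorAllocationId, String preferredHost) {
        if (s.allocationId.equals(priorAllocationId)) return 2;
        if (s.host.equals(preferredHost)) return 1;
        return 0;
    }

    // Returns a pooled slot if any; otherwise forwards the request to the RM or stashes it.
    Optional<AvailableSlot> allocate(String requestId, String priorAllocationId, String preferredHost) {
        Optional<AvailableSlot> best = available.stream()
                .max(Comparator.comparingInt((AvailableSlot s) -> score(s, priorAllocationId, preferredHost)));
        if (best.isPresent()) {
            available.remove(best.get());
            return best;
        }
        if (resourceManagerConnected) {
            sentToResourceManager.add(requestId);     // would be an RPC to the RM
        } else {
            waitingForResourceManager.add(requestId); // sent once the RM connects
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        MiniSlotPool pool = new MiniSlotPool();
        pool.available.add(new AvailableSlot("alloc-1", "host-a"));
        pool.available.add(new AvailableSlot("alloc-2", "host-b"));
        System.out.println(pool.allocate("r1", "none", "host-b").get().allocationId);
        System.out.println(pool.allocate("r2", "none", "host-b").get().allocationId);
        System.out.println(pool.allocate("r3", "none", "host-b").isPresent());
        System.out.println(pool.waitingForResourceManager);
    }
}
```

main prints `alloc-2` (location match), then `alloc-1` (the only slot left), then `false` and `[r3]`: the pool is empty and no RM is connected, so the third request is stashed.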

5.1 CompletableFuture

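CompletableFuture is first of all a Future: it has all of Future's capabilities, including retrieving the result of an asynchronous computation and cancelling a running task. It is also a CompletionStage, whose main benefit is turning callbacks into chained calls, so that Futures can be composed.

Here the execution framework is built out of three CompletableFutures.

We name them Future 1, Future 2 and Future 3 in order of appearance.

However, it is actually easier to explain them in reverse. As we can see:

Order of appearance: Future 1, Future 2, Future 3

Callback order: Future 3 -> Future 2 -> Future 1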
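Why the two orders are reversed can be shown with three plain CompletableFutures wired like the chain in this section: the outermost future is created first, only the innermost one is completed from outside, and each completion cascades outward. The names mirror the article's Future 1/2/3; the wiring is a simplified sketch, not the Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: creation order 1 -> 3 -> 2, completion order 3 -> 2 -> 1.
class FutureOrderDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Future 1: created first, by the outermost call (like allocationResultFuture)
        CompletableFuture<String> future1 = new CompletableFuture<>();
        // Future 3: created by the innermost call (like the PendingRequest's future)
        CompletableFuture<String> future3 = new CompletableFuture<>();
        // Future 2: derived from Future 3 (like the root slot's context future)
        CompletableFuture<String> future2 = future3.thenApply(slot -> {
            events.add("Future 3 callback completes Future 2");
            return slot;
        });
        // When Future 2 completes, its outcome is forwarded into Future 1
        future2.whenComplete((slot, failure) -> {
            events.add("Future 2 callback completes Future 1");
            if (failure != null) {
                future1.completeExceptionally(failure);
            } else {
                future1.complete(slot);
            }
        });
        // Deployment hangs off Future 1
        future1.thenAccept(slot -> events.add("Future 1 callback: deploy on " + slot));

        // Only the innermost future is completed from outside (e.g. by a slot offer)
        events.add("complete Future 3");
        future3.complete("physical-slot");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

main prints the four events in order: Future 3 is completed first, and its callback reaches Future 2, then Future 1, then the deployment step. Because every callback was registered before completion, they all run synchronously inside the single `future3.complete(...)` call.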

5.1.1 Future 3

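We can call it the PhysicalSlot Future.

Type: CompletableFuture<PhysicalSlot>

Created in: the construction of the PendingRequest in requestNewAllocatedSlot. The PendingRequest constructor contains new CompletableFuture<>(); this Future 3 is a member variable of PendingRequest.

Used for:

  • The PendingRequest is added to waitingForResourceManager

What its callbacks do:

  • The whenComplete in allocateMultiTaskSlot assigns the payload to the slot (allocatedSlot.tryAssignPayload)
  • Further, the forward(thenApply(...)) statement in createRootSlot makes Future 3's callback complete Future 2

When it is called back:

  • When the TM (TaskExecutor) offers a Slot, the callback is triggered indirectly through the PendingRequest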
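Future 3's life cycle (parked in a PendingRequest, completed when a slot is offered) can be sketched like this. PendingRequest here is a hypothetical minimal stand-in holding only a request id and its future; the single pendingRequests map stands in for waitingForResourceManager and the pool's other pending-request bookkeeping, and offerSlot compresses the real offer path, which the article covers later, into one lookup by request id.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a pending request whose future is completed by a slot offer.
class PendingRequestDemo {
    // Minimal stand-in: the future is created in the constructor, as described for Future 3.
    static final class PendingRequest {
        final String slotRequestId;
        final CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();
        PendingRequest(String slotRequestId) { this.slotRequestId = slotRequestId; }
    }

    final Map<String, PendingRequest> pendingRequests = new LinkedHashMap<>();

    // Parks the request and hands its (still incomplete) future back to the caller.
    CompletableFuture<String> requestNewAllocatedSlot(String slotRequestId) {
        PendingRequest request = new PendingRequest(slotRequestId);
        pendingRequests.put(slotRequestId, request);
        return request.allocatedSlotFuture;
    }

    // An offered slot fulfils the matching request, which completes Future 3.
    boolean offerSlot(String slotRequestId, String slot) {
        PendingRequest request = pendingRequests.remove(slotRequestId);
        return request != null && request.allocatedSlotFuture.complete(slot);
    }

    public static void main(String[] args) {
        PendingRequestDemo pool = new PendingRequestDemo();
        CompletableFuture<String> future3 = pool.requestNewAllocatedSlot("req-1");
        future3.thenAccept(slot -> System.out.println("payload assigned to " + slot));
        System.out.println("done before offer? " + future3.isDone());
        pool.offerSlot("req-1", "tm-1_0");
        System.out.println("done after offer? " + future3.isDone());
    }
}
```

main prints `done before offer? false`, then the callback's `payload assigned to tm-1_0`, then `done after offer? true`.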

5.1.2 Future 2

We can call it allocationFuture.

Type:

  • CompletableFuture<SlotContext>; along the way it is converted (via thenApply) into CompletableFuture<LogicalSlot>

Created in:

  • createRootSlot: final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();

Used for:

  • Future 2 is set as a member variable of the MultiTaskSlot: private final CompletableFuture<? extends SlotContext> slotContextFuture;
  • Future 2 is therefore also the SingleTaskSlot's parent.getSlotContextFuture(), because MultiTaskSlot and SingleTaskSlot are parent and child
  • In the SingleTaskSlot constructor, Future 2 is turned (via thenApply) into the SingleTaskSlot member variable singleLogicalSlotFuture.
  • So, in effect, Future 2 is the SingleTaskSlot member variable singleLogicalSlotFuture
  • In SchedulerImpl#allocateSharedSlot, return leaf.getLogicalSlotFuture(); hands singleLogicalSlotFuture to the outer caller, which sees it as allocationFuture.

What its callbacks do:

  • In the SingleTaskSlot constructor, a SingleLogicalSlot is created (it is actually created only later, when the callback fires)
  • In internalAllocateSlot, it completes Future 1, triggering the callbacks of allocationResultFuture

When it is called back:

  • From the callback of Future 3
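createRootSlot does not hand Future 3 to the children directly; it creates a fresh Future 2 and forwards Future 3's (transformed) outcome into it. Here is a minimal sketch of such a forwarding helper in plain Java, assuming it simply copies either the value or the failure from source to target. ForwardDemo.forward is a hypothetical helper, not Flink's FutureUtils.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of forwarding one future's outcome into another.
class ForwardDemo {
    // Copies the outcome (value or failure) of source into target once source completes.
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, failure) -> {
            if (failure != null) {
                target.completeExceptionally(failure);
            } else {
                target.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future3 = new CompletableFuture<>(); // physical slot
        CompletableFuture<String> future2 = new CompletableFuture<>(); // root slot's context
        StringBuilder resolved = new StringBuilder();
        forward(future3.thenApply(slot -> {
            resolved.append("root resolved on ").append(slot); // plays the role of tryMarkSlotAsResolved
            return slot;
        }), future2);

        System.out.println(future2.isDone()); // nothing offered yet
        future3.complete("tm-1_0");
        System.out.println(future2.join() + " / " + resolved);
    }
}
```

main prints `false`, then `tm-1_0 / root resolved on tm-1_0`. Keeping Future 2 as a separate future means the root slot is marked as resolved before any child sees the slot context.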

5.1.3 Future 1

We can call it allocationResultFuture.

Type:

  • CompletableFuture<LogicalSlot>

Created in:

  • SchedulerImpl#allocateSlotInternal, which creates the first CompletableFuture

Used for:

  • It is used later during Deploy: handle is used to attach two follow-up stages to Future 1, which run after Future 1 completes.

What its callbacks do:

  • allocateSlotsFor registers error handling on it
  • The two deployment stages that Deploy later attaches via handle

When it is called back:

  • The completing statement is in internalAllocateSlot, but it runs inside Future 2's callback
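Future 1 is the future the scheduler keeps, so both outcomes end up there. The sketch below mirrors the two callbacks quoted in 5.3 in simplified form: internalAllocateSlot-style propagation from Future 2 into Future 1, and allocateSlotsFor-style cleanup on Future 1 (remove the pending assignment, and cancel the request on failure). Future1Demo, its map and the cancelledRequests list are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of how Future 1 receives Future 2's outcome and cleans up after itself.
class Future1Demo {
    final Map<String, CompletableFuture<String>> pendingSlotAssignments = new HashMap<>();
    final List<String> cancelledRequests = new ArrayList<>();

    // Wires Future 2 (allocationFuture) into a new Future 1 (allocationResultFuture).
    CompletableFuture<String> allocate(String vertexId, CompletableFuture<String> allocationFuture) {
        CompletableFuture<String> allocationResultFuture = new CompletableFuture<>();
        allocationFuture.whenComplete((slot, failure) -> {   // as in internalAllocateSlot
            if (failure != null) {
                allocationResultFuture.completeExceptionally(failure);
            } else {
                allocationResultFuture.complete(slot);
            }
        });
        pendingSlotAssignments.put(vertexId, allocationResultFuture);
        allocationResultFuture.whenComplete((ignored, throwable) -> { // as in allocateSlotsFor
            pendingSlotAssignments.remove(vertexId);
            if (throwable != null) {
                cancelledRequests.add(vertexId); // stands in for cancelSlotRequest
            }
        });
        return allocationResultFuture;
    }

    public static void main(String[] args) {
        Future1Demo d = new Future1Demo();
        CompletableFuture<String> ok2 = new CompletableFuture<>();
        CompletableFuture<String> bad2 = new CompletableFuture<>();
        CompletableFuture<String> ok1 = d.allocate("v1", ok2);
        d.allocate("v2", bad2);
        System.out.println("pending: " + d.pendingSlotAssignments.size());
        ok2.complete("tm-1_0");
        bad2.completeExceptionally(new IllegalStateException("no slot"));
        System.out.println(ok1.join() + ", pending: " + d.pendingSlotAssignments.size()
                + ", cancelled: " + d.cancelledRequests);
    }
}
```

main prints `pending: 2`, then `tm-1_0, pending: 0, cancelled: [v2]`: both assignments are cleaned up, and only the failed one is cancelled.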

5.2 Flowchart

This part is fairly complex, so here is a flowchart first:

*  Run in Job Manager
 *
 *    DefaultScheduler#allocateSlotsAndDeploy
 *        |
 *        +----> DefaultScheduler#allocateSlots
 *        |     // converts ExecutionVertex into ExecutionVertexSchedulingRequirements
 *        |
 *        +----> DefaultExecutionSlotAllocator#allocateSlotsFor ( call 1 starts )
 *        |     // obtains our first CompletableFuture, which we call Future 1
 *        |
 *        |
 *        +--------------> NormalSlotProviderStrategy#allocateSlot
 *        |
 *        |
 *        +--------------> SchedulerImpl#allocateSlotInternal
 *        |     // creates the first CompletableFuture, hereafter called allocationResultFuture
 *        |
 *  ┌────────────┐
 *  │  Future 1  │ creates allocationResultFuture
 *  └────────────┘
 *        │
 *        │
 *        +----> SchedulerImpl#internalAllocateSlot ( call 2 starts )
 *        |      // Future 1 is passed in as a parameter; the calls below create Future 2 and Future 3
 *        |
 *        |
 *        +-----------> SchedulerImpl#allocateSharedSlot ( call 3 starts )
 *        |      // MultiTaskSlot and SingleTaskSlot are involved here
 *        |
 *        +-----------> SchedulerImpl#allocateMultiTaskSlot ( call 4 starts )
 *        |
 *        |
 *        +--------------------> SchedulerImpl#requestNewAllocatedSlot
 *        |
 *        |
 *        +--------------------> SlotPoolImpl#requestNewAllocatedSlot
 *        |      // creates a PendingRequest
 *        |      // the PendingRequest constructor contains new CompletableFuture<>(),
 *        |      // so this creates the third Future; note that it is a Future for a PhysicalSlot
 *        |
 *        |
 *  ┌────────────┐
 *  │  Future 3  │ creates Future<PhysicalSlot>; Future 3 is actually invisible to the user
 *  └────────────┘
 *        |
 *        |
 *        +-----------> SchedulerImpl#allocateMultiTaskSlot ( call 4 ends )
 *        |      // back in ( call 4 ), which now holds Future 3,
 *        |      // the third Future<PhysicalSlot>;
 *        |      // "third" because, from the user's point of view, it is the third to appear
 *        |
 *        +-----------------------> SlotSharingManager#createRootSlot
 *        |      // Future 3 is passed in as a parameter
 *        |      // Future 2 is created right here
 *        |      // Future 2 becomes the multiTaskSlot member variable slotContextFuture
 *        |      // the forward(thenApply(...)) statement makes Future 3's callback complete Future 2
 *        |
 *        |
 *        +-----------> SchedulerImpl#allocateSharedSlot
 *        |      // back in ( call 3 )
 *        |
 *        |
 *        +-----------------------> SlotSharingManager#allocateSingleTaskSlot
 *        |      // creates a SingleTaskSlot leaf on top of rootMultiTaskSlot and adds it to allTaskSlots
 *        |      // leaf.getLogicalSlotFuture() is the Future 2 that was just set up
 *        |
 *        |
 *        +-----------> SchedulerImpl#allocateSharedSlot
 *        |      // still in ( call 3 )
 *        |      // return leaf.getLogicalSlotFuture(); returns Future 2
 *        |
 *        |
 *  ┌────────────┐
 *  │  Future 2  │
 *  └────────────┘
 *        |
 *        |
 *        |
 *        +----> SchedulerImpl#internalAllocateSlot
 *        |      // back in ( call 2 )
 *        |      // arranges for Future 2's callback to complete Future 1
 *        |
 *        |
 *        +----> DefaultExecutionSlotAllocator#allocateSlotsFor
 *        |      // back in ( call 1 )
 *        |
 *        |
 *        |
 *  ┌────────────┐
 *  │  Future 1  │
 *  └────────────┘
 *        |
 *        |
 *        +---->  createDeploymentHandles
 *        |      // creates the DeploymentHandles
 *        |
 *        |
 *        +-----------> deployIndividually(deploymentHandles);
 *        |      // attaches two more callbacks to Future 1, as deployment callbacks
 *        |


           

The same flowchart as an image, for easier reading on mobile:

[Figure: the allocation flowchart above]

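5.3 Detailed Execution Path

By default, Flink allows subtasks to share slots, as long as they are subtasks of different tasks from the same Job. As a result, a single slot may hold an entire pipeline of the job. Slot sharing has two benefits:

  • A Flink cluster needs exactly as many task slots as the highest parallelism in the job, so we no longer need to calculate how many tasks a program will start in total.
  • It makes better resource utilization easier. Without slot sharing, the non-intensive source/flatMap subtasks would occupy as many resources as the intensive keyAggregation/sink subtasks. With slot sharing, raising the baseline parallelism from 2 to 6 makes full use of the slot resources while ensuring that the heavy subtasks are evenly distributed across the TaskManagers.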
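The first benefit is simple arithmetic. A slot holds at most one subtask of each operator, so with slot sharing a job needs as many slots as its highest parallelism; without sharing, every subtask needs its own slot, i.e. the sum of all parallelisms. A small sketch, assuming all operators are in the same (default) slot sharing group; the operator names and parallelism values are made up for illustration:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: slots needed with and without slot sharing.
class SlotSharingMath {
    // With sharing, subtask i of every operator can go into slot i.
    static int slotsWithSharing(Map<String, Integer> parallelism) {
        return Collections.max(parallelism.values());
    }

    // Without sharing, every subtask occupies a slot of its own.
    static int slotsWithoutSharing(Map<String, Integer> parallelism) {
        return parallelism.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("source/flatMap", 6);
        job.put("keyAggregation", 6);
        job.put("sink", 1);
        System.out.println("with sharing: " + slotsWithSharing(job));
        System.out.println("without sharing: " + slotsWithoutSharing(job));
    }
}
```

For parallelisms 6, 6 and 1 this gives 6 slots with sharing versus 13 without.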

The execution path here is roughly as follows:

  • DefaultScheduler#allocateSlotsAndDeploy
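  • DefaultScheduler#allocateSlots; converts each ExecutionVertex into ExecutionVertexSchedulingRequirements, which wrap location information, sharing information, resource information and so on
  • DefaultExecutionSlotAllocator#allocateSlotsFor; this section's analysis really starts here. The function performs a series of operations, calling down layer by layer. First, it obtains our first CompletableFuture, which we call allocationResultFuture (the reason for the name will become clear later). This slotFuture is assigned to a SlotExecutionVertexAssignment and passed outward. Later, during Deploy, handle is used to attach two follow-up stages to this slotFuture, which run after slotFuture completes.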
    • public List<SlotExecutionVertexAssignment> allocateSlotsFor(...) {
      		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
            
            // obtain the first CompletableFuture, via calculatePreferredLocations(...).thenCompose(...)
      			CompletableFuture<LogicalSlot> slotFuture = 
              calculatePreferredLocations(...).thenCompose(preferredLocations ->
      								slotProviderStrategy.allocateSlot( // the first CompletableFuture is created inside this call
      									slotRequestId,
      									new ScheduledUnit(...),
      									SlotProfile.priorAllocation(...)));
      
      			SlotExecutionVertexAssignment slotExecutionVertexAssignment =
      					new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
      
      			slotFuture.whenComplete(
      					(ignored, throwable) -> { // callback of the first CompletableFuture: only cleanup and error handling; it runs once the future completes
      						pendingSlotAssignments.remove(executionVertexId);
      						if (throwable != null) {
      							slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
      						}
      					});
      
      			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
      		}
      
      		return slotExecutionVertexAssignments;
      }
      
      
                 
  • NormalSlotProviderStrategy#allocateSlot(slotProviderStrategy.allocateSlot)
  • SchedulerImpl#allocateSlotInternal; this creates the first CompletableFuture, which we can call allocationResultFuture
    • private CompletableFuture<LogicalSlot> allocateSlotInternal(...) {
          // this creates the first CompletableFuture, which we will call allocationResultFuture
      		final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();
          // allocationResultFuture is passed down for further processing
      		internalAllocateSlot(allocationResultFuture, slotRequestId, scheduledUnit, 
                               slotProfile, allocationTimeout);
          // return allocationResultFuture
      		return allocationResultFuture;
      }
      
      
                 
  • SchedulerImpl#allocateSlot
  • SchedulerImpl#internalAllocateSlot; depending on whether the vertex shares slots, this allocates a single slot or a shared slot. Here we obtain the second CompletableFuture, which we will call allocationFuture
    • private void internalAllocateSlot(
      			CompletableFuture<LogicalSlot> allocationResultFuture, ...) {
        	// obtain the second CompletableFuture, which we will call allocationFuture. Note that it is only obtained here, not created.
      		CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
      			allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) :
      			allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
      		// callback of the second Future, allocationFuture. Note that several whenComplete callbacks can be attached to one CompletableFuture.
      		allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
      			if (failure != null) { // error handling
      				cancelSlotRequest(...);
      				allocationResultFuture.completeExceptionally(failure);
      			} else {
      				allocationResultFuture.complete(slot); // this triggers the callbacks of the first Future, allocationResultFuture
      			}
      		});
      }
      
      
                 
  • SchedulerImpl#allocateSharedSlot; this is also fairly complex and involves MultiTaskSlot and SingleTaskSlot
    • private CompletableFuture<LogicalSlot> allocateSharedSlot(...) {
       		// allocate slot with slot sharing
       		final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
       			scheduledUnit.getSlotSharingGroupId(),
       			id -> new SlotSharingManager(id,slotPool,this)); // create the SlotSharingManager
       
       		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
       
       			if (scheduledUnit.getCoLocationConstraint() != null) {
       				multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(...);
       			} else {
       				multiTaskSlotLocality = allocateMultiTaskSlot(...); // the MultiTaskSlot is created here
       			}
       
         	// the SingleTaskSlot is created here
       		final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(...);
         
       		return leaf.getLogicalSlotFuture(); // returns the SingleTaskSlot's future, i.e. the second Future; how it is created is detailed below
       	}
      
      
                 
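  • SchedulerImpl#allocateMultiTaskSlot; this is a tricky function because it creates the third Future, which we describe ahead of time here. It is "third" because, from the user's point of view, it is the third to appear.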
    • private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(...) {
       
       		SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);
       
       		if (multiTaskSlot == null) { 
            // requestNewAllocatedSlot calls the SlotPoolImpl method of the same name
            // obtain the third Future; note that this Future is for a PhysicalSlot
       			final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(...); 
       
           // build the multiTaskSlot from the third Future
       			multiTaskSlot = slotSharingManager.createRootSlot(...,slotAllocationFuture,...);
       
           // callback of the third Future, which assigns the payload to the slot
       			slotAllocationFuture.whenComplete(
       				(PhysicalSlot allocatedSlot, Throwable throwable) -> {
       					final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
       
       					if (taskSlot != null) {
                   // assign the payload to the slot
       							if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {...}
       					} 
       				});
       		}
       
       		return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
       	}
      
      
      
                 
  • SchedulerImpl#requestNewAllocatedSlot calls the SlotPoolImpl method of the same name
  • SlotPoolImpl#requestNewAllocatedSlot; this creates a PendingRequest
    • public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(...) {
       
       		// create the PendingRequest
       		final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);
       
       		// stash the PendingRequest in waitingForResourceManager (or send it to the RM if connected), then return the Future
       		return requestNewAllocatedSlotInternal(pendingRequest)
       			.thenApply((Function.identity()));
       	}
      
      
                 
    • The PendingRequest constructor contains new CompletableFuture<>(), so this is where the third Future is created; note that this Future is for a PhysicalSlot
    • requestNewAllocatedSlotInternal
      • private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {
        
           if (resourceManagerGateway == null) {
              // i.e. add pendingRequest to waitingForResourceManager
              stashRequestWaitingForResourceManager(pendingRequest);
           } else {
              requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
           }
           return pendingRequest.getAllocatedSlotFuture(); // the third Future
        }
        
        
                   
  • SlotSharingManager#createRootSlot; this is where the second Future is actually created
    • MultiTaskSlot createRootSlot(
            SlotRequestId slotRequestId,
            CompletableFuture<? extends SlotContext> slotContextFuture, // the parameter is the third Future
            SlotRequestId allocatedSlotRequestId) {
      
         // create the second Future<SlotContext>
         final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
        
         final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(...
            slotContextFutureAfterRootSlotResolution); // inside createAndRegisterRootSlot, the second Future becomes the MultiTaskSlot's slotContextFuture member variable
      
         FutureUtils.forward(
            slotContextFuture.thenApply( // when the third Future completes, its callback goes on to complete the second Future
               (SlotContext slotContext) -> {
                  // add the root node to the set of resolved root nodes once the SlotContext future has
                  // been completed and we know the slot's TaskManagerLocation
                  tryMarkSlotAsResolved(slotRequestId, slotContext);
                  return slotContext;
               }),
            slotContextFutureAfterRootSlotResolution); // the second Future is completed here
      
         return rootMultiTaskSlot;
      }
      
      
                 
  • SlotSharingManager#allocateSingleTaskSlot; the goal here is to create a SingleTaskSlot leaf on top of rootMultiTaskSlot and add it to allTaskSlots.
    • SingleTaskSlot allocateSingleTaskSlot(
      				SlotRequestId slotRequestId, ResourceProfile resourceProfile,
      				AbstractID groupId, Locality locality) {
      
      			final SingleTaskSlot leaf = new SingleTaskSlot(
      				slotRequestId, resourceProfile, groupId, this, locality);
      
      			children.put(groupId, leaf);
      
      			// register the newly allocated slot also at the SlotSharingManager
      			allTaskSlots.put(slotRequestId, leaf);
      
      			reserveResource(resourceProfile);
      
      			return leaf;
      }
      
      
                 
  • Finally, back in SchedulerImpl#allocateSharedSlot: return leaf.getLogicalSlotFuture(); is another tricky point. getLogicalSlotFuture returns a CompletableFuture (the second Future), but the SingleLogicalSlot itself is only created later, when the callback fires.
    • public final class SingleTaskSlot extends TaskSlot {
      		private final MultiTaskSlot parent;
         // future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
      		private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture;
        
          private SingleTaskSlot() {
                singleLogicalSlotFuture = parent.getSlotContextFuture()
                  .thenApply(
                    (SlotContext slotContext) -> {
                      return new SingleLogicalSlot( // only created later, when the callback fires
                        slotRequestId,
                        slotContext,
                        slotSharingGroupId,
                        locality,
                        slotOwner);
                    });
              }
      
          CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
             return singleLogicalSlotFuture.thenApply(Function.identity());
          }  
      }
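Putting the last two steps together: several SingleTaskSlot leaves hang off one root MultiTaskSlot, and each leaf derives its logical-slot future from the root's context future with thenApply, so a single physical slot completes all of them. The classes below are hypothetical miniatures (strings instead of SlotContext and SingleLogicalSlot), meant only to show this fan-out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of a MultiTaskSlot with SingleTaskSlot children.
class SharedSlotDemo {
    // Root of the sharing tree; its future completes when the physical slot arrives.
    static final class MiniMultiTaskSlot {
        final CompletableFuture<String> slotContextFuture = new CompletableFuture<>();
        final Map<String, MiniSingleTaskSlot> children = new LinkedHashMap<>();

        MiniSingleTaskSlot allocateSingleTaskSlot(String groupId) {
            MiniSingleTaskSlot leaf = new MiniSingleTaskSlot(this, groupId);
            children.put(groupId, leaf);
            return leaf;
        }
    }

    // Leaf: its "logical slot" only exists once the parent's context future completes.
    static final class MiniSingleTaskSlot {
        final CompletableFuture<String> singleLogicalSlotFuture;

        MiniSingleTaskSlot(MiniMultiTaskSlot parent, String groupId) {
            singleLogicalSlotFuture = parent.slotContextFuture
                    .thenApply(context -> groupId + "@" + context); // built lazily, on completion
        }
    }

    public static void main(String[] args) {
        MiniMultiTaskSlot root = new MiniMultiTaskSlot();
        MiniSingleTaskSlot source = root.allocateSingleTaskSlot("source");
        MiniSingleTaskSlot map = root.allocateSingleTaskSlot("map");
        System.out.println(source.singleLogicalSlotFuture.isDone());
        root.slotContextFuture.complete("tm-1_0"); // one physical slot ...
        System.out.println(source.singleLogicalSlotFuture.join()); // ... serves both leaves
        System.out.println(map.singleLogicalSlotFuture.join());
    }
}
```

main prints `false`, then `source@tm-1_0` and `map@tm-1_0`: both tasks receive their logical slots from the same physical slot at the moment it arrives.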
      
      
                 

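0x06 Deploy

Note that the entry point here is the deployIndividually / waitForAllSlotsAndDeploy statement in allocateSlotsAndDeploy.

The execution path here is roughly as follows:

  • DefaultScheduler#allocateSlotsAndDeploy
  • DefaultScheduler#allocateSlots; obtains the list of SlotExecutionVertexAssignment, covered in detail in the previous section (it converts each ExecutionVertex into ExecutionVertexSchedulingRequirements, which wrap location information, sharing information, resource information and so on)
  • List<DeploymentHandle> deploymentHandles = createDeploymentHandles(); creates DeploymentHandles from the SlotExecutionVertexAssignments
  • DefaultScheduler#deployIndividually deploys via the deploymentHandles. In fact it wires the deployment onto the Futures; the actual deployment runs only after slot allocation succeeds. This section's analysis really starts here: as the code shows, it takes Future 1 and chains a series of operations onto it.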
    • private void deployIndividually(final List<DeploymentHandle> deploymentHandles) {
         for (final DeploymentHandle deploymentHandle : deploymentHandles) {
            FutureUtils.assertNoException(
               deploymentHandle
                  .getSlotExecutionVertexAssignment()
                  .getLogicalSlotFuture()
                  .handle(assignResourceOrHandleError(deploymentHandle))
                  .handle(deployOrHandleError(deploymentHandle)));
         }
      }
      
      
                 
  • DefaultScheduler#assignResourceOrHandleError; simply returns a function, to be called back later
    • private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(final DeploymentHandle deploymentHandle) {
        
         final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
         final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
      
         return (logicalSlot, throwable) -> {
            if (throwable == null) {
               final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
               final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
               executionVertex
                  .getCurrentExecutionAttempt()
                  .registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
               executionVertex.tryAssignResource(logicalSlot);
            } else {
               handleTaskDeploymentFailure(executionVertexId, maybeWrapWithNoResourceAvailableException(throwable));
            }
            return null;
         };
      }
      
      
                 
  • deployOrHandleError likewise just returns a function, to be called back later
    • private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
        
         final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
         final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
      
         return (ignored, throwable) -> {
            if (throwable == null) {
               deployTaskSafe(executionVertexId);
            } else {
               handleTaskDeploymentFailure(executionVertexId, throwable);
            }
            return null;
         };
      }
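One subtlety of chaining two handle stages: handle always receives the previous stage's outcome, success or failure, and whatever it returns becomes a normal result. So when assignResourceOrHandleError deals with a failure itself and returns null, deployOrHandleError sees throwable == null. The excerpt reads requiredVertexVersion without using it, which suggests the real code skips deployment via a version check that was trimmed here. The sketch below makes the effect visible with a hypothetical `failed` flag standing in for that signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of two chained handle stages on a slot future.
class HandleChainDemo {
    static List<String> deploy(CompletableFuture<String> logicalSlotFuture) {
        List<String> log = new ArrayList<>();
        AtomicBoolean failed = new AtomicBoolean(false); // hypothetical "skip deployment" signal

        logicalSlotFuture
                .handle((slot, throwable) -> {       // plays the role of assignResourceOrHandleError
                    if (throwable == null) {
                        log.add("assign " + slot);
                    } else {
                        log.add("handle failure: " + throwable.getMessage());
                        failed.set(true);
                    }
                    return null;                      // the failure is swallowed here
                })
                .handle((ignored, throwable) -> {    // plays the role of deployOrHandleError
                    log.add("second stage sees throwable == null: " + (throwable == null));
                    if (!failed.get()) {
                        log.add("deploy");
                    }
                    return null;
                });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(deploy(CompletableFuture.completedFuture("slot-0")));
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("no resource"));
        System.out.println(deploy(bad));
    }
}
```

In both runs the second stage reports `throwable == null: true`; only the flag keeps the failed case from reaching `deploy`.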
      
      
                 

0x07 RM配置設定資源

之前的工作基本都是在 JM 之中。通過 Scheduler 和 SlotPool 來完成申請資源和部署階段。目前 SlotPool 之中已經積累了一個 PendingRequest,等 SlotPool 連接配接上 RM,就可以開始向 RM 申請資源了。

當ResourceManager收到申請slot請求時,若發現該JobManager未注冊,則直接抛出異常;否則将請求轉發給SlotManager處理,SlotManager中維護了叢集所有空閑的slot(TaskManager會向ResourceManager上報自己的資訊,在ResourceManager中由SlotManager儲存Slot和TaskManager對應關系),并從其中找出符合條件的slot,然後向TaskManager發送RPC請求申請對應的slot。

The code execution path is as follows:

  • JobMaster # establishResourceManagerConnection (running in the JM)
  • SlotPoolImpl # connectToResourceManager
  • SlotPoolImpl # requestSlotFromResourceManager: here the pool sends an RPC request to the RM.
    • private void requestSlotFromResourceManager(
            final ResourceManagerGateway resourceManagerGateway,
            final PendingRequest pendingRequest) {
          // Generate an AllocationID; it is passed on to the TM and registered on the TaskSlot.
          final AllocationID allocationId = new AllocationID();
          // Build a SlotRequest and send it to the RM via RPC.
          CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
                jobMasterId,
                new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
                rpcTimeout);
          // (handling of rmResponse omitted)
      }
      
                 
  • RPC
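  • ResourceManager # requestSlot (execution switches to the RM)
  • SlotManagerImpl # registerSlotRequest. registerSlotRequest first calls checkDuplicateRequest to check for a duplicate. If the request is not a duplicate, it stores the slotRequest in pendingSlotRequests and calls internalRequestSlot to allocate a slot; if that throws, the request is removed from pendingSlotRequests again and a SlotManagerException is thrown.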
    • pendingSlotRequests.put
      
                 
  • SlotManagerImpl # internalRequestSlot
  • SlotManagerImpl # findMatchingSlot
  • SlotManagerImpl # internalAllocateSlot: at this point no resource has been assigned yet, so it has to be requested from the TM.
    • private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
         final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
         OptionalConsumer.of(findMatchingSlot(resourceProfile))
            .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
            .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
      }
      
                 
  • SlotManagerImpl # allocateSlot requests the resource from the TaskManager. The TaskExecutorGateway interface is used to allocate task slots, i.e. the resources for tasks, via RPC.
    • TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
      CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
      			slotId,
      			pendingSlotRequest.getJobId(),
      			allocationId,
      			pendingSlotRequest.getResourceProfile(),
      			pendingSlotRequest.getTargetAddress(),
      			resourceManagerId,
      			taskManagerRequestTimeout);
      
                 
  • RPC
  • TaskExecutor # requestSlot (execution switches to the TE)
  • TaskSlotTableImpl # allocateSlot allocates the resource: it updates the task slot map and adds the slot to the set of job slots.
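    • public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId,
            ResourceProfile resourceProfile, Time slotTimeout) {
          // (checks for an existing slot at this index / with this allocationId omitted)
          final TaskSlot<T> taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
          taskSlots.put(index, taskSlot);
          // update the allocation id -> task slot mapping
          allocatedSlots.put(allocationId, taskSlot);
          // add the slot to the set of slots of this job ("slots" is looked up per jobId; omitted)
          slots.add(allocationId);
          // (registration of the slot timeout omitted)
          return true;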
      }
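The three structures updated by `TaskSlotTableImpl#allocateSlot` (slot index to slot, allocation id to slot, job to its slots) can be reduced to plain maps. `TaskSlotTableSketch` is hypothetical; its rule for an index that is already occupied (same allocation: accept again, anything else: refuse) is an assumption standing in for checks the excerpt above omits.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TaskSlotTableSketch {
    static final class Slot {
        final int index;
        final String jobId;
        final String allocationId;

        Slot(int index, String jobId, String allocationId) {
            this.index = index;
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final Map<Integer, Slot> taskSlots = new HashMap<>();        // slot index -> slot
    private final Map<String, Slot> allocatedSlots = new HashMap<>();    // allocation id -> slot
    private final Map<String, Set<String>> slotsPerJob = new HashMap<>(); // job id -> allocation ids

    boolean allocateSlot(int index, String jobId, String allocationId) {
        Slot existing = taskSlots.get(index);
        if (existing != null) {
            // Assumed rule: idempotent for the same allocation, refused for anyone else.
            return existing.allocationId.equals(allocationId) && existing.jobId.equals(jobId);
        }
        Slot slot = new Slot(index, jobId, allocationId);
        taskSlots.put(index, slot);
        allocatedSlots.put(allocationId, slot);
        slotsPerJob.computeIfAbsent(jobId, j -> new HashSet<>()).add(allocationId);
        return true;
    }

    Set<String> allocationsOf(String jobId) {
        return slotsPerJob.getOrDefault(jobId, Collections.emptySet());
    }

    Slot byAllocation(String allocationId) {
        return allocatedSlots.get(allocationId);
    }

    public static void main(String[] args) {
        TaskSlotTableSketch table = new TaskSlotTableSketch();
        System.out.println(table.allocateSlot(0, "job-1", "alloc-1")); // true
        System.out.println(table.allocateSlot(0, "job-2", "alloc-2")); // false: index 0 is taken
        System.out.println(table.allocationsOf("job-1"));             // [alloc-1]
    }
}
```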
      
      
                 

0x08 The Offer Phase

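This phase starts on the TE (TM) side: the TE offers its Slots to the JM, and the JM can then run the job on them. In that sense this part executes bottom-up.

After all slot requests have completed, each ExecutionVertex's Execution is assigned to its Slot, i.e. the Slot's resources are allocated to the Execution; once that is done, the job can be deployed.

There are two key points here:

  • When the JM receives a SlotOffer, it uses the taskManagerId passed in the RPC to look up that TaskManager's taskExecutorGateway, which then becomes AllocatedSlot.taskManagerGateway. This ties the JM-side Slot to the TaskManager that hosts it.
  • When an Execution is deployed, it obtains the TaskManager's RPC gateway in the order SingleLogicalSlot -> AllocatedSlot -> TaskManagerGateway, and only then submits the task via taskManagerGateway.submitTask. This links the Execution's deployment phase to its execution phase.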
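The two links above (the TaskManager gateway stored in the slot when the offer is accepted, and the lookup chain at deploy time) can be modelled with plain objects. All types here (`Gateway`, `PhysicalSlot`, `LogicalSlot`) are hypothetical simplifications of Flink's `TaskManagerGateway`, `AllocatedSlot` and `SingleLogicalSlot`.

```java
import java.util.ArrayList;
import java.util.List;

final class GatewayChainSketch {
    /** Stand-in for the RPC gateway of one TaskManager. */
    static final class Gateway {
        final String taskManagerId;
        final List<String> submitted = new ArrayList<>();

        Gateway(String taskManagerId) {
            this.taskManagerId = taskManagerId;
        }

        void submitTask(String taskName) {
            submitted.add(taskName); // the real call would be an RPC to the TaskExecutor
        }
    }

    /** Created when the JM accepts a SlotOffer; remembers which TaskManager owns it. */
    static final class PhysicalSlot {
        final int slotIndex;
        final Gateway gateway;

        PhysicalSlot(int slotIndex, Gateway gateway) {
            this.slotIndex = slotIndex;
            this.gateway = gateway;
        }
    }

    /** What an Execution holds; it only delegates to its physical slot. */
    static final class LogicalSlot {
        final PhysicalSlot physical;

        LogicalSlot(PhysicalSlot physical) {
            this.physical = physical;
        }

        Gateway getTaskManagerGateway() {
            return physical.gateway;
        }
    }

    /** Deploy step: logical slot -> physical slot -> gateway -> submitTask. */
    static void deploy(String taskName, LogicalSlot slot) {
        slot.getTaskManagerGateway().submitTask(taskName);
    }

    public static void main(String[] args) {
        Gateway tm = new Gateway("tm-a"); // found via taskManagerId when the offer arrives
        LogicalSlot slot = new LogicalSlot(new PhysicalSlot(0, tm));
        deploy("WordCount (1/1)", slot);
        System.out.println(tm.submitted); // [WordCount (1/1)]
    }
}
```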
---------- Task Executor ----------
       │ 
       │ 
┌─────────────┐   
│  TaskSlot   │  requestSlot
└─────────────┘     
       │ 
       │                  
┌──────────────┐   
│  SlotOffer   │  offerSlotsToJobManager
└──────────────┘       
       │ 
       │      
------------- Job Manager -------------
       │ 
       │       
┌──────────────┐   
│  SlotOffer   │  JobMaster#offerSlots(taskManagerId,slots)
└──────────────┘     
       │ //taskManager = registeredTaskManagers.get(taskManagerId);     
       │ //taskManagerLocation = taskManager.f0;     
       │ //taskExecutorGateway = taskManager.f1;     
       │    
       │       
┌──────────────┐   
│  SlotOffer   │  SlotPoolImpl#offerSlots
└──────────────┘       
       │ 
       │      
┌───────────────┐   
│ AllocatedSlot │  SlotPoolImpl#offerSlot
└───────────────┘      
       │ 
       │      
┌───────────────────┐
│ Callback Future 3 │ SlotSharingManager#createRootSlot
└───────────────────┘
       │ 
       │      
┌───────────────────┐
│ Callback Future 2 │  SingleTaskSlot#SingleTaskSlot
└───────────────────┘
       │ 
       │      
┌───────────────────┐   
│ SingleLogicalSlot │ new SingleLogicalSlot
└───────────────────┘    
       │ 
       │     
┌───────────────────┐   
│ SingleLogicalSlot │  
│ Callback Future 1 │ allocationResultFuture.complete()
└───────────────────┘   
       │    
       │        
┌──────────────────────────────────────┐
│          SingleLogicalSlot           │
│ Callback assignResourceOrHandleError │
└──────────────────────────────────────┘
       │    
       │        
┌────────────────┐   
│ ExecutionVertex│ tryAssignResource
└────────────────┘    
       │    
       │        
┌────────────────┐   
│    Execution   │ tryAssignResource
└────────────────┘       
       │    
       │        
┌──────────────────┐   
│ SingleLogicalSlot│ tryAssignPayload
└──────────────────┘  
       │    
       │        
┌──────────────────────────────┐
│      SingleLogicalSlot       │
│ Callback deployOrHandleError │
└──────────────────────────────┘
       │    
       │       
┌────────────────┐   
│ ExecutionVertex│ deploy
└────────────────┘    
       │    
       │        
┌────────────────┐   
│    Execution   │ deploy // key point
└────────────────┘        
       │  
       │    
       │        
 ---------- Task Executor ----------
       │    
       │     
┌────────────────┐   
│  TaskExecutor  │ submitTask
└────────────────┘     
       │    
       │        
┌────────────────┐   
│  TaskExecutor  │ startTaskThread
└────────────────┘         
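The callback order in the diagram (Future 3, then Future 2, then Future 1, then assignResourceOrHandleError, then deployOrHandleError) is ordinary `CompletableFuture` chaining. The sketch below reproduces only that ordering with `thenApply`, `whenComplete` and `handle`; the string payloads and the `log` list are hypothetical and not Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FutureChainSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Future 3: completed by the SlotPool when a matching SlotOffer arrives.
        CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();

        // Future 2: maps the physical slot to a logical slot (cf. the SingleTaskSlot constructor).
        CompletableFuture<String> logicalSlotFuture = allocatedSlotFuture.thenApply(physical -> {
            log.add("future 2: wrap " + physical);
            return "logical(" + physical + ")";
        });

        // Future 1: forwards the result (or the failure) to the scheduler-facing future.
        CompletableFuture<String> allocationResultFuture = new CompletableFuture<>();
        logicalSlotFuture.whenComplete((slot, failure) -> {
            if (failure != null) {
                allocationResultFuture.completeExceptionally(failure);
            } else {
                log.add("future 1: complete");
                allocationResultFuture.complete(slot);
            }
        });

        // Callbacks registered at deploy time, before any slot exists.
        allocationResultFuture
            .handle((slot, failure) -> {      // plays the role of assignResourceOrHandleError
                log.add(failure == null ? "assign " + slot : "assign failed");
                return null;
            })
            .handle((ignored, failure) -> {   // plays the role of deployOrHandleError
                log.add(failure == null ? "deploy" : "deploy failed");
                return null;
            });

        // Nothing has run yet; the offer from the TaskExecutor triggers the whole chain.
        log.add("offer arrives");
        allocatedSlotFuture.complete("slot-0");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because every stage was registered before Future 3 completes, completing it runs the whole chain synchronously on the completing thread, which mirrors how a single SlotOffer drives the callbacks all the way to deployment.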

           

The execution path is as follows:

  • TaskExecutor # establishJobManagerConnection
  • TaskExecutor # offerSlotsToJobManager: iterates over the TaskSlots already allocated to the job; each TaskSlot produces a SlotOffer (containing allocationId, slotIndex and resourceProfile), and the offers are sent to the JM via RPC.
    • private void offerSlotsToJobManager(final JobID jobId) {
      				final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
      				final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
      
      				final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
      
      				while (reservedSlotsIterator.hasNext()) {
      					SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
      					reservedSlots.add(offer);
      				}
      				// send the SlotOffers to the JM via RPC
      				CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = 
                          jobMasterGateway.offerSlots(
                                  getResourceID(),
                                  reservedSlots,
                                  taskManagerConfiguration.getTimeout());    
      }
      
      
                 
  • RPC
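  • JobMaster # offerSlots (execution moves to the JM). When the JM receives the SlotOffers, it uses the taskManagerId passed in the RPC to look up that TaskManager's taskExecutorGateway and wraps it in an RpcTaskManagerGateway, which becomes AllocatedSlot.taskManagerGateway. This ties the JM-side Slot to the TaskManager that hosts it.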
    • public CompletableFuture<Collection<SlotOffer>> offerSlots(
      			final ResourceID taskManagerId,
      			final Collection<SlotOffer> slots,
      			final Time timeout) {
      
      		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
      
      		final TaskManagerLocation taskManagerLocation = taskManager.f0;
      		final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
      
      		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
      
      		return CompletableFuture.completedFuture(
      			slotPool.offerSlots(
      				taskManagerLocation,
      				rpcTaskManagerGateway,
      				slots));
      	}
      
      
                 
  • SlotPoolImpl # offerSlots
  • SlotPoolImpl # offerSlot: builds an AllocatedSlot from the SlotOffer; for the AllocatedSlot the useful information is slotIndex and resourceProfile. Note that AllocatedSlot implements PhysicalSlot.
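    • boolean offerSlot(
            final TaskManagerLocation taskManagerLocation,
            final TaskManagerGateway taskManagerGateway,
            final SlotOffer slotOffer) {

          final AllocationID allocationID = slotOffer.getAllocationId();
          // (checks that the TaskManager is registered and that the slot is not already in use are omitted)

          // Build an AllocatedSlot from the SlotOffer; for the AllocatedSlot the useful information is slotIndex and resourceProfile
          final AllocatedSlot allocatedSlot = new AllocatedSlot(
             allocationID,
             taskManagerLocation,
             slotOffer.getSlotIndex(),
             slotOffer.getResourceProfile(),
             taskManagerGateway);

          // is a pending request waiting for exactly this allocation?
          final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
          if (pendingRequest != null) {
             allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

             // take the pending request's future (our earlier Future 3) and complete it, which triggers the callbacks
             if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
                // we could not complete the pending slot future --> try to fulfill another pending request
                allocatedSlots.remove(pendingRequest.getSlotRequestId());
                tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
             }
          } else {
             // nobody was waiting for this allocation: give it to another request or keep it available
             tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
          }
          // the offer is accepted in either case
          return true;
      }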
      
      
                 
  • Future 3's callback fires; the code is in SlotSharingManager # createRootSlot:
    • FutureUtils.forward(
         slotContextFuture.thenApply(
            (SlotContext slotContext) -> {
               // add the root node to the set of resolved root nodes once the SlotContext future has
               // been completed and we know the slot's TaskManagerLocation
               tryMarkSlotAsResolved(slotRequestId, slotContext); // execution reaches here first
               return slotContext;
            }),
         slotContextFutureAfterRootSlotResolution); // and then here
      
      
                 
  • Future 2's callback fires, in the SingleTaskSlot constructor. Because PhysicalSlot extends SlotContext, this is where the physical Slot is mapped to a logical Slot.
    • singleLogicalSlotFuture = parent.getSlotContextFuture()
         .thenApply(
            (SlotContext slotContext) -> {
               return new SingleLogicalSlot( // the callback creates the SingleLogicalSlot
                  slotRequestId,
                  slotContext,
                  slotSharingGroupId,
                  locality,
                  slotOwner);
            });
      
      
                 
  • Future 1's callback fires here, which in turn invokes the callbacks set up later in the Deploy phase.
    • allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
         if (failure != null) {
            cancelSlotRequest(
               slotRequestId,
               scheduledUnit.getSlotSharingGroupId(),
               failure);
            allocationResultFuture.completeExceptionally(failure);
         } else {
            allocationResultFuture.complete(slot); // this is where Future 1 completes
         }
      });
      
      
                 
  • The callback continues into assignResourceOrHandleError, registered in the Deploy phase, which assigns the resource:
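    • private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(final DeploymentHandle deploymentHandle) {
         final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
         final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();

         return (logicalSlot, throwable) -> {
            if (executionVertexVersioner.isModified(requiredVertexVersion)) {
               // this deployment was superseded by a newer one: bail out (the real code also releases the slot; omitted)
               return null;
            }

            if (throwable == null) {
               final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
               final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
               executionVertex
                  .getCurrentExecutionAttempt()
                  .registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
               executionVertex.tryAssignResource(logicalSlot); // execution reaches here
            } else {
               handleTaskDeploymentFailure(executionVertexId, maybeWrapWithNoResourceAvailableException(throwable));
            }
            return null;
         };
      }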
      
      
                 
    • The callback then goes deeper, through executionVertex.tryAssignResource:
    • ExecutionVertex # tryAssignResource
    • Execution # tryAssignResource
    • SingleLogicalSlot # tryAssignPayload(this): this assigns the Execution itself to Slot.payload. At runtime the Execution's fields look like this, for example:
      • payload = {Execution@10669} "Attempt #0 (CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:47) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:64)) -> Combine (SUM(1), at main(WordCount.java:67) (1/1)) @ …c7928f - [SCHEDULED]"
         executor = {ScheduledThreadPoolExecutor@5928} "java.util.concurrent.ScheduledThreadPoolExecutor@…[Running, pool size = 3, active threads = 0, queued tasks = 1, completed tasks = 2]"
         vertex = {ExecutionVertex@10534} "CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:47) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:64)) -> Combine (SUM(1), at main(WordCount.java:67) (1/1)"
         attemptId = {ExecutionAttemptID@10792} "2f8b6c7297527225ee4c8036c457ba27"
         globalModVersion = 1
         stateTimestamps = {long[9]@10793} 
         attemptNumber = 0
         rpcTimeout = {Time@5924} "18000000 ms"
         partitionInfos = {ArrayList@10794}  size = 0
         terminalStateFuture = {CompletableFuture@10795} "java.util.concurrent.CompletableFuture@…[Not completed]"
         releaseFuture = {CompletableFuture@10796} "java.util.concurrent.CompletableFuture@…[Not completed]"
         taskManagerLocationFuture = {CompletableFuture@10797} "java.util.concurrent.CompletableFuture@…[Not completed]"
         state = {ExecutionState@10789} "SCHEDULED"
         assignedResource = {SingleLogicalSlot@10507} 
         failureCause = null
         taskRestore = null
         assignedAllocationID = null
         accumulatorLock = {Object@10798} 
         userAccumulators = null
         ioMetrics = null
         producedPartitions = {LinkedHashMap@10799}  size = 1
        
        
                   
  • The callback continues into deployOrHandleError, registered in the Deploy phase, which performs the deployment:
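    • private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
         final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
         final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();

         return (ignored, throwable) -> {
            if (executionVertexVersioner.isModified(requiredVertexVersion)) {
               // this deployment was superseded by a newer one: skip it
               return null;
            }

            if (throwable == null) {
               deployTaskSafe(executionVertexId); // deployment happens here
            } else {
               handleTaskDeploymentFailure(executionVertexId, throwable);
            }
            return null;
         };
      }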
      
      
                 
    • The callback then calls down through the following functions:
    • DefaultScheduler # deployTaskSafe
    • ExecutionVertex # deploy
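    • Execution # deploy. Every time an ExecutionVertex is scheduled there is an Execution for it. In this phase the Execution's state is changed to DEPLOYING and the deployment descriptor for the ExecutionVertex is generated; then the TaskManagerGateway is obtained from the corresponding slot so that the Task can be submitted to that TaskManager. Creating the deployment descriptor (createDeploymentDescriptor, reached via TaskDeploymentDescriptorFactory.fromExecutionVertex in the code that follows) contains the translation from the ExecutionGraph to the actual physical execution graph: for example, IntermediateResultPartition is turned into ResultPartition, and ExecutionEdge into InputChannelDeploymentDescriptor (which finally becomes an InputGate at runtime).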
      • // A key point: during deployment the Execution obtains the TaskManager's RPC gateway in the order SingleLogicalSlot ---> AllocatedSlot ---> TaskManagerGateway, and only then can it submit the task via taskManagerGateway.submitTask. This links the Execution's deployment phase to its execution phase.
        public void deploy() throws JobException {
            // (state checks and the transition to DEPLOYING are omitted here)
            final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
                .fromExecutionVertex(vertex, attemptNumber)
                .createDeploymentDescriptor(
                    slot.getAllocationId(),
                    slot.getPhysicalSlotNumber(),
                    taskRestore,
                    producedPartitions.values());

            // this is the key point
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            // submit the task to the TaskManager via RPC
            final CompletableFuture<Acknowledge> submitResultFuture = CompletableFuture
                .supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity());
            // (handling of submitResultFuture omitted)
        }
        
        
                   
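  • TaskExecutor # submitTask: execution reaches the TE, and this is where the real execution begins. On receiving the submit-task request, the TaskManager (TaskExecutor) performs some initialization (e.g. pulling files from the BlobServer, deserializing the job and Task information, setting up the LibraryCacheManager). This information is used to create the Task (a Runnable object), which is then started. The call path is Task#startTaskThread (starts the Task thread) -> Task#run, which switches the ExecutionVertex's state to RUNNING (the Flink web UI then shows the vertex as RUNNING) and creates an AbstractInvokable object, the key piece through which Flink executes user code.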
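    • // This method creates the actual Task and then calls task.startTaskThread() to start executing it.
      public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
          // (jobInformation, taskInformation, taskStateManager, etc. are prepared here; omitted)
          // Internally this calls taskSlot.getMemoryManager(), i.e. it fetches the slot's memory manager; this is part of how the TM's memory is divided among slots.
          final MemoryManager memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
          // The Task constructor creates the InputGates, ResultPartitions, ResultPartitionWriters, etc. from these arguments.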
      			Task task = new Task(
      				jobInformation,
      				taskInformation,
      				tdd.getExecutionAttemptId(),
      				tdd.getAllocationId(),
      				tdd.getSubtaskIndex(),
      				tdd.getAttemptNumber(),
      				tdd.getProducedPartitions(),
      				tdd.getInputGates(),
      				tdd.getTargetSlotNumber(),
      				memoryManager,
      				taskExecutorServices.getIOManager(),
      				taskExecutorServices.getShuffleEnvironment(),
      				taskExecutorServices.getKvStateService(),
      				taskExecutorServices.getBroadcastVariableManager(),
      				taskExecutorServices.getTaskEventDispatcher(),
      				taskStateManager,
      				taskManagerActions,
      				inputSplitProvider,
      				checkpointResponder,
      				aggregateManager,
      				blobCacheService,
      				libraryCache,
      				fileCache,
      				taskManagerConfiguration,
      				taskMetricGroup,
      				resultPartitionConsumableNotifier,
      				partitionStateChecker,
      				getRpcService().getExecutor());
        
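          final boolean taskAdded = taskSlotTable.addTask(task);
          // (error handling when taskAdded is false omitted)
          task.startTaskThread();
          return CompletableFuture.completedFuture(Acknowledge.get());
      }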
      
      
                 
    • This starts the thread: startTaskThread calls executingThread.start, which in turn invokes the Task.run method.
      • public void startTaskThread() {
           executingThread.start();
        }
        
        
                   
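  • Finally execution reaches the Task, i.e. the user code is called. The invokable here is the object that drives the operators, instantiated via reflection; concretely it is, for example, a OneInputStreamTask or a SourceStreamTask. Taking OneInputStreamTask as an example, the core of the Task's execution is the invoke method it inherits from StreamTask. In older Flink versions this calls the abstract StreamTask.run method, which is implemented by subclasses such as OneInputStreamTask and SourceStreamTask; in Flink 1.10 the mailbox loop calls processInput instead, as the call trees in 9.2.2 show. For the DataSet WordCount used in this article, the invokable is a batch task (DataSourceTask, see 9.2.3).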
    • // The invokable here is the object that runs the operators, instantiated via reflection.
      private void doRun() {
          AbstractInvokable invokable = null;
          // (setup of the environment and class loader omitted)
          invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
          // run the invokable
          invokable.invoke();
      }
      
      
                 
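  • Related: SlotPoolImpl # tryFulfillSlotRequestOrMakeAvailable, which offerSlot calls when an offered slot cannot complete its own pending request; as its name says, it hands the slot to another pending request or makes it available.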
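The decision in `SlotPoolImpl#offerSlot` (complete the request waiting for this allocation id, otherwise hand the slot to another waiting request or keep it as available) can be sketched as below. `OfferSketch` and its maps are hypothetical; the real pool keys on `SlotRequestId`/`AllocationID`, matches on resource profiles and performs more checks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class OfferSketch {
    // allocation id -> future of the request that asked the RM for that allocation (insertion order kept)
    private final Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
    // slots that no request currently wants
    private final List<String> available = new ArrayList<>();

    CompletableFuture<String> request(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(allocationId, future);
        return future;
    }

    boolean offerSlot(String allocationId, String slot) {
        CompletableFuture<String> exact = pending.remove(allocationId);
        if (exact == null || !exact.complete(slot)) {
            // nobody waits for this allocation id, or its request was already cancelled/failed
            tryFulfillSlotRequestOrMakeAvailable(slot);
        }
        return true; // the offer is accepted either way
    }

    private void tryFulfillSlotRequestOrMakeAvailable(String slot) {
        Iterator<CompletableFuture<String>> it = pending.values().iterator();
        while (it.hasNext()) {
            CompletableFuture<String> other = it.next();
            it.remove(); // dropped whether it succeeds or was already done
            if (other.complete(slot)) {
                return; // handed over to another waiting request
            }
        }
        available.add(slot);
    }

    List<String> available() {
        return available;
    }

    public static void main(String[] args) {
        OfferSketch pool = new OfferSketch();
        CompletableFuture<String> a = pool.request("alloc-a");
        CompletableFuture<String> b = pool.request("alloc-b");
        a.cancel(false);                      // request A gave up before its slot arrived
        pool.offerSlot("alloc-a", "slot-0");  // A's slot goes to B instead
        System.out.println(b.join());         // slot-0
        pool.offerSlot("alloc-x", "slot-1");  // nobody is waiting
        System.out.println(pool.available()); // [slot-1]
    }
}
```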

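0x09 How the Slot Comes into Play

One might ask: once a Slot has been allocated, how does it actually take effect at runtime?

We use the WordCount example to find out. The code is the standard WordCount, with a few configuration changes:

  • taskmanager.numberOfTaskSlots sets the number of slots per TaskManager (here 1).
  • The other settings lengthen the heartbeat and timeout intervals to make debugging easier.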
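import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
// WordCountData ships with Flink's batch examples (flink-examples-batch)
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

public class WordCount {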

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setString("heartbeat.timeout", "18000000");
        conf.setString("resourcemanager.job.timeout", "18000000");
        conf.setString("resourcemanager.taskmanager-timeout", "18000000");
        conf.setString("slotmanager.request-timeout", "18000000");
        conf.setString("slotmanager.taskmanager-timeout", "18000000");
        conf.setString("slot.request.timeout", "18000000");
        conf.setString("slot.idle.timeout", "18000000");
        conf.setString("akka.ask.timeout", "18000000");
        conf.setString("taskmanager.numberOfTaskSlots", "1");

        final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
        } else {
            // get default test text data
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute("WordCount Example");
        } else {
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

           

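9.1 Deployment Phase

Here the Slot acts as a bridge, linking the submission and deployment phase to the execution phase.

As mentioned earlier, once the TE has offered a Slot, the JM submits a Task to that Slot. The logic is as follows.

Each time an ExecutionVertex is scheduled there is an Execution for it. In Execution # deploy:

  • The Execution's state is changed to DEPLOYING, and the deployment descriptor for the ExecutionVertex is generated. createDeploymentDescriptor contains the translation from the ExecutionGraph to the actual physical execution graph, for example:
    • IntermediateResultPartition is turned into ResultPartition;
    • ExecutionEdge is turned into InputChannelDeploymentDescriptor (which finally becomes an InputGate at runtime).
  • Then the TaskManagerGateway is obtained from the slot so that the Task can be submitted to that TaskManager. The key point: during deployment the Execution obtains the TaskManager's RPC gateway in the order SingleLogicalSlot -> AllocatedSlot -> TaskManagerGateway.
  • Finally the Task is submitted via taskManagerGateway.submitTask.

The code is as follows: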

// A key point: during deployment the Execution obtains the TaskManager's RPC gateway in the order SingleLogicalSlot ---> AllocatedSlot ---> TaskManagerGateway, and only then can it submit the task via taskManagerGateway.submitTask. This links the Execution's deployment phase to its execution phase.
public void deploy() throws JobException {
    // (state checks and the transition to DEPLOYING are omitted here)
    final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
        .fromExecutionVertex(vertex, attemptNumber)
        .createDeploymentDescriptor(
            slot.getAllocationId(),
            slot.getPhysicalSlotNumber(),
            taskRestore,
            producedPartitions.values());

    // this is the key point
    final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

    // submit the task to the TaskManager via RPC
    final CompletableFuture<Acknowledge> submitResultFuture = CompletableFuture
        .supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
        .thenCompose(Function.identity());
    // (handling of submitResultFuture omitted)
}
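Execution#deploy only proceeds while the Execution is still in a state it may deploy from, and switches it to DEPLOYING before submitting the task. A compare-and-set on an `AtomicReference` is one way to express such a guarded, race-free transition. `ExecutionSketch` and its reduced `State` enum are hypothetical; Flink's `Execution` has more states and checks.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ExecutionSketch {
    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, CANCELED }

    private final AtomicReference<State> state = new AtomicReference<>(State.SCHEDULED);

    /** Moves CREATED/SCHEDULED -> DEPLOYING exactly once, then "submits" through the given gateway. */
    void deploy(Consumer<String> taskManagerGateway) {
        State previous = state.get();
        if (previous != State.SCHEDULED && previous != State.CREATED) {
            throw new IllegalStateException("cannot deploy from state " + previous);
        }
        // Fails if another thread changed the state in between (e.g. a concurrent deploy or cancel).
        if (!state.compareAndSet(previous, State.DEPLOYING)) {
            throw new IllegalStateException("concurrent deployment call race");
        }
        taskManagerGateway.accept("task submitted"); // stands in for submitTask(...)
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        ExecutionSketch execution = new ExecutionSketch();
        execution.deploy(System.out::println); // prints "task submitted"
        System.out.println(execution.state()); // DEPLOYING
    }
}
```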


           

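9.2 Runtime Phase

We use input splits as the only example here. Again the Slot acts as the link: the host of the Slot's TaskManager is obtained from the Slot, and split assignment then proceeds based on that host.

When a Source reads its input, the input may need to be divided, and Flink cuts it into input splits.

9.2.1 Where FileInputSplit Comes From

Flink generally divides a file into FileInputSplits according to the parallelism. The count is not simply one split per parallel instance; it is determined by the splitting algorithm, but it is always either the parallelism or the parallelism + 1. The reference quantity is file size / parallelism: if this divides evenly, the number of FileInputSplits equals the parallelism; if there is a remainder, the number is the parallelism or the parallelism + 1.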
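The counting rule above can be illustrated with a simplified model: take size / parallelism as the split size, fold a small remainder into the last split, and give a larger remainder its own extra split. This is not Flink's `FileInputFormat#createInputSplits` (which also considers block size, a minimum split size and multiple files); `SplitCountSketch` and the 10% tolerance are assumptions chosen for illustration, and the model assumes the file is at least as large as the parallelism.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitCountSketch {
    // Hypothetical tolerance: a remainder up to 10% of the split size is merged into the last split.
    static final double REMAINDER_TOLERANCE = 0.1;

    /** Split sizes for a file of fileLength bytes read with the given parallelism. */
    static List<Long> splitSizes(long fileLength, int parallelism) {
        long splitSize = fileLength / parallelism; // "file size / parallelism"
        long remainder = fileLength % parallelism;
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < parallelism - 1; i++) {
            sizes.add(splitSize);
        }
        if (remainder <= splitSize * REMAINDER_TOLERANCE) {
            sizes.add(splitSize + remainder); // no or small remainder: fold it into the last split
        } else {
            sizes.add(splitSize);
            sizes.add(remainder);             // larger remainder: one extra split
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(splitSizes(9, 3));   // [3, 3, 3]     divides evenly -> parallelism
        System.out.println(splitSizes(100, 3)); // [33, 33, 34]  small remainder -> parallelism
        System.out.println(splitSizes(10, 3));  // [3, 3, 3, 1]  larger remainder -> parallelism + 1
    }
}
```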

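When the graph is generated, Flink turns each JobVertex into an ExecutionJobVertex by calling new ExecutionJobVertex(). The ExecutionJobVertex stores the inputSplits, so the number of inputSplits is computed from the parallelism.

The ExecutionJobVertex constructor contains the following code, which creates the InputSplits and stores them in the ExecutionJobVertex member inputSplits; this is how the vertex knows where to get its splits from: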

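// set up the input splits, if the vertex has any
try {
    @SuppressWarnings("unchecked")
    InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

    if (splitSource != null) {
        // (switching to the user-code context class loader and restoring it afterwards is omitted)
        inputSplits = splitSource.createInputSplits(numTaskVertices);
        if (inputSplits != null) {
            splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
        }
    } else {
        inputSplits = null;
    }
} catch (Throwable t) {
    throw new JobException("Creating the input splits failed", t);
}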
            
// splitSource at this point:
splitSource = {CollectionInputFormat@7603} "[To be, or not to be,--that is the question:--, Whether 'tis nobler in the mind to suffer, The slings and arrows of outrageous fortune, ...]"
 serializer = {StringSerializer@7856} 
 dataSet = {ArrayList@7857}  size = 35
 iterator = null
 partitionNumber = 0
 runtimeContext = null    

           

9.2.2 File Split

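Here we borrow from the online article "Interpreting readTextFile in Flink-1.10.0" to outline how a file is split. Note that it covers the streaming API.

readTextFile consists of two stages, a Source and a Split Reader. These stages may run in several threads, not necessarily two, because the Split Reader's parallelism is determined by the configuration file or the startup parameters.

The Source's execution flow is shown below. The Source only builds the input splits; it does not read any data. The trace was taken in local execution mode.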

Task.run()
 |-- invokable.invoke()
 |    |-- StreamTask.invoke()
 |    |    |-- beforeInvoke()
 |    |    |    |-- init()
 |    |    |    |    |-- SourceStreamTask.init()
 |    |    |    |-- initializeStateAndOpen()
 |    |    |    |    |-- operator.initializeState()
 |    |    |    |    |-- operator.open()
 |    |    |    |    |    |-- SourceStreamTask.LegacySourceFunctionThread.run()
 |    |    |    |    |    |    |-- StreamSource.run()
 |    |    |    |    |    |    |    |-- userFunction.run(ctx)
 |    |    |    |    |    |    |    |    |-- ContinuousFileMonitoringFunction.run()
 |    |    |    |    |    |    |    |    |    |-- RebalancePartitioner.selectChannel()
 |    |    |    |    |    |    |    |    |    |-- RecordWriter.emit()
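In the Source call tree, `ContinuousFileMonitoringFunction` emits split descriptions and `RebalancePartitioner.selectChannel()` spreads them across the Split Reader subtasks. Round-robin channel selection can be sketched as below; `RoundRobinSketch` is hypothetical and only mirrors the idea of rebalancing, not Flink's class (its starting channel of 0 is an assumption).

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannel; // assumed to start at channel 0

    RoundRobinSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    /** Picks the next downstream Split Reader in turn. */
    int selectChannel() {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return channel;
    }

    public static void main(String[] args) {
        RoundRobinSketch partitioner = new RoundRobinSketch(2);
        List<List<String>> readers = List.of(new ArrayList<>(), new ArrayList<>());
        for (String split : List.of("split-0", "split-1", "split-2")) {
            readers.get(partitioner.selectChannel()).add(split);
        }
        System.out.println(readers); // [[split-0, split-2], [split-1]]
    }
}
```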


           

The Split Reader's execution flow is as follows:

Task.run()
 |-- invokable.invoke()
 |    |-- StreamTask.invoke()
 |    |    |-- beforeInvoke()
 |    |    |    |-- init()
 |    |    |    |    |--OneInputStreamTask.init()
 |    |    |    |-- initializeStateAndOpen()
 |    |    |    |    |-- operator.initializeState()
 |    |    |    |    |    |-- ContinuousFileReaderOperator.initializeState()
 |    |    |    |    |-- operator.open()
 |    |    |    |    |    |-- ContinuousFileReaderOperator.open()
 |    |    |    |    |    |    |-- ContinuousFileReaderOperator.SplitReader.run()
 |    |    |-- runMailboxLoop()
 |    |    |    |-- StreamTask.processInput()
 |    |    |    |    |-- StreamOneInputProcessor.processInput()
 |    |    |    |    |    |-- StreamTaskNetworkInput.emitNext() processes input data in a continuous while loop
 |    |    |    |    |    |    |-- ContinuousFileReaderOperator.processElement()
 |    |    |-- afterInvoke()    
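Both call trees start at `Task.run()` and then `invokable.invoke()`. As described earlier, `startTaskThread` starts a thread that runs the Task, and the Task creates its invokable by reflection from a class name. Below is a minimal, hypothetical model of that pattern (`Invokable`, `PrintingInvokable` and `TaskSketch` are invented); Flink's real code passes an `Environment` to the invokable's constructor and loads the class through the user-code class loader.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class TaskSketch implements Runnable {
    /** Plays the role of AbstractInvokable. */
    public abstract static class Invokable {
        public abstract void invoke() throws Exception;
    }

    /** Example task; public with a no-arg constructor so reflection can create it. */
    public static final class PrintingInvokable extends Invokable {
        static final AtomicBoolean INVOKED = new AtomicBoolean();

        @Override
        public void invoke() {
            INVOKED.set(true); // "user code" runs here
        }
    }

    private final String nameOfInvokableClass;
    private final Thread executingThread;

    TaskSketch(String nameOfInvokableClass) {
        this.nameOfInvokableClass = nameOfInvokableClass;
        this.executingThread = new Thread(this, "task-thread");
    }

    void startTaskThread() {
        executingThread.start(); // the new thread calls run()
    }

    void join() throws InterruptedException {
        executingThread.join();
    }

    @Override
    public void run() {
        try {
            // Load and instantiate the invokable by name (Flink uses the user-code class loader here).
            Class<? extends Invokable> clazz = Class
                .forName(nameOfInvokableClass, true, TaskSketch.class.getClassLoader())
                .asSubclass(Invokable.class);
            Invokable invokable = clazz.getDeclaredConstructor().newInstance();
            invokable.invoke();
        } catch (Exception e) {
            throw new RuntimeException("task failed", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TaskSketch task = new TaskSketch(PrintingInvokable.class.getName());
        task.startTaskThread();
        task.join();
        System.out.println(PrintingInvokable.INVOKED.get()); // true
    }
}
```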


           

9.2.3 How the Slot Is Used

For this article's example, we focus on how the Slot is used.

The call path is as follows:

  • DataSourceTask # invoke, running in the TE
  • DataSourceTask # hasNext
  • RpcInputSplitProvider # getNextInputSplit
  • RPC
  • Arrive at the JM
  • JobMaster # requestNextInputSplit
  • SchedulerBase # requestNextInputSplit: gets the Execution from the executionGraph, then gets the InputSplit from the Execution
    • public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
      
      		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
      
      		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
      
      		final InputSplit nextInputSplit = execution.getNextInputSplit();
      
      		final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
              
      		return new SerializedInputSplit(serializedInputSplit);
      }
      
                 
    • Here execution.getNextInputSplit() uses the Slot: it first gets the Slot, then the host of the Slot's TaskManager, and then gets the InputSplit from the vertex.
      • public InputSplit getNextInputSplit() {
        		final LogicalSlot slot = this.getAssignedResource();
        		final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        		return this.vertex.getNextInputSplit(host);
        }
        
                   
      • public InputSplit getNextInputSplit(String host) {
        		final int taskId = getParallelSubtaskIndex();
        		synchronized (inputSplits) {
        			final InputSplit nextInputSplit = jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
        			if (nextInputSplit != null) {
        				inputSplits.add(nextInputSplit);
        			}
        			return nextInputSplit;
        		}
        }
        
        // runtime values:
        inputSplits = {GenericInputSplit[1]@13113} 
         0 = {GenericInputSplit@13121} "GenericSplit (0/1)"
          partitionNumber = 0
          totalNumberOfPartitions = 1
        
                   
  • Back in SchedulerBase # requestNextInputSplit, which returns new SerializedInputSplit(serializedInputSplit);
  • RPC
  • Back in the operator Task on the TE: with the InputSplit in hand, it can continue processing its input.
    • final InputSplit split = splitIterator.next();
      final InputFormat<OT, InputSplit> format = this.format;			
      // open input format
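      // open() does not read any data yet; it only positions the reader: it sets the current split info (split start and length) and seeks to the start position. For line-delimited input, the partial line up to the first delimiter belongs to the previous split, so this split starts reading after that delimiter.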
      format.open(split);
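`Execution#getNextInputSplit` passes the host of its slot's TaskManager to the vertex's split assigner, so an assigner can prefer splits stored on that host. A host-preferring assigner could look like the sketch below. `LocalityAssignerSketch` and `HostSplit` are hypothetical; Flink's `LocatableInputSplitAssigner` follows a similar idea with different bookkeeping. In this article's example the splits are `GenericInputSplit`s, which carry no host information, so the host is passed along but most likely has no effect there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class LocalityAssignerSketch {
    static final class HostSplit {
        final int number;
        final List<String> hosts; // hosts that store this split's data

        HostSplit(int number, String... hosts) {
            this.number = number;
            this.hosts = Arrays.asList(hosts);
        }
    }

    private final List<HostSplit> unassigned;

    LocalityAssignerSketch(List<HostSplit> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    /** Prefers a split local to host; falls back to any split; returns null when none are left. */
    synchronized HostSplit getNextInputSplit(String host) {
        if (host != null) { // host is null when the Execution has no slot yet
            Iterator<HostSplit> it = unassigned.iterator();
            while (it.hasNext()) {
                HostSplit split = it.next();
                if (split.hosts.contains(host)) {
                    it.remove();
                    return split;
                }
            }
        }
        return unassigned.isEmpty() ? null : unassigned.remove(0);
    }

    public static void main(String[] args) {
        LocalityAssignerSketch assigner = new LocalityAssignerSketch(List.of(
            new HostSplit(0, "host-a"), new HostSplit(1, "host-b")));
        System.out.println(assigner.getNextInputSplit("host-b").number); // 1: local split first
        System.out.println(assigner.getNextInputSplit("host-b").number); // 0: remote fallback
        System.out.println(assigner.getNextInputSplit("host-b"));        // null: nothing left
    }
}
```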
      
                 

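0xFF References

Understanding Apache Flink's Resource Management Mechanism in One Article

Flink 5: Flink Runtime Architecture (Slots and Parallelism)

Flink Slots Explained and Job Execution Graph Optimization

On Flink's slot.request.timeout Setting

Apache Flink Source Code Analysis (3): Flink on Yarn (2) Resource Manager

The Number of TaskManagers in Flink on Yarn Mode

How to Determine the Number of TaskManagers with Flink on YARN

[Flink] Analysis of the Flink Job Scheduling Process

Flink Principles and Implementation: How the ExecutionGraph and the Physical Execution Graph Are Generated

Flink Source Code Walkthrough (1): The Flink Project Directory

Flink Analysis and Usage (7): Task Startup

Flink Source Code Analysis 3: Formation of the ExecutionGraph and Physical Execution

Flink Internals: Jobs and Scheduling

Flink: Generating the Scheduling-Layer Graph Structures from User Code

3. Flink Slot Requests

Flink Tasks and Scheduling

How Do Flink's Slots Divide TM Memory Evenly?

Interpreting readTextFile in Flink-1.10.0

Flink 1.7.2 DataSet: Source Code Analysis of File Split Calculation and Split Data Reading

Analysis of the Flink Task Submission Process

Understanding Flink Parallelism and Slots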

★★★★★★ Thoughts on Life and Technology ★★★★★★

WeChat official account: 羅西的思考 (Rossi's Thoughts)
