天天看點

Flink 流量控制及反壓流量控制

流量控制

         Flink在兩個Task之間建立Netty連接配接進行資料傳輸,每一個Task會配置設定兩個緩沖池,一個用于輸出資料,一個用于接收資料。當一個Task的緩沖池用盡之後,網絡連接配接就處于阻塞狀态,上遊Task無法産出資料,下遊Task無法接收資料,也就是我們所說的“反壓”狀态。這是一種非常自然的“反壓”的機制,但是過程也相對比較粗暴。由于TaskManager之間的網絡連接配接是由不同Task複用的,一旦網絡處于阻塞狀态,所有Task都無法向TCP連接配接中寫入資料或者從中讀取資料,即便其它Task關聯的緩沖池仍然存在空餘。此外,由于網絡發生了阻塞,諸如CheckpointBarrier等事件也無法在Task之間進行流轉。

Flink V1.5版之前網絡流控介紹

       在Flink V1.5版之前,其實Flink并沒有刻意做上述所說的動态回報。那麼問題來了,沒有做上述的動态回報機制,Flink難道不怕資料丢失或者上遊和下遊的一些Buffer把記憶體撐爆嗎?當然不怕了,因為Flink已經依賴其他機制來實作了所謂的動态回報。如下圖所示,對于一個Flink任務,動态回報可以抽象成以下兩個階段:

Flink 流量控制及反壓流量控制

1、跨Task,動态回報如何從下遊Task的Receive Buffer回報給上遊Task的Send Buffer

  • 當下遊 Task C 的 Receive Buffer 滿了,如何告訴上遊 Task B 應該降低資料發送速率
  • 當下遊 Task C 的 Receive Buffer 空了,如何告訴上遊 Task B 應該提升資料發送速率
  • 注:這裡又分了兩種情況,Task B 和 Task C 可能在同一台節點上運作,也有可能不在同一個台節點運作
  • Task B 和 Task C 在同一台節點上運作指的是:一台節點運作了一個或多個 TaskManager,包含了多個 Slot,Task B 和 Task C 都運作在這台節點上,且 Task B 是 Task C 的上遊,給 Task C 發送資料。此時 Task B 給 Task C 發送資料實際上是同一個 JVM 内的資料發送,是以不存在網絡通信
  • Task B 和 Task C 不在同一台節點上運作指的是:Task B 和 Task C 運作在不同的 TaskManager 中,且 Task B 是 Task C 的上遊,給 Task C 發送資料。此時 Task B 給 Task C 發送資料是跨節點的,是以會存在網絡通信

2、Task内,動态回報如何從内部的 Send Buffer 回報給内部的 Receive Buffer

  • 當 Task B 的 Send Buffer 滿了,如何告訴 Task B 内部的 Receive Buffer 下遊 Send Buffer 滿了、下遊處理性能不行了?因為要讓 Task B 的 Receive Buffer 感受到壓力,才能把下遊的壓力傳遞到 Task A
  • 當 Task B 的 Send Buffer 空了,如何告訴 Task B 内部的 Receive Buffer 下遊 Send Buffer 空了,下遊處理性能很強,上遊加快處理資料吧

1、跨TaskManager,反壓如何向上遊傳播

先了解一下Flink的TaskManager之間網絡傳輸的資料流向:

Flink 流量控制及反壓流量控制

        圖中,我們可以看到 TaskManager A 給 TaskManager B 發送資料,TaskManager A 做為 Producer,TaskManager B 做為 Consumer。Producer 端的 Operator 執行個體會産生資料,最後通過網絡發送給 Consumer 端的 Operator 執行個體。Producer 端 Operator 執行個體生産的資料首先緩存到 TaskManager 内部的 NetWork Buffer。NetWork 依賴 Netty 來做通信,Producer 端的 Netty 内部有 ChannelOutbound Buffer,Consumer 端的 Netty 内部有 ChannelInbound Buffer。Netty 最終還是要通過 Socket 發送網絡請求,Socket 這一層也會有 Buffer,Producer 端有 Send Buffer,Consumer 端有 Receive Buffer。

        總結一下,現在有兩個 TaskManager A、B,TaskManager A 中 Producer Operator 處理完的資料由 TaskManager B 中 Consumer Operator 處理。那麼 Producer Operator 處理完的資料是怎麼到達 Consumer Operator 的?首先 Producer Operator 從自己的上遊或者外部資料源讀取到資料後,對一條條的資料進行處理,處理完的資料首先輸出到 Producer Operator 對應的 NetWork Buffer 中。Buffer 寫滿或者逾時後,就會觸發将 NetWork Buffer 中的資料拷貝到 Producer 端 Netty 的 ChannelOutbound Buffer,之後又把資料拷貝到 Socket 的 Send Buffer 中,這裡有一個從使用者态拷貝到核心态的過程,最後通過 Socket 發送網絡請求,把 Send Buffer 中的資料發送到 Consumer 端的 Receive Buffer。資料到達 Consumer 端後,再依次從 Socket 的 Receive Buffer 拷貝到 Netty 的 ChannelInbound Buffer,再拷貝到 Consumer Operator 的 NetWork Buffer,最後 Consumer Operator 就可以讀到資料進行處理了。這就是兩個 TaskManager 之間的資料傳輸過程,我們可以看到發送方和接收方各有三層的 Buffer。

        了解了資料傳輸流程,我們再具體了解一下跨 TaskManager 的反壓過程,如下圖所示,Producer 端生産資料速率為 2,Consumer 消費資料速率為 1。持續下去,下遊消費較慢,Buffer 容量又是有限的,那 Flink 反壓是怎麼做的?

Flink 流量控制及反壓流量控制

        上面介紹後,我們知道每個 Operator 計算資料時,輸出和輸入都有對應的 NetWork Buffer,這個 NetWork Buffer 對應到 Flink 就是圖中所示的 ResultSubPartition 和 InputChannel。ResultSubPartition 和 InputChannel 都是向 LocalBufferPool 申請 Buffer 空間,然後 LocalBufferPool 再向 NetWork BufferPool 申請記憶體空間。這裡,NetWork BufferPool 是 TaskManager 内所有 Task 共享的 BufferPool,TaskManager 初始化時就會向堆外記憶體申請 NetWork BufferPool。LocalBufferPool 是每個 Task 自己的 BufferPool,假如一個 TaskManager 内運作着 5 個 Task,那麼就會有 5 個 LocalBufferPool,但 TaskManager 内永遠隻有一個 NetWork BufferPool。Netty 的 Buffer 也是初始化時直接向堆外記憶體申請記憶體空間。雖然可以申請,但是必須明白記憶體申請肯定是有限制的,不可能無限制的申請,我們在啟動任務時可以指定該任務最多可能申請多大的記憶體空間用于 NetWork Buffer。

        繼續分析該場景, Producer 端生産資料速率為2,Consumer 端消費資料速率為1。資料從 Task A 的 ResultSubPartition 按照上面的流程最後傳輸到 Task B 的 InputChannel 供 Task B 讀取并計算。持續一段時間後,由于 Task B 消費比較慢,導緻 InputChannel 被占滿了,是以 InputChannel 向 LocalBufferPool 申請新的 Buffer 空間,LocalBufferPool 配置設定給 InputChannel 一些 Buffer。

Flink 流量控制及反壓流量控制

        再持續一段時間後,InputChannel 重複向 LocalBufferPool 申請 Buffer 空間,導緻 LocalBufferPool 也滿了,是以 LocalBufferPool 向 NetWork BufferPool 申請 Buffer 空間,NetWork BufferPool 給 LocalBufferPool 配置設定 Buffer。

Flink 流量控制及反壓流量控制

        再持續下去,NetWork BufferPool 滿了,或者說 NetWork BufferPool 不能把自己的 Buffer 全配置設定給 Task B 對應的 LocalBufferPool ,因為 TaskManager 上一般會運作了多個 Task,每個 Task 隻能使用 NetWork BufferPool 中的一部分。是以,可以認為 Task B 把自己可以使用的 InputChannel 、 LocalBufferPool 和 NetWork BufferPool 都用完了。此時 Netty 還想把資料寫入到 InputChannel,但是發現 InputChannel 滿了,是以 Socket 層會把 Netty 的 autoRead disable,Netty 不會再從 Socket 中去讀消息。可以看到下圖中多個 ❌,表示 Buffer 已滿,資料已經不能往下遊寫了,發生了阻塞。

Flink 流量控制及反壓流量控制

        由于 Netty 不從 Socket 的 Receive Buffer 讀資料了,是以很快 Socket 的 Receive Buffer 就會變滿,TCP 的 Socket 通信有動态回報的流控機制,會把容量為0的消息回報給上遊發送端,是以上遊的 Socket 就不會往下遊再發送資料 。

Flink 流量控制及反壓流量控制

Task A 持續生産資料,發送端 Socket 的 Send Buffer 很快被打滿,是以 Task A 端的 Netty 也會停止往 Socket 寫資料。

Flink 流量控制及反壓流量控制

        接下來,資料會在 Netty 的 Buffer 中緩存資料,但 Netty 的 Buffer 是無界的。但可以設定 Netty 的高水位,即:設定一個 Netty 中 Buffer 的上限。是以每次 ResultSubPartition 向 Netty 中寫資料時,都會檢測 Netty 是否已經到達高水位,如果達到高水位就不會再往 Netty 中寫資料,防止 Netty 的 Buffer 無限制的增長。

Flink 流量控制及反壓流量控制

        接下來,資料會在 Task A 的 ResultSubPartition 中累積,ResultSubPartition 滿了後,會向 LocalBufferPool 申請新的 Buffer 空間,LocalBufferPool 配置設定給 ResultSubPartition 一些 Buffer。

Flink 流量控制及反壓流量控制

持續下去 LocalBufferPool 也會用完,LocalBufferPool 再向 NetWork BufferPool 申請 Buffer。

Flink 流量控制及反壓流量控制

        然後 NetWork BufferPool 也會用完,或者說 NetWork BufferPool 不能把自己的 Buffer 全配置設定給 Task A 對應的 LocalBufferPool ,因為 TaskManager 上一般會運作了多個 Task,每個 Task 隻能使用 NetWork BufferPool 中的一部分。此時,Task A 已經申請不到任何的 Buffer 了,Task A 的 Record Writer 輸出就被 wait ,Task A 不再生産資料。

Flink 流量控制及反壓流量控制

        通過上述的這個流程,來動态回報,保障各個 Buffer 都不會因為資料太多導緻記憶體溢出。上面描述了整個阻塞的流程,當下遊 Task B 持續消費,Buffer 的可用容量會增加,所有被阻塞的資料通道會被一個個打開,之後 Task A 又可以開始正常的生産資料了。

        之前介紹,Task 之間的資料傳輸可能存在上遊的 Task A 和下遊的 Task B 運作在同一台節點的情況,整個流程與上述類似,隻不過由于 Task A 和 B 運作在同一個 JVM,是以不需要網絡傳輸的環節,Task B 的 InputChannel 會直接從 Task A 的 ResultSubPartition 讀取資料。

2、Task内部,反壓如何向上遊傳播

       假如 Task A 的下遊所有 Buffer 都占滿了,那麼 Task A 的 Record Writer 會被 block,Task A 的 Record Reader、Operator、Record Writer 都屬于同一個線程,是以 Task A 的 Record Reader 也會被 block。

Flink 流量控制及反壓流量控制

        然後可以把這裡的 Task A 類比成上面所說的 Task B,Task A 上遊持續高速率發送資料到 Task A 就會導緻可用的 InputChannel、 LocalBufferPool 和 NetWork BufferPool 都會被用完。然後 Netty 、Socket 同理将壓力傳輸到 Task A 的上遊。

Flink 流量控制及反壓流量控制

       假設 Task A 的上遊是 Task X,那麼 Task A 将壓力回報給 Task X 的過程與 Task B 将壓力回報給 Task A 的過程是一樣的。整個 Flink 的反壓是從下遊往上遊傳播的,一直傳播到 Source Task,Source Task 有壓力後,會降低從外部元件中讀取資料的速率,例如:Source Task 會降低從 Kafka 中讀取資料的速率,來降低整個 Flink Job 中緩存的資料,進而降低負載。

Flink V1.5版之前的反壓政策存在的問題

         看着挺完美的反壓機制,其實是有問題的。如下圖所示,我們的任務有4個 SubTask,SubTask A 是 SubTask B的上遊,即 SubTask A 給 SubTask B 發送資料。Job 運作在兩個 TaskManager中, TaskManager 1 運作着 SubTask A.1 和 SubTask A.2, TaskManager 2 運作着 SubTask B.3 和 SubTask B.4。現在假如由于CPU共享或者記憶體緊張或者磁盤IO瓶頸造成 SubTask B.4 遇到瓶頸、處理速率有所下降,但是上遊源源不斷地生産資料,是以導緻 SubTask A.2 與 SubTask B.4 産生反壓。

Flink 流量控制及反壓流量控制

        這裡需要明确一點:不同 Job 之間的每個(遠端)網絡連接配接将在 Flink 的網絡堆棧中獲得自己的TCP通道。但是,如果同一 Task 的不同 SubTask 被安排到同一個TaskManager,則它們與其他 TaskManager 的網絡連接配接将被多路複用并共享一個TCP信道以減少資源使用。例如,圖中的 A.1 -> B.3、A.1 -> B.4、A.2 -> B.3、A.2 -> B.4 這四條将會多路複用共享一個 TCP 信道。

        現在 SubTask B.3 并沒有壓力,從上面跨 TaskManager 的反壓流程,我們知道當上圖中 SubTask A.2 與 SubTask B.4 産生反壓時,會把 TaskManager1 端該任務對應 Socket 的 Send Buffer 和 TaskManager2 端該任務對應 Socket 的 Receive Buffer 占滿,多路複用的 TCP 通道已經被占住了,會導緻 SubTask A.1 和 SubTask A.2 要發送給 SubTask B.3 的資料全被阻塞了,進而導緻本來沒有壓力的 SubTask B.3 現在接收不到資料了。是以,Flink 1.5 版之前的反壓機制會存在當一個 Task 出現反壓時,可能導緻其他正常的 Task 接收不到資料。

Credit的反壓政策實作原理

         為了解決上述問題,Flink1.5重構了網絡棧,引入了“基于信用值的流量控制算法”(Credit-basedFlowControl),確定TaskManager之間的網絡連接配接始終不會處于阻塞狀态。Credit-basedFlowControl的思路其實也比較簡單,它是在接收端和發送端之間建立一種類似“信用評級”的機制,發送端向接收端發送的資料永遠不會超過接收端的信用值的大小。在Flink這裡,信用值就是接收端可用的Buffer的數量,這樣就可以保證發送端不會向TCP連接配接中發送超出接收端緩沖區可用容量的資料。相比于之前所有的InputChannel共享同一個本地緩沖池的方式,在重構網絡棧之後,Flink會為每一個InputChannel配置設定一批獨占的緩沖(exclusivebuffers),而本地緩沖池中的buffer則作為流動的(floatingbuffers),可以被所有的InputChannel使用。

Credit-basedFlowControl的具體機制為:

  1. 接收端向發送端聲明可用的Credit(一個可用的buffer對應一點credit)(服務端接受端可用的buffer值);
  2. 當發送端獲得了X點Credit,表明它可以向網絡中發送X個buffer;當接收端配置設定了X點Credit給發送端,表明它有X個空閑的buffer可以接收資料;
  3. 隻有在Credit>0的情況下發送端才發送buffer;發送端每發送一個buffer,Credit也相應地減少一點
  4. 由于CheckpointBarrier,EndOfPartitionEvent等事件可以被立即處理,因而事件可以立即發送,無需使用Credit
  5. 當發送端發送buffer的時候,它同樣把目前堆積的buffer數量(backlogsize)告知接收端;接收端根據發送端堆積的數量來申請floatingbuffer

基本示例如下:         

         如下圖所示,反壓機制作用于 Flink 的應用層,即在 ResultSubPartition 和 InputChannel 這一層引入了反壓機制。每次上遊 SubTask A.2 給下遊 SubTask B.4 發送資料時,會把 Buffer 中的資料和上遊 ResultSubPartition 堆積的資料量 Backlog size發給下遊,下遊會接收上遊發來的資料,并向上遊回報目前下遊現在的 Credit 值,Credit 值表示目前下遊可以接收上遊的 Buffer 量,1 個Buffer 等價于 1 個 Credit 。

Flink 流量控制及反壓流量控制

        例如,上遊 SubTask A.2 發送完資料後,還有 5 個 Buffer 被積壓,那麼會把發送資料和 Backlog size = 5 一塊發送給下遊 SubTask B.4,下遊接受到資料後,知道上遊積壓了 5 個Buffer,于是向 Buffer Pool 申請 Buffer,由于容量有限,下遊 InputChannel 目前僅有 2 個 Buffer 空間,是以,SubTask B.4 會向上遊 SubTask A.2 回報 Channel Credit = 2。然後上遊下一次最多隻給下遊發送 2 個 Buffer 的資料,這樣每次上遊發送的資料都是下遊 InputChannel 的 Buffer 可以承受的資料量,是以通過這種回報政策,保證了不會在公用的 Netty 和 TCP 這一層資料堆積而影響其他 SubTask 通信。

Credit的源碼實作如下

1、初始化:首先,在向NetworkEnvironment注冊的時候,會為InputGate配置設定本地緩沖池,還會為RemoteInputChannel配置設定獨占的buffer:

class NetworkEnvironment {
	public void setupInputGate(SingleInputGate gate) throws IOException {
		BufferPool bufferPool = null;
		int maxNumberOfMemorySegments;
		try {
			if (config.isCreditBased()) { //使用 Credit-based Flow Control
				//本地緩沖池使用的 buffer 數量,如果是 bounded,則緩沖池的大小最大為 taskmanager.network.memory.floating-buffers-per-gate
				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
					config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;

				// assign exclusive buffers to input channels directly and use the rest for floating buffers
				// 獨占的buffer,不包含在配置設定的 LocalBufferPool 中
				gate.assignExclusiveSegments(networkBufferPool, config.networkBuffersPerChannel());
				bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
			} else {
				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
					gate.getNumberOfInputChannels() * config.networkBuffersPerChannel() +
						config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;

				bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
					maxNumberOfMemorySegments);
			}
			//配置設定 LocalBufferPool 本地緩沖池,這是所有 channel 共享的
			gate.setBufferPool(bufferPool);
		} catch (Throwable t) {
			if (bufferPool != null) {
				bufferPool.lazyDestroy();
			}
			ExceptionUtils.rethrowIOException(t);
		}
	}
}

class SingleInputGate {
	public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
		checkState(this.isCreditBased, "Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
		checkState(this.networkBufferPool == null, "Bug in input gate setup logic: global buffer pool has" +
			"already been set for this input gate.");

		this.networkBufferPool = checkNotNull(networkBufferPool);
		this.networkBuffersPerChannel = networkBuffersPerChannel;

		synchronized (requestLock) {
			for (InputChannel inputChannel : inputChannels.values()) {
				if (inputChannel instanceof RemoteInputChannel) {
					//RemoteInputChannel 請求獨占的 buffer
					((RemoteInputChannel) inputChannel).assignExclusiveSegments(
						networkBufferPool.requestMemorySegments(networkBuffersPerChannel));
				}
			}
		}
	}
}
           

2、RemoteInputChannel管理可用buffer:在RemoteInputChannel内部使用AvailableBufferQueue來管理所有可用的buffer:

class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	//可用的 buffer 隊列,包含 exclusive + floating
	/** The available buffer queue wraps both exclusive and requested floating buffers. */
	private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();

	//配置設定獨占的 buffer
	void assignExclusiveSegments(List<MemorySegment> segments) {
        this.initialCredit = segments.size();
        this.numRequiredBuffers = segments.size();
        
        synchronized (bufferQueue) {
           for (MemorySegment segment : segments) {
              // 注意這個 NetworkBuffer 的回收器是 RemoteInputChannel 自身
              bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
           }
        }
	}

	 // 獨占的 buffer 釋放後會直接被 RemoteInputChannel 回收
     /**
     * Exclusive buffer is recycled to this input channel directly and it may trigger return extra
     * floating buffer and notify increased credit to the producer.
     *
     * @param segment The exclusive segment of this channel.
     */
    @Override
    public void recycle(MemorySegment segment) {
       int numAddedBuffers;
       synchronized (bufferQueue) {
          // Similar to notifyBufferAvailable(), make sure that we never add a buffer
          // after releaseAllResources() released all buffers (see below for details).
          if (isReleased.get()) { // 如果這個channle已經被釋放
             try {
                // 這個MemorySegment會被歸還給NetworkBufferPool
                inputGate.returnExclusiveSegments(Collections.singletonList(segment));
                return;
             } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
             }
          }
          // 重新加入到AvailableBufferQueue中
          numAddedBuffers = bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
       }
       if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(numAddedBuffers) == 0) {
          notifyCreditAvailable();
       }
    }
    
    /**
     * Manages the exclusive and floating buffers of this channel, and handles the
     * internal buffer related logic.
     */
    private static class AvailableBufferQueue {
       /** The current available floating buffers from the fixed buffer pool. */
       private final ArrayDeque<Buffer> floatingBuffers;  // 這部分是流動的
       /** The current available exclusive buffers from the global buffer pool. */
       private final ArrayDeque<Buffer> exclusiveBuffers; // 這部分是獨占的
    
       AvailableBufferQueue() {
          this.exclusiveBuffers = new ArrayDeque<>();
          this.floatingBuffers = new ArrayDeque<>();
       }
    
       /**
        * Adds an exclusive buffer (back) into the queue and recycles one floating buffer if the
        * number of available buffers in queue is more than the required amount.
        *
        * @param buffer The exclusive buffer to add
        * @param numRequiredBuffers The number of required buffers
        *
        * @return How many buffers were added to the queue
        */
       //添加一個獨占的buffer,如果目前可用的 buffer 總量超出了要求的數量,則向本地緩沖池歸還一個流動的buffer
       //傳回值是新增的 buffer 數量
       int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
          exclusiveBuffers.add(buffer);
          if (getAvailableBufferSize() > numRequiredBuffers) {
             Buffer floatingBuffer = floatingBuffers.poll();
             floatingBuffer.recycleBuffer();  //加一個,歸還一個,相當于沒加
             return 0;
          } else {
             return 1;
          }
       }
       
       void addFloatingBuffer(Buffer buffer) {   //添加一個流動的buffer
          floatingBuffers.add(buffer);
       }
    
       /**
        * Takes the floating buffer first in order to make full use of floating
        * buffers reasonably.
        *
        * @return An available floating or exclusive buffer, may be null
        * if the channel is released.
        */
       @Nullable
       Buffer takeBuffer() {            //優先取流動的buffer
          if (floatingBuffers.size() > 0) {
             return floatingBuffers.poll();
          } else {
             return exclusiveBuffers.poll();
          }
       }
    
       int getAvailableBufferSize() {
          return floatingBuffers.size() + exclusiveBuffers.size();
       }
    }	
}
           

3、請求遠端子分區:RemoteInputChannel請求遠端的ResultSubpartition,會建立一個PartitionRequestClient,并通過Netty發送PartitionRequest請求,這時會帶上目前InputChannel的id和初始的credit資訊:

class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
		//REMOTE,需要網絡通信,使用 Netty 建立網絡
		//通過 ConnectionManager 來建立連接配接:建立 PartitionRequestClient,通過 PartitionRequestClient 發起請求
		if (partitionRequestClient == null) {
			// Create a client and request the partition
			partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);

			//請求分區,通過 netty 發起請求
			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
		}
	}
}

public class PartitionRequestClient {
	public ChannelFuture requestSubpartition(
			final ResultPartitionID partitionId,
			final int subpartitionIndex,
			final RemoteInputChannel inputChannel,
			int delayMs) throws IOException {
		//向 NetworkClientHandler 注冊目前 RemoteInputChannel
		//單個 Task 所有的 RemoteInputChannel 的資料傳輸都通過這個 PartitionRequestClient 處理
		clientHandler.addInputChannel(inputChannel);

		//PartitionRequest封裝了請求的 sub-partition 的資訊,目前 input channel 的 ID,以及初始 credit
		final PartitionRequest request = new PartitionRequest(
				partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());

		final ChannelFutureListener listener = new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture future) throws Exception {
				if (!future.isSuccess()) {
					//如果請求發送失敗,要移除目前的 inputChannel
					clientHandler.removeInputChannel(inputChannel);
					SocketAddress remoteAddr = future.channel().remoteAddress();
					inputChannel.onError(
							new LocalTransportException(
								String.format("Sending the partition request to '%s' failed.", remoteAddr),
								future.channel().localAddress(), future.cause()
							));
				}
			}
		};

		//通過 netty 發送請求
		if (delayMs == 0) {
			ChannelFuture f = tcpChannel.writeAndFlush(request);
			f.addListener(listener);
			return f;
		} else {
			final ChannelFuture[] f = new ChannelFuture[1];
			tcpChannel.eventLoop().schedule(new Runnable() {
				@Override
				public void run() {
					f[0] = tcpChannel.writeAndFlush(request);
					f[0].addListener(listener);
				}
			}, delayMs, TimeUnit.MILLISECONDS);
			return f[0];
		}
	}
}
           

4、生産端的處理流程

        生産者端即ResultSubpartition一側,在網絡通信中對應NettyServer。NettyServer有兩個重要的ChannelHandler,即PartitionRequestServerHandler和PartitionRequestQueue。其中,PartitionRequestServerHandler負責處理消費端通過PartitionRequestClient發送的PartitionRequest和AddCredit等請求;PartitionRequestQueue則包含了一個可以從中讀取資料的NetworkSequenceViewReader隊列,它會監聽NettyChannel的可寫入狀态,一旦可以寫入資料,就使用NetworkSequenceViewReader來消費ResultSubpartition寫入的Buffer資料,将其寫入到NettyChannel。

        首先,當NettyServer接收到PartitionRequest消息後,PartitionRequestServerHandler會建立一個NetworkSequenceViewReader對象,請求建立ResultSubpartitionView,并将NetworkSequenceViewReader儲存在PartitionRequestQueue中。PartitionRequestQueue會持有所有請求消費資料的RemoteInputChannel的ID和NetworkSequenceViewReader之間的映射關系。

        其中ResultSubpartitionView用來消費ResultSubpartition中的資料,并在ResultSubpartition中有資料可用時獲得提醒;NetworkSequenceViewReader則相當于對ResultSubpartition的一層包裝,她會按順序為讀取的每一個buffer配置設定一個序列号,并且記錄了接收資料的RemoteInputChannel的ID。在使用Credit-basedFlowControl的情況下,NetworkSequenceViewReader的具體實作對應為CreditBasedSequenceNumberingViewReader。CreditBasedSequenceNumberingViewReader同時還實作了BufferAvailabilityListener接口,因而可以作為PipelinedSubpartitionView的回調對象。

class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
		try {
			Class<?> msgClazz = msg.getClass();
			if (msgClazz == PartitionRequest.class) {
				//Server 端接收到 client 發送的 PartitionRequest
				PartitionRequest request = (PartitionRequest) msg;
				try {
					NetworkSequenceViewReader reader;
					if (creditBasedEnabled) {
						reader = new CreditBasedSequenceNumberingViewReader(
							request.receiverId,
							request.credit,
							outboundQueue);
					} else {
						reader = new SequenceNumberingViewReader(
							request.receiverId,
							outboundQueue);
					}

					//通過 ResultPartitionProvider(實際上就是 ResultPartitionManager)建立 ResultSubpartitionView
					//在有可被消費的資料産生後,PartitionRequestQueue.notifyReaderNonEmpty 會被回調,進而在 netty channelPipeline 上觸發一次 fireUserEventTriggered
					reader.requestSubpartitionView(
						partitionProvider,
						request.partitionId,
						request.queueIndex);

					//通知 PartitionRequestQueue 建立了一個 NetworkSequenceViewReader
					outboundQueue.notifyReaderCreated(reader);
				} catch (PartitionNotFoundException notFound) {
					respondWithError(ctx, notFound, request.receiverId);
				}
			}
			......
		} catch (Throwable t) {
			respondWithError(ctx, t);
		}
	}
}


class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListener, NetworkSequenceViewReader {
	private final InputChannelID receiverId; //對應的 RemoteInputChannel 的 ID
	private final PartitionRequestQueue requestQueue;
	//消費 ResultSubpartition 的資料,并在 ResultSubpartition 有資料可用時獲得通知
	private volatile ResultSubpartitionView subpartitionView;
	//numCreditsAvailable的值是消費端還能夠容納的buffer的數量,也就是允許生産端發送的buffer的數量
	private int numCreditsAvailable;
	private int sequenceNumber = -1; //序列号,自增

	//建立一個 ResultSubpartitionView,用于讀取資料,并在有資料可用時獲得通知
	@Override
	public void requestSubpartitionView(
		ResultPartitionProvider partitionProvider,
		ResultPartitionID resultPartitionId,
		int subPartitionIndex) throws IOException {

		synchronized (requestLock) {
			if (subpartitionView == null) {
				this.subpartitionView = partitionProvider.createSubpartitionView(
					resultPartitionId,
					subPartitionIndex,
					this);
			} else {
				throw new IllegalStateException("Subpartition already requested");
			}
		}
	}

	//讀取資料
	@Override
	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
		BufferAndBacklog next = subpartitionView.getNextBuffer(); //讀取資料
		if (next != null) {
			sequenceNumber++; //序列号
			//要發送一個buffer,對應的 numCreditsAvailable 要減 1
			if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
				throw new IllegalStateException("no credit available");
			}
			return new BufferAndAvailability(
				next.buffer(), isAvailable(next), next.buffersInBacklog());
		} else {
			return null;
		}
	}

	//是否還可以消費資料:
	// 1. ResultSubpartition 中有更多的資料
	// 2. credit > 0 或者下一條資料是事件(事件不需要消耗credit)
	@Override
	public boolean isAvailable() {
		// BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)!
		return hasBuffersAvailable() &&
			//要求 numCreditsAvailable > 0 或者是 Event
			(numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
	}

	boolean hasBuffersAvailable() {
		return subpartitionView.isAvailable();
	}

	//和上面 isAvailable() 是等價的
	private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
		// BEWARE: this must be in sync with #isAvailable()!
		return bufferAndBacklog.isMoreAvailable() &&
			(numCreditsAvailable > 0 || bufferAndBacklog.nextBufferIsEvent());
	}

	//在 ResultSubparition 中有資料時會回調該方法
	@Override
	public void notifyDataAvailable() {
		//告知 PartitionRequestQueue 目前 ViewReader 有資料可讀
		requestQueue.notifyReaderNonEmpty(this);
	}
}
           

PartitionRequestQueue負責将ResultSubparition中的資料通過網絡發送給RemoteInputChannel。在PartitionRequestQueue中儲存了所有的NetworkSequenceViewReader和InputChannelID之間的映射關系,以及一個ArrayDeque<NetworkSequenceViewReader>availableReaders隊列。當一個NetworkSequenceViewReader中有資料可以被消費時,就會被加入到availableReaders隊列中。

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
	/** The readers which are already enqueued available for transferring data. */
	private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>();
	/** All the readers created for the consumers' partition requests. */
	private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>();

	//添加新的 NetworkSequenceViewReader
	public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
		allReaders.put(reader.getReceiverId(), reader);
	}

	//通知 NetworkSequenceViewReader 有資料可讀取
	void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
		//觸發一次使用者自定義事件
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
	}

	//自定義使用者事件的處理
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		// The user event triggered event loop callback is used for thread-safe
		// hand over of reader queues and cancelled producers.
		if (msg instanceof NetworkSequenceViewReader) {
			//NetworkSequenceViewReader有資料可讀取,加入隊列中
			enqueueAvailableReader((NetworkSequenceViewReader) msg);
		} else if (msg.getClass() == InputChannelID.class) {
			// 對應的 RemoteInputChannel 請求取消消費
			// Release partition view that get a cancel request.
			InputChannelID toCancel = (InputChannelID) msg;
			if (released.contains(toCancel)) {
				return;
			}
			// Cancel the request for the input channel
			int size = availableReaders.size();
			for (int i = 0; i < size; i++) {
				NetworkSequenceViewReader reader = pollAvailableReader();
				if (reader.getReceiverId().equals(toCancel)) {
					reader.releaseAllResources();
					markAsReleased(reader.getReceiverId());
				} else {
					registerAvailableReader(reader);
				}
			}
			allReaders.remove(toCancel);
		} else {
			ctx.fireUserEventTriggered(msg);
		}
	}

	//加入隊列
	private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
		if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
			//已經被注冊到隊列中,或者暫時沒有 buffer 或沒有 credit 可用
			return;
		}
		boolean triggerWrite = availableReaders.isEmpty();
		registerAvailableReader(reader);
		if (triggerWrite) {
			//如果這是隊列中第一個元素,調用 writeAndFlushNextMessageIfPossible 發送資料
			writeAndFlushNextMessageIfPossible(ctx.channel());
		}
	}
}
           

PartitionRequestQueue會監聽NettyChannel的可寫入狀态,當Channel可寫入時,就會從availableReaders隊列中取出NetworkSequenceViewReader,讀取資料并寫入網絡。可寫入狀态是Netty通過水位線進行控制的,NettyServer在啟動的時候會配置水位線,如果Netty輸出緩沖中的位元組數超過了高水位值,我們會等到其降到低水位值以下才繼續寫入資料。通過水位線機制確定不往網絡中寫入太多資料。

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
	@Override
	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
		//目前channel的讀寫狀态發生變化
		writeAndFlushNextMessageIfPossible(ctx.channel());
	}

	private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
		if (fatalError || !channel.isWritable()) {
			//如果目前不可寫入,則直接傳回
			return;
		}
		BufferAndAvailability next = null;
		try {
			while (true) {
				//取出一個 reader
				NetworkSequenceViewReader reader = pollAvailableReader();

				if (reader == null) {
					return;
				}

				next = reader.getNextBuffer();
				if (next == null) {
					//沒有讀到資料
					if (!reader.isReleased()) {
						//還沒有釋放目前 subpartition,繼續處理下一個 reader
						continue;
					}
					markAsReleased(reader.getReceiverId());

					//出錯了
					Throwable cause = reader.getFailureCause();
					if (cause != null) {
						ErrorResponse msg = new ErrorResponse(
							new ProducerFailedException(cause),
							reader.getReceiverId());

						ctx.writeAndFlush(msg);
					}
				} else {
					// 讀到了資料
					if (next.moreAvailable()) {
						//這個 reader 還可以讀到更多的資料,繼續加入隊列
						registerAvailableReader(reader);
					}

					BufferResponse msg = new BufferResponse(
						next.buffer(),
						reader.getSequenceNumber(),
						reader.getReceiverId(),
						next.buffersInBacklog());

					// 向 client 發送資料,發送成功之後通過 writeListener 的回調觸發下一次發送
					channel.writeAndFlush(msg).addListener(writeListener);
					return;
				}
			}
		} catch (Throwable t) {
			if (next != null) {
				next.buffer().recycleBuffer();
			}
			throw new IOException(t.getMessage(), t);
		}
	}

	private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
		@Override
		public void operationComplete(ChannelFuture future) throws Exception {
			try {
				if (future.isSuccess()) {
					//發送成功,再次嘗試寫入
					writeAndFlushNextMessageIfPossible(future.channel());
				} else if (future.cause() != null) {
					handleException(future.channel(), future.cause());
				} else {
					handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
				}
			} catch (Throwable t) {
				handleException(future.channel(), t);
			}
		}
	}

}
           

在Credit-basedFlowControl算法中,每發送一個buffer就會消耗一點credit,在消費端有空閑buffer可用時會發送AddCrdit消息。

class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
		......
		if (msgClazz == AddCredit.class) {
				//增加 credit
				AddCredit request = (AddCredit) msg;
				outboundQueue.addCredit(request.receiverId, request.credit);
		}
		......
	}
}

class RequestReaderQueue extends ChannelInboundHandlerAdapter {
	void addCredit(InputChannelID receiverId, int credit) throws Exception {
		if (fatalError) {
			return;
		}
		NetworkSequenceViewReader reader = allReaders.get(receiverId);
		if (reader != null) {
			//增加 credit
			reader.addCredit(credit);
			//因為增加了credit,可能可以繼續處理資料,是以把 reader 加入隊列
			enqueueAvailableReader(reader);
		} else {
			throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
		}
	}
}
           

5、消費端處理流程:消費端即RemoteInputChannel一側,在網絡通信中對應NettyClient。同樣地,我們從ChannelHandler作為入口進行分析。

public interface NetworkClientHandler extends ChannelHandler {
	void addInputChannel(RemoteInputChannel inputChannel) throws IOException;
	void removeInputChannel(RemoteInputChannel inputChannel);
	void cancelRequestFor(InputChannelID inputChannelId);
	//通知有新的的 credit 可用
	void notifyCreditAvailable(final RemoteInputChannel inputChannel);
}
           

NetworkClientHanlder對應的實作類為CreditBasedPartitionRequestClientHandler,CreditBasedPartitionRequestClientHandler負責接收服務端通過Nettychannel發送的資料,解析資料後交給對應的RemoteInputChannle進行處理:

class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//從netty channel中接收到資料
		try {
			//解析消息
			decodeMsg(msg);
		} catch (Throwable t) {
			notifyAllChannelsOfErrorAndClose(t);
		}
	}

	private void decodeMsg(Object msg) throws Throwable {
		final Class<?> msgClazz = msg.getClass();

		// ---- Buffer --------------------------------------------------------
		if (msgClazz == NettyMessage.BufferResponse.class) {
			//正常的資料
			NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

			//根據 ID 定位到對應的 RemoteInputChannel
			RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
			if (inputChannel == null) {
				//如果沒有對應的 RemoteInputChannel
				bufferOrEvent.releaseBuffer();
				//取消對給定 receiverId 的訂閱
				cancelRequestFor(bufferOrEvent.receiverId);
				return;
			}
			//解析消息,是buffer還是event
			decodeBufferOrEvent(inputChannel, bufferOrEvent);

		} else if (msgClazz == NettyMessage.ErrorResponse.class) {
			// ---- Error ---------------------------------------------------------
			......
		} else {
			throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
		}
	}

	private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
		try {
			ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
			final int receivedSize = nettyBuffer.readableBytes();
			if (bufferOrEvent.isBuffer()) {
				// ---- Buffer ------------------------------------------------
				// Early return for empty buffers. Otherwise Netty's readBytes() throws an
				// IndexOutOfBoundsException.
				if (receivedSize == 0) {
					inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
					return;
				}
				//從對應的 RemoteInputChannel 中請求一個 Buffer
				Buffer buffer = inputChannel.requestBuffer();
				if (buffer != null) {
					//将接收的資料寫入buffer
					nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);

					//通知對應的channel,backlog是生産者那邊堆積的buffer數量
					inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
				} else if (inputChannel.isReleased()) {
					cancelRequestFor(bufferOrEvent.receiverId);
				} else {
					throw new IllegalStateException("No buffer available in credit-based input channel.");
				}
			} else {
				// ---- Event -------------------------------------------------
				// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
				byte[] byteArray = new byte[receivedSize];
				nettyBuffer.readBytes(byteArray);

				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
				//是一個事件,不需要從 RemoteInputChannel 中申請 buffer
				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
				//通知對應的channel,backlog是生産者那邊堆積的buffer數量
				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
			}
		} finally {
			bufferOrEvent.releaseBuffer();
		}
	}
}
           

CreditBasedPartitionRequestClientHandler從網絡中讀取資料後交給RemoteInputChannel,RemoteInputChannel會将接收到的加入隊列中,并根據生産端的堆積申請floatingbuffer:

public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	//接收到遠端 ResultSubpartition 發送的 Buffer
	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
		boolean recycleBuffer = true;
		try {
			final boolean wasEmpty;
			synchronized (receivedBuffers) {
				if (isReleased.get()) {
					return;
				}
				//序号需要比對
				if (expectedSequenceNumber != sequenceNumber) {
					onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
					return;
				}
				//加入 receivedBuffers 隊列中
				wasEmpty = receivedBuffers.isEmpty();
				receivedBuffers.add(buffer);
				recycleBuffer = false;
			}

			++expectedSequenceNumber;

			if (wasEmpty) {
				//通知 InputGate,目前 channel 有新資料
				notifyChannelNonEmpty();
			}

			if (backlog >= 0) {
				//根據用戶端的積壓申請float buffer
				onSenderBacklog(backlog);
			}
		} finally {
			if (recycleBuffer) {
				buffer.recycleBuffer();
			}
		}
	}

	//backlog 是發送端的堆積 的 buffer 數量,
	//如果 bufferQueue 中 buffer 的數量不足,就去須從 LocalBufferPool 中請求 floating buffer
	//在請求了新的 buffer 後,通知生産者有 credit 可用
	void onSenderBacklog(int backlog) throws IOException {
		int numRequestedBuffers = 0;

		synchronized (bufferQueue) {
			// Similar to notifyBufferAvailable(), make sure that we never add a buffer
			// after releaseAllResources() released all buffers (see above for details).
			if (isReleased.get()) {
				return;
			}

			//需要的 buffer 數量是 backlog + initialCredit, backlog 是生産者目前的積壓
			numRequiredBuffers = backlog + initialCredit;
			while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
				//不停地請求新的 floating buffer
				Buffer buffer = inputGate.getBufferPool().requestBuffer();
				if (buffer != null) {
					//從 buffer poll 中請求到 buffer
					bufferQueue.addFloatingBuffer(buffer);
					numRequestedBuffers++;
				} else if (inputGate.getBufferProvider().addBufferListener(this)) {
					// buffer pool 沒有 buffer 了,加一個監聽,當 LocalBufferPool 中有新的 buffer 時會回調 notifyBufferAvailable
					// If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
					isWaitingForFloatingBuffers = true;
					break;
				}
			}
		}

		if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
			//請求了新的floating buffer,要更新 credit
			notifyCreditAvailable();
		}
	}

	private void notifyCreditAvailable() {
		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
		//通知目前 channel 有新的 credit
		partitionRequestClient.notifyCreditAvailable(this);
	}

	//LocalBufferPool 通知有 buffer 可用
	@Override
	public NotificationResult notifyBufferAvailable(Buffer buffer) {
		NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
		try {
			synchronized (bufferQueue) {
				checkState(isWaitingForFloatingBuffers,
					"This channel should be waiting for floating buffers.");

				// Important: make sure that we never add a buffer after releaseAllResources()
				// released all buffers. Following scenarios exist:
				// 1) releaseAllResources() already released buffers inside bufferQueue
				// -> then isReleased is set correctly
				// 2) releaseAllResources() did not yet release buffers from bufferQueue
				// -> we may or may not have set isReleased yet but will always wait for the
				// lock on bufferQueue to release buffers
				if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
					isWaitingForFloatingBuffers = false;
					return notificationResult;
				}

				//增加floating buffer
				bufferQueue.addFloatingBuffer(buffer);

				if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
					//bufferQueue中有足夠多的 buffer 了
					isWaitingForFloatingBuffers = false;
					notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
				} else {
					//bufferQueue 中 buffer 仍然不足
					notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
				}
			}

			if (unannouncedCredit.getAndAdd(1) == 0) {
				notifyCreditAvailable();
			}
		} catch (Throwable t) {
			setError(t);
		}
		return notificationResult;
	}
}
           

一旦RemoteInputChannel申請到新的buffer,就需要通知生産者更新credit,這需要發送一條AddCredit消息:

class PartitionRequestClient {
	//交給 NetworkClientHandler 處理
	public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
		clientHandler.notifyCreditAvailable(inputChannel);
	}
}

class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {
	//有新的credit
	@Override
	public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
		//觸發一次自定義事件
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
	}

	@Override
	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
		writeAndFlushNextMessageIfPossible(ctx.channel());
	}

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof RemoteInputChannel) {
			//有新的credit會觸發
			boolean triggerWrite = inputChannelsWithCredit.isEmpty();

			//加入到隊列中
			inputChannelsWithCredit.add((RemoteInputChannel) msg);

			if (triggerWrite) {
				writeAndFlushNextMessageIfPossible(ctx.channel());
			}
		} else {
			ctx.fireUserEventTriggered(msg);
		}
	}

	private void writeAndFlushNextMessageIfPossible(Channel channel) {
		if (channelError.get() != null || !channel.isWritable()) {
			return;
		}

		//從隊列中取出 RemoteInputChannel, 發送消息
		while (true) {
			RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();

			// The input channel may be null because of the write callbacks
			// that are executed after each write.
			if (inputChannel == null) {
				return;
			}

			//It is no need to notify credit for the released channel.
			if (!inputChannel.isReleased()) {
				//發送 AddCredit 的消息
				AddCredit msg = new AddCredit(
					inputChannel.getPartitionId(),
					inputChannel.getAndResetUnannouncedCredit(), //擷取并重置新增的credit
					inputChannel.getInputChannelId());

				// Write and flush and wait until this is done before
				// trying to continue with the next input channel.
				channel.writeAndFlush(msg).addListener(writeListener);
				return;
			}
		}
	}

	private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
		@Override
		public void operationComplete(ChannelFuture future) throws Exception {
			try {
				if (future.isSuccess()) {
					writeAndFlushNextMessageIfPossible(future.channel());
				} else if (future.cause() != null) {
					notifyAllChannelsOfErrorAndClose(future.cause());
				} else {
					notifyAllChannelsOfErrorAndClose(new IllegalStateException("Sending cancelled by user."));
				}
			} catch (Throwable t) {
				notifyAllChannelsOfErrorAndClose(t);
			}
		}
	}
}
           

繼續閱讀