天天看點

[flink-001]flink的心跳機制1.心跳接口2.心跳機制運作

1.心跳接口

一個分布式叢集有多個節點。節點之間有心跳機制,以确認每個節點是正常工作的。心跳機制,就是每隔N秒,一個節點向其他節點發消息,其他節點收到後,回複一個消息。超過M秒沒有回複,視為TimeOut,節點被視為不能正常工作。

假設一個flink叢集有3個節點:A,B1,B2。A是master節點,B1和B2是worker節點。根據flink的心跳機制,A節點每3秒向B1和B2發起心跳請求,B1和B2收到請求後,向A傳回心跳應答。心跳請求和心跳應答,都會帶上Payload載荷,傳遞更多的業務資訊。

flink叢集有多種業務流程,每種業務流程都有自己的心跳機制。flink的心跳機制提供接口,具體功能由各業務流程實作。

flink的心跳機制代碼在flink/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat,實作三個接口:HeartbeatTarget,HeartbeatListener, HeartbeatManager。實作了三個類,HeartbeatManagerImpl, HeartbeatManagerSenderImpl, HeartbeatServices,這個三個類是示例接口的用法。

flink叢集的三個節點,A、B1、B2,在行為上而言,都有兩種動作:向某個節點發送請求,處理某個節點發來的請求。這兩個動作抽象成接口HeartbeatTarget,它有兩個函數receiveHeartbeat和requestHeartbeat。這兩個函數的參數也很簡單:分别是請求的發送放和接收方,還有Payload載荷,對于一個确定節點而言,接收的和發送的載荷是同一類型的。A、B1、B2都要實作這個接口。

A節點是master,它管理心跳機制,實作HeartbeatManger接口,這個接口繼承了HeartbeatTarget接口,理由如上段所述。除了HeartbeatTarget接口的函數之外,這個接口有4個函數:monitorTarget,把某個節點加入到心跳監控清單; unmonitorTarget,從心跳監控清單删除某個節點; stop,停止心跳管理服務,釋放資源; getLastHeartbeatFrom,擷取某個節點的最後一次心跳資料。

A節點的心跳管理類的一個成員對象,負責處理心跳結果。它實作了HearbeatListener接口,包括處理節點心跳逾時notifyHeartbeatTimeout,處理節點發來的Payload載荷reportPayload,擷取對某節點發下一次心跳請求的Payload載荷retrievePayLoad。

2.心跳機制運作

HearbeatManagerImpl類示範這三個接口的用法:

HearbeatManagerImpl構造函數,傳入參數有:心跳間隔時間,機關是毫秒; HearbeatManager的id,辨別身份; HeartbeatListener對象,處理心跳結果; Executor對象,它負責周期性發起心跳。

monitorTarget函數,把一個節點加入到心跳監控清單,傳入參數有:ResourceId和HearbeatTarget,這兩個參數是對應關系,monitorTarget根據這兩個參數,生成一個HeartbeatMonitor對象,然後把這個對象跟ResrouceId做kv關聯,存入到heartbeatTargets。 一個節點可能參與多個業務流程,是以一個節點參與多個心跳流程,一個節點上運作多個不同類型的HearbeatTarget,是以一個ResourcID可能會跟不同類型的HearbeatTarget對象,分别加入到多個HeartbeatManager,進行不同類型的心跳監控。也是以這個函數入參是兩個參數。

unmonitorTarget,stop,getLastHeartbeatFrom,getMainThreadExecutor, 這個4個函數比較簡單,不做解釋。

requestHeartbeat函數,函數實作如下:

@Override
	public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
		if (!stopped) {
			log.debug("Received heartbeat request from {}.", requestOrigin);

			final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

			if (heartbeatTarget != null) {
				if (heartbeatPayload != null) {
					heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
				}

				heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
			}
		}
	}
           

HeartbeatManger自己,或者第三方服務調用HeartbeatManager,調用requestHeartbeart函數,向requestOrigin節點發起一次心跳請求,載荷是heartbeatPayLoad。reportHeartbeat函數的作用是,記錄發起請求的這個時間點,然後建立一個ScheduleFuture,如果3秒後,requestOrigin沒有作出響應,那麼就将requestOrigin節點對應的HeartbeatMonitor的state設定成TIMEOUT狀态,如果3秒内requestOrigin響應了,ScheduleFuture會被取消,HeartbeatMonitor的state仍然是RUNNING。reportPayload函數,把requestOrigin節點的最新的heartbeatPayload通知給heartbeatListern。heartbeatListern是外部傳入的,它根據所有節點的心跳記錄做監聽管理。receiveHearbeat函數,模拟ResourceId節點上的HeartbeatTarget對象接受到HeartbeatManger發來的心跳請求。前面已經說了,HearbeatManagerImpl類是一個輔助類,示範接口使用說明,是以在這裡直接把HeartbeatTarget的接受動作也完成了,在實際場景裡更複雜一些。

receiveHearbeat函數,HearbeatManager收到了heartbeatOrigin發來的心跳響應,把前一個ScheduleFuture取消掉,然後再建立一個新的ScheduleFuture,等待下一輪的心跳,同時,reprotPayload把心跳響應的Palyload通知給heartbeatListener處理。

HeartbeatManagerImple實作了主要業務邏輯。

HeartbeatManagerSenderImpl繼承HearbeatManagerImpl,實作了run函數,在run函數,定時發起心跳請求。也就是說,它可以作為一個單獨線程運作。

繼續閱讀