天天看點

第12課:Spark Streaming源碼解讀之Executor容錯安全性

一、Spark Streaming 資料安全性的考慮:

  1. Spark Streaming不斷的接收資料,并且不斷的産生Job,不斷的送出Job給叢集運作。是以這就涉及到一個非常重要的問題資料安全性。
  2. Spark Streaming是基于Spark Core之上的,如果能夠確定資料安全可好的話,在Spark Streaming生成Job的時候裡面是基于RDD,即使運作的時候出現問題,那麼Spark Streaming也可以借助Spark Core的容錯機制自動容錯。
  3. 對Executor容錯主要是對資料的安全容錯
  4. 為啥這裡不考慮對資料計算的容錯:計算的時候Spark Streaming是借助于Spark Core之上的容錯的,是以天然就是安全可靠的。

Executor容錯方式: 

1. 最簡單的容錯是副本方式,基于底層BlockManager副本容錯,也是預設的容錯方式。 

2.WAL日志方式

3. 接收到資料之後不做副本,支援資料重放,所謂重放就是支援反複讀取資料。

BlockManager備份:

  1. 預設在記憶體中兩份副本,也就是Spark Streaming的Receiver接收到資料之後存儲的時候指定StorageLevel為MEMORY_AND_DISK_SER_2,底層存儲是交給BlockManager,BlockManager的語義確定了如果指定了兩份副本,一般都在記憶體中。是以至少兩個Executor中都會有資料。
第12課:Spark Streaming源碼解讀之Executor容錯安全性

  Receiver将資料交給BlockManger是由ReceiveredBlockHandler來處理的,有兩種 ReceiveredBlockHandler的實作: 1. WriteAheadLogBasedBlockHandler 2. BlockManagerBasedBlockHandler

第12課:Spark Streaming源碼解讀之Executor容錯安全性

這裡的storageLevel是建構InputDStream時傳入的, socketTextStream的 預設存儲級别是 StorageLevel.MEMORY_AND_DISK_SER_2  

第12課:Spark Streaming源碼解讀之Executor容錯安全性

  如果使用 WriteAheadLogBasedBlockHandler需要開啟WAL,預設并沒有開啟:  

第12課:Spark Streaming源碼解讀之Executor容錯安全性

    WAL日志方式:     這種方式會現将資料寫入日志檔案,就是checkpoint目錄,出現異常是,從checkpoint目錄重新讀取資料,進行恢複。啟動WAL時候,沒必要将副本數設定成大于1,也不需要序列化。  

第12課:Spark Streaming源碼解讀之Executor容錯安全性

  WAL會将資料同時寫入BlockManager和write ahead log,而且是并行的寫block,當然兩處的block存儲完成,才會傳回。  

第12課:Spark Streaming源碼解讀之Executor容錯安全性

  将Block 存入BlockManager:  

第12課:Spark Streaming源碼解讀之Executor容錯安全性

  将Block 存入WAL日志:  

第12課:Spark Streaming源碼解讀之Executor容錯安全性
看一下                WriteAheadLog的一個實作類FileBasedWriteAheadLog的write方法:      
根據不同時間擷取不同Writer将序列化結果寫入檔案,傳回一個                FileBasedWriteAheadLogSegment類型的對象fileSegment。      
第12課:Spark Streaming源碼解讀之Executor容錯安全性
讀資料:      
第12課:Spark Streaming源碼解讀之Executor容錯安全性
其中建立了一個FileBaseWriteAheadLogRandomReader對象,然後調用了該對象的read方法:      
第12課:Spark Streaming源碼解讀之Executor容錯安全性

支援資料重放。

在實際的開發中直接使用Kafka,因為不需要容錯,也不需要副本。 

Kafka有Receiver方式和Direct方式 

Receiver方式:是交給Zookeeper去管理資料的,也就是偏移量offSet.如果失效後,Kafka會基于offSet重新讀取,因為處理資料的時候中途崩潰,不會給Zookeeper發送ACK,此時Zookeeper認為你并沒有消息這個資料。但是在實際中越來用的越多的是Direct的方式直接操作offSet.而且還是自己管理offSet.

  1. DirectKafkaInputDStream會去檢視最新的offSet,并且把offSet放到Batch中。
  2. 在Batch每次生成的時候都會調用latestLeaderOffsets檢視最近的offSet,此時的offSet就會與上一個offSet相減獲得這個Batch的範圍。這樣就可以知道讀那些資料。

繼續閱讀