Receiver
receiver是使用kafka的進階consumer API來實作的,Receiver每個一段batch時間去kafka擷取這段時間裡最新的相應topic資料,從kafka哪裡擷取來的資料都存在Spark Excutor的記憶體中,然後Spark Streaming啟動job去處理這些資料。
其中,誰來消費分區不是由SparkStreaming來決定的,而是由高階消費者決定的,它決定分區向消費者的配置設定,即高階消費者Api決定消費者消費哪個分區。高階消費者會根據配置參數決定,消費者讀取資料後什麼時候送出offset。
這會引起一個問題,當Spark Streaming中的Receiver讀取Kafka分區資料時,假設讀取了100條資料,高階消費者API會執行offset的送出,例如每隔3秒,這100條資料就是RDD,假設此RDD還沒有處理完,高階消費者API執行了offset送出,但是Spark Streaming挂掉了,由于RDD在記憶體中,那麼RDD的資料就丢失了,如果想重新拿資料,從哪裡去拿不是由Spark Streaming說了算的,是由高階API決定的,由于offset已經送出,高階API認為這個資料Spark Streaming已經拿過了,再拿要拿100條以後的資料,那麼之前丢失的100條資料就永遠丢失了。
針對這一問題,Spark Streaming設計了一個規則,即Spark Streaming預寫日志規則(Write Ahead Log,WAL),每讀取一批資料,會寫一個WAL檔案,在WAL檔案中,讀了多少條就寫多少條,WAL檔案存儲于HDFS上。假設RDD中有100條資料,那麼WAL檔案中也有100條資料,此時如果Spark Streaming挂掉,那麼回去讀取HDFS上的WAL檔案,把WAL檔案中的100條資料取出再生成RDD,然後再去消費。由于這一設計需要寫HDFS,會對整體性能造成影響。
假設有6個分區,高階消費者的話會在Spark叢集的Worker上啟動Receiver,有6個分區則會用6個線程去讀取分區資料,這是在一個Worker的一個Receiver中有6個線程同時讀取6個分區的資料,随着資料量越來越大,資料讀取會成為瓶頸,此時可以建立多個Receiver分散讀取分區資料,然後每個Receiver建立一個Dstream,再把這些流全部都合并起來,然後進行計算。讀取時,一方面把RDD放在記憶體中,一方面寫HDFS中的WAL檔案。
根據上面的情景,又要建立多個Receiver,又要進行合并,又要在記憶體中存儲RDD,又要寫HDFS上的WAL檔案,進階API的缺點還是比較多的。
高階消費者是由高階消費者API自己送出offset到ZooKeeper中。
Direct
低階消費者需要自己維護offset, Spark Streaming從分區裡讀一部分資料,然後将offset儲存到CheckpointPath目錄中,比如幾秒生成一個Spark Streaming job(每個action操作啟動一次job),每個job生成的時候,會寫一次CheckpointPath下的檔案,Checkpoint中有job資訊和offset資訊(當然還有RDD依賴關系等其他資訊),即儲存了未完成的job和分區讀取的offset,一旦Spark Streaming挂掉後重新開機,可以通過從CheckpointPath中的檔案中反序列化來讀取Checkpoint的資料。