SparkStreaming+Kafka的兩種模式receiver模式和Direct模式
- Sparkstreming + kafka recevier模式了解
receiver模式了解:
在SparkStreaming程式運作起來後,Executor中會有receiver tasks接收kafka推送過來的資料。資料會被持久化,預設級别為MEMORY_AND_DISK_SER_2,這個級别也可以修改。receiver task對接收過來的資料進行存儲和備份,這個過程會有節點之間的資料傳輸。備份完成後去zookeeper中更新消費偏移量,然後向Driver中的receiver tracker彙報資料的位置。最後Driver根據資料本地化将task分發到不同節點上執行。
receiver模式中存在的問題:
當Driver程序挂掉後,Driver下的Executor都會被殺掉,當更新完zookeeper消費偏移量的時候,Driver如果挂掉了,就會存在找不到資料的問題,相當于丢失資料。
- dirct模式了解
- 簡化資料處理流程
- 自己定義offset存儲,保證資料0丢失,但是會存在重複消費問題。(解決消費等幂問題)
- 不用接收資料,自己去kafka中拉取
開發
- 引入maven依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}-${cdh.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}-${cdh.version}</version>
</dependency>
- KafkaManager 類代碼
package org.apache.spark.streaming.kafka
import com.alibaba.fastjson.TypeReference
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import scala.reflect.ClassTag
/**
* 包名說明 :KafkaCluster是私有類,隻能在spark包中使用,
* 是以包名保持和 KafkaCluster 一緻才能調用
* @param kafkaParams
* @param autoUpdateoffset
*/
class KafkaManager(val kafkaParams: Map[String, String],
val autoUpdateoffset:Boolean = true) extends Serializable with Logging{
@transient
private var cluster = new KafkaCluster(kafkaParams)
def kc(): KafkaCluster ={
if(cluster == null){
cluster = new KafkaCluster(kafkaParams);
}
cluster
}
/**
* 泛型流讀取器
* @param ssc
* @param topics kafka topics,多個topic按","分割
* @tparam K 泛型 K
* @tparam V 泛型 V
* @tparam KD scala泛型 KD <: Decoder[K] 說明KD 的類型必須是Decoder[K]的子類型 上下界
* @tparam VD scala泛型 VD <: Decoder[V] 說明VD 的類型必須是Decoder[V]的子類型 上下界
* @return
*/
def createDirectStream[K: ClassTag, V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag](ssc: StreamingContext , topics: Set[String]): InputDStream[(K, V)] = {
//擷取消費者組
val groupId = kafkaParams.get("group.id").getOrElse("default")
// 在zookeeper上讀取offsets前先根據實際情況更新offsets
setOrUpdateOffsets(topics, groupId)
//把所有的offsets處理完成,就可以從zookeeper上讀取offset開始消費message
val messages = {
//擷取kafka分區資訊 為了列印資訊
val partitionsE = kc.getPartitions(topics)
require(partitionsE.isRight,s"擷取 kafka topic ${topics}`s partition 失敗。" )
val partitions = partitionsE.right.get
println("列印分區資訊")
partitions.foreach(println(_))
//擷取分區的offset
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
require(consumerOffsetsE.isRight,s"擷取 kafka topic ${topics}`s consumer offsets 失敗。" )
val consumerOffsets = consumerOffsetsE.right.get
println("列印消費者分區偏移資訊")
consumerOffsets.foreach(println(_))
//讀取資料
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
if(autoUpdateoffset){
//更新offset
messages.foreachRDD(rdd => {
logInfo("RDD 消費成功,開始更新zookeeper上的偏移")
updateZKOffsets(rdd)
})
}
messages
}
/**
* 建立資料流前,根據實際消費情況更新消費offsets
*
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
//擷取kafka partions的節點資訊
val partitionsE = kc.getPartitions(Set(topic))
logInfo(partitionsE+"")
//檢測
require(partitionsE.isRight, s"擷取 kafka topic ${topic}`s partition 失敗。")
val partitions = partitionsE.right.get
//擷取最早的 partions offsets資訊
val earliestLeader = kc.getEarliestLeaderOffsets(partitions)
val earliestLeaderOffsets = earliestLeader.right.get
println("kafka中最早的消息偏移")
earliestLeaderOffsets.foreach(println(_))
//擷取最末的 partions offsets資訊
val latestLeader = kc.getLatestLeaderOffsets(partitions)
val latestLeaderOffsets = latestLeader.right.get
println("kafka中最末的消息偏移")
latestLeaderOffsets.foreach(println(_))
//擷取消費者組的 offsets資訊
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
//如果消費者offset存在
if (consumerOffsetsE.isRight) {
/**
* 如果zk上儲存的offsets已經過時了,即kafka的定時清理政策已經将包含該offsets的檔案删除。
* 針對這種情況,隻要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
* 如果consumerOffsets比earliestLeaderOffsets還小的話,說明consumerOffsets已過時,
* 這時把consumerOffsets更新為earliestLeaderOffsets
*/
//如果earliestLeader 存在
if(earliestLeader.isRight) {
//擷取最早的offset 也就是最小的offset
val earliestLeaderOffsets = earliestLeader.right.get
//擷取消費者組的offset
val consumerOffsets = consumerOffsetsE.right.get
// 将 consumerOffsets 和 earliestLeaderOffsets 的offsets 做比較
// 可能隻是存在部分分區consumerOffsets過時,是以隻更新過時分區的consumerOffsets為earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case (tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
//如果消費者的偏移小于 kafka中最早的offset,那麽,將最早的offset更新到zk
if (n < earliestLeaderOffset) {
logWarning("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已經過時,更新為" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
//設定offsets
setOffsets(groupId, offsets)
}
} else {
// 消費者還沒有消費過 也就是zookeeper中還沒有消費者的資訊
if(earliestLeader.isLeft)
logError(s"${topic} hasConsumed but earliestLeaderOffsets is null。")
//看是從頭消費還是從末開始消費 smallest表示從頭開始消費
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase).getOrElse("smallest")
//建構消費者 偏移
var leaderOffsets: Map[TopicAndPartition, Long] = Map.empty
//從頭消費
if (reset.equals("smallest")) {
//分為 存在 和 不存在 最早的消費記錄 兩種情況
//如果kafka 最小偏移存在,則将消費者偏移設定為和kafka偏移一樣
if(earliestLeader.isRight){
leaderOffsets = earliestLeader.right.get.map {
case (tp, offset) => (tp, offset.offset)
}
}else{
// 如果不存在,則從新建構偏移全部為0 offsets
leaderOffsets = partitions.map(tp => (tp, 0L)).toMap
}
} else {
//直接擷取最新的offset
leaderOffsets = kc.getLatestLeaderOffsets(partitions).right.get.map {
case (tp, offset) => (tp, offset.offset)
}
}
//設定offsets
setOffsets(groupId, leaderOffsets)
}
})
}
/**
* 設定消費者組的offsets
* @param groupId
* @param offsets
*/
private def setOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit ={
if(offsets.nonEmpty){
//更新offset
val o = kc.setConsumerOffsets(groupId, offsets)
logInfo(s"更新zookeeper中消費組為:${groupId} 的 topic offset資訊為: ${offsets}")
if (o.isLeft) {
logError(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
/**
* 通過spark的RDD 更新zookeeper上的消費offsets
* @param rdd
*/
def updateZKOffsets[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) : Unit = {
//擷取消費者組
val groupId = kafkaParams.get("group.id").getOrElse("default")
//spark使用kafka低階API進行消費的時候,每個partion的offset是儲存在 spark的RDD中,是以這裡可以直接在
//RDD的 HasOffsetRanges 中擷取倒offsets資訊。因為這個資訊spark不會把則個資訊存儲到zookeeper中,是以
//我們需要自己實作将這部分offsets資訊存儲到zookeeper中
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//列印出spark中儲存的offsets資訊
offsetsList.foreach(x=>{
println("擷取spark 中的偏移資訊"+x)
})
for (offsets <- offsetsList) {
//根據topic和partition 建構topicAndPartition
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
logInfo("将SPARK中的 偏移資訊 存到zookeeper中")
//将消費者組的offsets更新到zookeeper中
setOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
}
}
//(null,{"rksj":"1558178497","latitude":"24.000000","imsi":"000000000000000"})
//讀取kafka流,并将json資料轉為map
def createJsonToJMapObjectDirectStreamWithOffset(ssc:StreamingContext ,
topicsSet:Set[String]): DStream[java.util.Map[String,Object]] = {
//一個轉換器
val converter = {json:String =>
println(json)
var res : java.util.Map[String,Object] = null
try {
//JSON轉map的操作
res = com.alibaba.fastjson.JSON.parseObject(json,
new TypeReference[java.util.Map[String, Object]]() {})
} catch {
case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
}
res
}
createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
}
/**
* 根據converter建立流資料
* @param ssc
* @param topicsSet
* @param converter
* @tparam T
* @return
*/
def createDirectStreamWithOffset[T:ClassTag](ssc:StreamingContext ,
topicsSet:Set[String], converter:String => T): DStream[T] = {
createDirectStream[String, String, StringDecoder, StringDecoder](ssc, topicsSet)
.map(pair =>converter(pair._2))
}
def createJsonToJMapDirectStreamWithOffset(ssc:StreamingContext ,
topicsSet:Set[String]): DStream[java.util.Map[String,String]] = {
val converter = {json:String =>
var res : java.util.Map[String,String] = null
try {
res = com.alibaba.fastjson.JSON.parseObject(json,
new TypeReference[java.util.Map[String, String]]() {})
} catch {
case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
}
res
}
createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
}
/**
*
* @param ssc
* @param topicsSet
* @return
*/
def createJsonToJavaBeanDirectStreamWithOffset(ssc:StreamingContext ,
topicsSet:Set[String]): DStream[Object] = {
val converter = {json:String =>
var res : Object = null
try {
res = com.alibaba.fastjson.JSON.parseObject(json,
new TypeReference[Object]() {})
} catch {
case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
}
res
}
createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
}
def createStringDirectStreamWithOffset(ssc:StreamingContext ,
topicsSet:Set[String]): DStream[String] = {
val converter = {json:String =>
json
}
createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
}
/**
* 讀取JSON的流 并将JSON流 轉為MAP流 并且這個流支援RDD向zookeeper中記錄消費資訊
* @param ssc spark ssc
* @param topicsSet topic 集合 支援從多個kafka topic同時讀取資料
* @return DStream[java.util.Map[String,String
*/
def createJsonToJMapStringDirectStreamWithOffset(ssc:StreamingContext , topicsSet:Set[String]): DStream[java.util.Map[String,String]] = {
val converter = {json:String =>
var res : java.util.Map[String,String] = null
try {
res = com.alibaba.fastjson.JSON.parseObject(json, new TypeReference[java.util.Map[String, String]]() {})
} catch {
case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
}
res
}
createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
}
/**
* 讀取JSON的流 并将JSON流 轉為MAP流 并且這個流支援RDD向zookeeper中記錄消費資訊
* @param ssc spark ssc
* @param topicsSet topic 集合 支援從多個kafka topic同時讀取資料
* @return DStream[java.util.Map[String,String
*/
def createJsonToJMapStringDirectStreamWithoutOffset(ssc:StreamingContext , topicsSet:Set[String]): DStream[java.util.Map[String,String]] = {
val converter = {json:String =>
var res : java.util.Map[String,String] = null
try {
res = com.alibaba.fastjson.JSON.parseObject(json, new TypeReference[java.util.Map[String, String]]() {})
} catch {
case e: Exception => logError(s"解析topic ${topicsSet}, 的記錄 ${json} 失敗。", e)
}
res
}
createDirectStreamWithOffset(ssc, topicsSet, converter).filter(_ != null)
}
}
object KafkaManager extends Logging{
def apply(broker:String, groupId:String = "default",
numFetcher:Int = 1, offset:String = "smallest",
autoUpdateoffset:Boolean = true): KafkaManager ={
new KafkaManager(
createKafkaParam(broker, groupId, numFetcher, offset),
autoUpdateoffset)
}
def createKafkaParam(broker:String, groupId:String = "default",
numFetcher:Int = 1, offset:String = "smallest"): Map[String, String] ={
//建立 stream 時使用的 topic 名字集合
Map[String, String](
"metadata.broker.list" -> broker,
"auto.offset.reset" -> offset,
"group.id" -> groupId,
"num.consumer.fetchers" -> numFetcher.toString)
}
}
- 建構消費入口:
object Kafka2esStreaming extends Serializable with Logging{
//擷取資料類型
private val dataTypes: util.Set[String] = DataTypeProperties.dataTypeMap.keySet()
val kafkaConfig: Properties = ConfigUtil.getInstance().getProperties("kafka/kafka-server-config.properties")
def main(args: Array[String]): Unit = {
val topics = args(1).split(",")
val ssc = SparkContextFactory.newSparkStreamingContext("Kafka2esStreaming", java.lang.Long.valueOf(10))
//建構kafkaManager
val kafkaManager = new KafkaManager(
Spark_Kafka_ConfigUtil.getKafkaParam(kafkaConfig.getProperty("metadata.broker.list"), "TZ3")
)
//使用kafkaManager建立DStreaming流
val kafkaDS = kafkaManager.createJsonToJMapStringDirectStreamWithOffset(ssc, topics.toSet)
//添加一個日期分組字段
//如果資料其他的轉換,可以先在這裡進行統一轉換
.map(map=>{
map.put("index_date",TimeTranstationUtils.Date2yyyyMMddHHmmss(java.lang.Long.valueOf(map.get("collect_time")+"000")))
map
}).persist(StorageLevel.MEMORY_AND_DISK)
//使用par并發集合可以是任務并發執行。在資源充足的情況下
dataTypes.foreach(datatype=>{
//過濾出單個類别的資料種類
val tableDS = kafkaDS.filter(x=>{datatype.equals(x.get("table"))})
Kafka2EsJob.insertData2Es(datatype,tableDS)
})
ssc.start()
ssc.awaitTermination()
}
}