park streaming流式處理kafka中的資料,第一步是先把資料接收過來,轉換為spark streaming中的資料結構Dstream。接收資料的方式有兩種:1.利用Receiver接收資料,2.直接從kafka讀取資料。
一、Receiver方式消費kafka
這種方式利用接收器(Receiver)來接收kafka中的資料,其最基本是使用Kafka高階使用者API接口。對于所有的接收器,receiver從kafka接收來的資料會存儲在spark的executor記憶體中(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢出的問題),之後spark streaming送出的job會處理這些資料。
在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。
Receiver方式如下圖:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxSPV1GZ2h3RilGZtJGasNDTwYVbiVHNHpleO1GTulzRilWO5xkNNh0YwIFSh9Fd4VGdsATMfd3bkFGazxyaHRGcWdUYuVzVa9GczoVdG1mWfVGc5RHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5COyAjMxMjM4EDNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
使用Receiver方式消費kafka需要注意的點:
- 1、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以,在KafkaUtils.createStream()中,提高partition的數量,隻會增加一個Receiver中,讀取partition的線程的數量(隻是增加資料拉取的并行度,不是資料處理)。不會增加Spark處理資料的并行度。
- 2、可以建立多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收資料,之後可以利用union來統一成一個Dstream。
- 3、如果基于容錯的檔案系統,比如HDFS,啟用了預寫日志機制,接收到的資料都會被複制一份到預寫日志中。是以,在KafkaUtils.createStream()中,設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER。
二、Direct方式消費kafka
在spark1.3之後,引入了Direct方式。不同于Receiver的方式,Direct方式沒有receiver這一層,其會周期性的擷取Kafka中每個topic的每個partition中的最新offsets,進而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來擷取Kafka指定offset範圍的資料。
Direct方式如下圖:
使用Direct方式的優勢:
- 1、簡化并行讀取:如果要讀取多個partition,不需要建立多個輸入DStream然後對它們進行union操作。Spark會建立跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取資料。是以Direct方式在Kafka partition和RDD partition之間,有一個一對一的映射關系。
- 2、高性能:如果要保證零資料丢失,在基于receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka自己本身就有高可靠的機制,會對資料複制一份,而這裡又會複制一份到WAL中。而基于direct的方式,不依賴Receiver,不需要開啟WAL機制,隻要Kafka中作了資料的複制,那麼就可以通過Kafka的副本進行恢複。
- 3、一次且僅一次的事務機制:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中擷取offset值,這也是傳統的從Kafka中讀取資料的方式,但由于Spark Streaming消費的資料和Zookeeper中記錄的offset不同步,這種方式偶爾會造成資料重複消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一緻性。(當然,offset自己記錄的話,可以利用checkpoint、資料庫或檔案記錄或者回寫到zookeeper中或者調用API寫入Kafka的topic中進行記錄)
三、消費kafka時offset送出
首先是思路,有好幾種方式:
- 1、将offset手動維護到外部媒體中,如Zookeeper、mysql、redis。
- 2、将offset通過foreachRDD的方式維護到kafka中。
- 3、通過監聽的方式将offset維護到kafka中,當然也可以是Zookeeper、mysql、redis等外部媒體。
11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解 11SparkStreaming消費kafka以及offset送出一、Receiver方式消費kafka二、Direct方式消費kafka三、消費kafka時offset送出四、手動維護offset簡單代碼示範維護到zookeeper五、kafka中earliest/latest詳解 建議使用監聽的方式進行維護,因為kafka和SparkStreaming中維護的分區的對應關系,直接使用foreachRDD的當時也可以,但是這樣會将業務邏輯置于RDD中進行處理,喪失了SparkStreaming特有的算子特性,例如視窗算子之類的,使用監聽方式,繼承Listene中的onBatchCompleted方法,在該方法中實作壽佛那個維護offset即可。
注意:手動維護offset的時候,最好做一下任務是否有報錯的判斷,防止丢數,當任務有失敗時,不送出offset。
四、手動維護offset簡單代碼示範
維護到zookeeper
- 使用到了commitAsync() api直接操作Zookeeper
在Kafka 0.10+版本中,offset的預設存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。Spark Streaming也專門提供了commitAsync() API用于送出offset。使用方法如下。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 確定結果都已經正确且幂等地輸出了
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
維護到zookeeper
- 使用到了KafkaCluster 中的api直接操作Zookeeper
package com.bigdata.spark
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
*
* kafka對接sparkstreaming,手動維護offset到zookeeper
*/
object KafkaStreaming {
def main(args: Array[String]): Unit = {
//初始化ssc
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
ssc.sparkContext.setLogLevel("ERROR")
//kafka參數
val brokers = "linux1:9092,linux2:9092,linux3:9092"
val topic = "first"
val group = "bigdata"
val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
val kafkaParams = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)
//建立KafkaCluster對象,維護offset
val cluster = new KafkaCluster(kafkaParams)
//擷取初始偏移量
val fromOffset: Map[TopicAndPartition, Long] = getOffset(cluster, group, topic)
//建立流
val kafkaStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc, kafkaParams, fromOffset, (mess: MessageAndMetadata[String, String]) => mess.message())
//轉換邏輯
kafkaStream.map((_, 1)).reduceByKey(_ + _).print()
//送出offset
setOffset(cluster, kafkaStream, group)
ssc.start()
ssc.awaitTermination()
}
/**
* 擷取偏移量
*
* @param cluster
* @param group
* @param topic
* @return
*/
def getOffset(cluster: KafkaCluster, group: String, topic: String) = {
var partitionToLong = new mutable.HashMap[TopicAndPartition, Long]()
//擷取所有主題的分區
val topicAndPartition: Either[Err, Set[TopicAndPartition]] = cluster.getPartitions(Set(topic))
val partitions: Set[TopicAndPartition] = topicAndPartition.right.get
//擷取偏移量資訊
val offsetInfo: Either[Err, Map[TopicAndPartition, Long]] = cluster.getConsumerOffsets(group, partitions)
if (offsetInfo.isRight) {
// 如果有offset資訊則存儲offset
val offsets: Map[TopicAndPartition, Long] = offsetInfo.right.get
for (offset <- offsets) {
partitionToLong += offset
}
} else {
//如果沒有則設定為0
for (p <- partitions) {
partitionToLong += (p -> 0L)
}
}
partitionToLong.toMap
}
/**
* 送出偏移量
*
* @param cluster
* @param kafkaStream
* @param group
*/
def setOffset(cluster: KafkaCluster, kafkaStream: InputDStream[String], group: String): Unit = {
kafkaStream.foreachRDD { rdd =>
val offsetRangeArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offset <- offsetRangeArray) {
val ack: Either[Err, Map[TopicAndPartition, Short]] = cluster.setConsumerOffsets(group, Map(offset.topicAndPartition() -> offset.untilOffset))
if (ack.isRight) {
println(s"成功更新了消費kafka的偏移量:${offset.untilOffset}")
} else {
println(s"失敗更新消費kafka的偏移量:${ack.left.get}")
}
}
}
}
}
維護到mysql
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.kafka.common.TopicPartition
import java.sql.{DriverManager, ResultSet}
import scala.collection.mutable
/**
* 手動維護偏移量offset到MySQL資料庫中
*/
object SparkKafkaOffset {
def main(args: Array[String]): Unit = {
//1.準備環境
val conf = new SparkConf().setAppName("offset").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//五秒中切分一次資料形成一個RDD
val ssc = new StreamingContext(sc,Seconds(5))
//設定連接配接Kafka的參數
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "cdh01:9092,cdh02:9092,cdh03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaOffset",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka")
//2.使用KafkaUtil連接配接Kafak擷取資料
val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaOffset","spark_kafka")
val recordDStream: InputDStream[ConsumerRecord[String, String]] = if(offsetMap.size > 0){
//有記錄offset,從該offset處開始消費
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//位置政策:該政策,會讓Spark的Executor和Kafka的Broker均勻對應
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetMap))//消費政策
}else{
//MySQL中沒有記錄offset,則直接連接配接,從latest開始消費
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
}
//3.操作資料
//注意:我們要自己手動維護偏移量,也就意味着,消費了一小批資料就應該送出一次offset
//而這一小批資料在DStream的表現形式就是RDD,是以我們需要對DStream中的RDD進行操作
//而對DStream中的RDD進行操作的API有transform(轉換)和foreachRDD(動作)
recordDStream.foreachRDD(rdd=>{
if(rdd.count() > 0){//目前這一時間批次有資料
rdd.foreach(record => println("接收到的Kafk發送過來的資料為:" + record))
//接收到的Kafk發送過來的資料為:ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
//注意:通過列印接收到的消息可以看到,裡面有我們需要維護的offset,和要處理的資料
//接下來可以對資料進行處理....或者使用transform傳回和之前一樣處理
//維護offset:為了友善我們對offset的維護/管理,spark提供了一個類,幫我們封裝offset的資料
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges){
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
}
//手動送出offset,預設送出到Checkpoint中
//recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
OffsetUtil.saveOffsetRanges("SparkKafkaDemo",offsetRanges)
}
})
/* val lineDStream: DStream[String] = recordDStream.map(_.value())//_指的是ConsumerRecord
val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發過來的value,即一行資料
val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
result.print()*/
ssc.start()//開啟
ssc.awaitTermination()//等待優雅停止
}
/*
手動維護offset的工具類
首先在MySQL建立如下表
CREATE TABLE `t_offset` (
`topic` varchar(255) NOT NULL,
`partition` int(11) NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint(20) DEFAULT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
*/
object OffsetUtil {
/**
* 從資料庫讀取偏移量
*/
def getOffsetMap(groupid: String, topic: String) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8", "root", "123456")
val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
pstmt.setString(1, groupid)
pstmt.setString(2, topic)
val rs: ResultSet = pstmt.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
pstmt.close()
connection.close()
offsetMap
}
/**
* 将偏移量儲存到資料庫
*/
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8", "root", "root")
//replace into表示之前有就替換,沒有就插入
val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
for (o <- offsetRange) {
pstmt.setString(1, o.topic)
pstmt.setInt(2, o.partition)
pstmt.setString(3, groupid)
pstmt.setLong(4, o.untilOffset)
pstmt.executeUpdate()
}
pstmt.close()
connection.close()
}
}
}
維護到redis
-
使用資料結構string,其中key為topic:partition,value為offset。
例如bobo這個topic下有3個分區,則key-value結構如下:
bobo:0的偏移量為10
bobo:1的偏移量為12
bobo:2的偏移量為11
代碼如下:
- 消費時指定offset
/**
* kakfa參數
*/
private val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
// 注意這裡是none。
"auto.offset.reset" -> "none",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// `bobo`topic下有3個分區
private val topicPartitions = Map[String, Int]("bobo" -> 3)
// 從redis中擷取offsets
def getOffsets: Map[TopicPartition, Long] = {
val jedis = InternalRedisClient.getResource
// 設定每個分區起始的offset
val offsets = mutable.Map[TopicPartition, Long]()
topicPartitions.foreach { it =>
val topic = it._1
val partitions = it._2
// 周遊分區,設定每個topic下對應partition的offset
for (partition <- 0 until partitions) {
val topicPartitionKey = topic + ":" + partition
var lastOffset = 0L
val lastSavedOffset = jedis.get(topicPartitionKey)
if (null != lastSavedOffset) {
try {
lastOffset = lastSavedOffset.toLong
} catch {
case e: Exception =>
log.error("get lastSavedOffset error", e)
System.exit(1)
}
}
log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)
// 添加
offsets += (new TopicPartition(topic, partition) -> lastOffset)
}
}
InternalRedisClient.returnResource(jedis)
offsets.toMap
}
/**
* 建立kakfa流
*
* @param ssc StreamingContext
* @return InputDStream
*/
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
val offsets = getOffsets
// 建立kafka stream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
)
stream
}
其中:核心是通過ConsumerStrategies.Assign方法來指定topic下對應partition的offset資訊。
- 更新offset到redis
/**
* 消費
*
* @param stream InputDStream
*/
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
stream.foreachRDD { rdd =>
// 擷取offset資訊
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 計算相關名額,這裡就統計下條數了
val total = rdd.count()
val jedis = InternalRedisClient.getResource
val pipeline = jedis.pipelined()
// 會阻塞redis
pipeline.multi()
// 更新相關名額
pipeline.incrBy("totalRecords", total)
// 更新offset
offsetRanges.foreach { offsetRange =>
log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
}
// 執行,釋放
pipeline.exec()
pipeline.sync()
pipeline.close()
InternalRedisClient.returnResource(jedis)
}
}
其中使用到了:jedis.pipelined() 、pipeline.multi()。
五、kafka中earliest/latest詳解
-
earliest
當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,從頭開始消費
-
latest
當各分區下有已送出的offset時,從送出的offset開始消費;無送出的offset時,消費新産生的該分區下的資料