1 Spark Streaming讀取Kafka的兩種模式
Spark Streaming消費Kafka的資料有兩種模式:Receiver和Direct模式,學習時候重點關注下Direct即可,因為在最新讀取方式中已經不支援Receiver。
1.1 Receiver模式
在Spark 1.3之前,Spark Streaming消費Kafka中的資料采用基于Kafka進階消費API實作的Receiver模式,如圖1所示。首先是Receiver從Kafka中消費資料并存儲到Spark Executor中,然後由Spark Streaming啟動的Job将處理資料。為了保證高可用,可以啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL),将接收到的Kafka資料同步寫到外部存儲中(如HDFS)。當系統發生故障适合,可以重新将資料加載到系統中,進而避免出現資料丢失的問題。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLyUleOVTWE5UMNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zRQpkL3cDO1QDMxMjM3IDNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
圖 1(來自百度圖檔)
1.2 Direct模式
在Spark 1.3及以後版本中,Spark推薦無Receiver的Direct模式,如圖2所示。首先Spark周期性地查詢Kafka中自己訂閱的Topic下每個分區的最新offset,通過對比上一批次的offset,進而擷取本批次offset的範圍。然後啟動處理資料的Job, 最後使用Kafka低級消費API去拉取本批次的資料。
圖 2(來自百度圖檔)
1.3 兩種接收方式的對比
在最新的API中,Receiver模式已經被廢棄。Spark的官網上是推薦Direct模式,理由如下:
1) 簡化并行度
Receiver模式中,Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以在KafkaUtils.createStream()中提高partition的數量,隻會增加一個Receiver中讀取partition的線程數,不會增加Spark處理資料的并行度
Direct模式中,将建立與要使用的Kafka分區一樣多的RDD分區,所有這些分區都将從Kafka并行讀取資料。是以,Kafka和RDD分區之間存在一對一的映射,這更易于了解和調整。
2) 高效率
Receiver模式中為了保證高可用,需要啟用WAL機制,在KafkaUtils.createStream()中設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER,将資料複制一份到外存中。在處理大資料情況下,很容易成為系統瓶頸。
Direct模式中,為了保證不丢失資料,不需要使用WAL這種重量級的儲存機制,隻需要将上一批次的Offset儲存即可。如果系統挂掉,Spark會重新讀取上一次儲存的Offset,進而保證資料的零丢失。
3) 占用資源低
Receiver不處理資料,隻是持續不斷的接受資料,與其他Exector是異步的,但卻會占用系統資源。在配置設定相同cup和記憶體資源的前提下,Receiver會占用Cpu時間,影響Spark的處理速度,如果資料量大,Receiver會占用更大記憶體。
2 使用Docker安裝實驗環境
以前部落客一般是用虛拟機來從頭到尾來安裝各種大資料元件,每次學半天都是在安裝,但是公司中大資料平台都是運維已經安裝好的,我們每天要做的更多是在平台上開發業務,是以學習時候使用Docker快速安裝就顯得很有意義。
2.1 安裝Reids
docker pull redislabs/rebloom
docker run -d -p 6379:6379 redislabs/rebloom
2.2 安裝Zookeeper
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
2.3 安裝Kafka
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.150:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.150:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
這裡2個ip分别是zk和kafka所在裝置的ip。
2.4 檢視運作的容器
2.5 注意事項
使用docker最好将鏡像倉庫配成阿裡雲,否詞下載下傳會很慢,配置方法網上很多,本文不再累述。
3 消費語義
在上Offset管理代碼之前,有個重要概念需要交代下,就是消費語義。Spark Streaming從Kafka broker中拉取資料時候,有三種消費語意:
3.1 至少消費一次
從字面上看就是會有重複資料。Spark從Kafka拉取資料進行處理後,如果offset還沒有儲存而系統挂掉時候,就會重新從上一次的Offset處開始消費,會造成至少消費一次。
3.2 恰好消費一次
Kafka broker存儲的資料隻會被消費一次。實作這種模式主要有2種方法
a 在至少消費一次的基礎上,下遊系統自己做幂等。比如有2條uuid一樣的資料被Spark處理且存在資料庫,下遊系統讀取的時候要根據uuid去重。
b 使用資料庫的事務。保證資料處理入庫和送出offset這2個操作處于同一個事務之中。
3.3 最多消費一次
簡單點說就是資料可能發生丢失。比如使用自動送出offset時,因為是定時送出offset,有可能剛拉去到一批資料,還沒進行處理,系統就自動幫你送出了offset,而你在處理過程中挂了,這就造成了資料丢失。
4 使用Redis管理Kafka Offset(支援多Topiic)
4.1 向Kafka中寫入資料
1)建立topic
import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 建立主題
*/
public class CreatTopicService {
public static final String brokerList = "192.168.2.50:9092";
public static final String topic = "first";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(properties);
NewTopic newTopic = new NewTopic(topic, 2, (short) 1);
CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
try {
result.all().get();
} catch (InterruptedException | ExecutionException e){
e.printStackTrace();
}finally{
if(client != null){
client.close() ;
}
}
}
}
2)向Kafka中寫入資料
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class ProducerService {
public static final String brokerList = "localhost:9092";
public static final String topic = "first";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
//自己直生産者用戶端參數并建立KafkaProducer執行個體
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//建構所需妥發送的消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello Spark!");
try {
//發送消息3種方法
//1)發後即忘
// producer.send(record);
//2)同步
// Future<RecordMetadata> future = producer.send(record);
// RecordMetadata metadata = future.get();
// System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
//3)異步
for (int i = 0; i < 200; i++){
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
exception.printStackTrace();
}else{
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//關閉生産着用戶端執行個體
if(producer != null){
producer.close();
}
}
}
}
4.2 基于Jedis的Redis讀寫工具
package com.dhhy.redis
import redis.clients.jedis.{JedisPool, JedisPoolConfig,Jedis}
/**
* Redis連接配接工具
*/
object RedisUtil {
@volatile private var jedisPool:JedisPool = null
/**
* 擷取Jedis執行個體
*/
def getResource()={
if(jedisPool == null){
synchronized(
if(jedisPool == null){
val config = new JedisPoolConfig
config.setMaxTotal(5)
config.setMaxIdle(2)
config.setMaxWaitMillis(3000)
jedisPool = new JedisPool(config,"127.0.0.1",6379);
}
)
}
jedisPool.getResource
}
/**
* 釋放jedis連接配接資源
* @param jedis
*/
def returnResource(jedis: Jedis) = {
if (jedis != null) jedis.close()
}
/**
* 原子計數器
* @param key
* @param step
* @return
*/
def incrBy(key: String ,step: Long): Long ={
val jedis = getResource()
val count = jedis.incrBy(key, step)
returnResource(jedis)
count
}
4.3 Spark消費Kafka的代碼
import com.dhhy.redis.RedisUtil
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
/**
* 使用redis管理kafka Offset
*/
object KafkaOffseManageDemo2 {
val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo.getClass)
/**
* 從Redis擷取上一次儲存的偏移量
* @param topics
* @return
*/
def getOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
val fromOffsets = collection.mutable.Map.empty[TopicPartition, Long]
import scala.collection.JavaConversions._
val jedis = RedisUtil.getResource()
topics.foreach(topic => {
jedis.hgetAll(topic).foreach(kv =>{
fromOffsets += (new TopicPartition(topic, kv._1.toInt) -> kv._2.toLong)
})
})
RedisUtil.returnResource(jedis)
fromOffsets.toMap
}
def main(args: Array[String]): Unit = {
val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo2.getClass)
//設定日志級别
Logger.getLogger("org").setLevel(Level.INFO)
//配置項
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOffseManageDemo")
//建立流式上下文,設定批處理間隔
val ssc = new StreamingContext(conf, Seconds(5))
//kafka配置參數
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", //kafka叢集位址
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], //序列化類型,此處為字元類型
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], //序列化類型,此處為字元類型
ConsumerConfig.GROUP_ID_CONFIG -> "KafkaOffseManageDemo1",//Kafka消費組l
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", //讀取最新offset
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)// 是否向定時向zookeeper寫入每個分區的offse
)
//消費的topic
//如果消費多個topic,則val topics = Array(topic1,topic2,…)
val topics = Array("first")
val offsets = getOffsets(topics)
// 初始化輸入流
val stream = if(offsets.isEmpty){
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
}else{
KafkaUtils.createDirectStream(ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams, offsets))
}
stream.foreachRDD{rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val jedis = RedisUtil.getResource()
val pipeline = jedis.pipelined()
iter.foreach { msg =>
msg.value().split(" ").foreach (f =>
if (f == "Hello"){
pipeline.incr("Hello")
}
)
}
val o = offsetRanges(TaskContext.get.partitionId)
//列印偏移量資訊
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
//儲存Kafka的偏移量
pipeline.hset(o.topic, o.partition + "", o.untilOffset + "")
pipeline.sync()
RedisUtil.returnResource(jedis)
}
}
ssc.start()
ssc.awaitTermination()
}
}
4.4 測試結果
登陸redis的用戶端,檢視key為Hello的值,可以發現kafka 生産者一共寫入了200個”Hello”
5 參考文獻
1)Spark-Streaming擷取kafka資料的兩種方式-Receiver與Direct的方式
https://blog.csdn.net/kwu_ganymede/article/details/50314901
2) Apache spark官方文檔
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
3) 耿嘉安,Spark核心設計的藝術:架構設計與實作 第1版. 2018, 機械工業出版社.