本博文講述的内容主要包括:
1,SparkStreaming on Kafka Receiver 工作原理機制
2,SparkStreaming on Kafka Receiver案例實戰
3,SparkStreaming on Kafka Receiver源碼解析
一:SparkStreaming on Kafka Receiver 簡介:
1、Spark-Streaming擷取kafka資料的兩種方式-Receiver與Direct的方式,可以從代碼中簡單了解成Receiver方式是通過zookeeper來連接配接kafka隊列,Direct方式是直接連接配接到kafka的節點上擷取資料了。
2、基于Receiver的方式:
這種方式使用Receiver來擷取資料。Receiver是使用Kafka的高層次Consumer API來實作的。receiver從Kafka中擷取的資料都是存儲在Spark Executor的記憶體中的,然後Spark Streaming啟動的job會去處理那些資料。
然而,在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。
補充說明:
(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以,在KafkaUtils.createStream()中,提高partition的數量,隻會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理資料的并行度。
(2)、可以建立多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收資料。
(3)、如果基于容錯的檔案系統,比如HDFS,啟用了預寫日志機制,接收到的資料都會被複制一份到預寫日志中。是以,在KafkaUtils.createStream()中,設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER。
SparkStreaming on Kafka Receiver 工作原理圖如下所示:

二、SparkStreaming on Kafka Receiver案例實戰:
1、在進行SparkStreaming on Kafka Receiver案例的環境前提:
(1)spark 安裝成功,spark 1.6.0(local方式除外)
(2)zookeeper 安裝成功
(3)kafka 安裝成功
(4)啟動叢集和zookeeper和kafka
在這裡我采用local的方式進行試驗,代碼如下:
public class SparkStreamingOnKafkaReceiver {
public static void main(String[] args) {
/* 第一步:配置SparkConf:
1,至少兩條線程因為Spark Streaming應用程式在運作的時候至少有一條線程用于
不斷地循環接受程式,并且至少有一條線程用于處理接受的資料(否則的話有線程用于處理資料,随着時間的推移記憶體和磁盤都會
不堪重負)
2,對于叢集而言,每個Executor一般肯定不止一個線程,那對于處理SparkStreaming
應用程式而言,每個Executor一般配置設定多少Core比較合适?根據我們過去的經驗,5個左右的Core是最佳的(一個段子配置設定為奇數個Core表現最佳,例如3個,5個,7個Core等)
*/
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparStreamingOnKafkaReceiver");
// SparkConf conf = new //SparkConf().setMaster("spark://Master:7077").setAppName(" //SparStreamingOnKafkaReceiver");
/* 第二步:建立SparkStreamingContext,
1,這個是SparkStreaming應用春香所有功能的起始點和程式排程的核心
SparkStreamingContext的建構可以基于SparkConf參數也可以基于持久化的SparkStreamingContext的内容
// 來恢複過來(典型的場景是Driver崩潰後重新啟動,由于SparkStreaming具有連續7*24
小時不間斷運作的特征,是以需要Driver重新啟動後繼續上一次的狀态,此時的狀态恢複需要基于曾經的Checkpoint))
2,在一個Sparkstreaming 應用程式中可以建立若幹個SparkStreaming對象,使用下一個SparkStreaming
之前需要把前面正在運作的SparkStreamingContext對象關閉掉,由此,我們擷取一個重大的啟發
我們獲得一個重大的啟發SparkStreaming也隻是SparkCore上的一個應用程式而已,隻不過SparkStreaming架構想運作的話需要
*/ spark工程師寫業務邏輯
@SuppressWarnings("resource")
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds());
/* 第三步:建立SparkStreaming輸入資料來源input Stream
1,資料輸入來源可以基于File,HDFS,Flume,Kafka-socket等
2,在這裡我們指定資料來源于網絡Socket端口,SparkStreaming連接配接上該端口并在運作時候一直監聽
該端口的資料(當然該端口服務首先必須存在,并且在後續會根據業務需要不斷地資料産生當然對于SparkStreaming
應用程式的而言,有無資料其處理流程都是一樣的);
3,如果經常在每個5秒鐘沒有資料的話不斷地啟動空的Job其實會造成排程資源的浪費,因為并沒有資料發生計算
是以實際的企業級生成環境的代碼在具體送出Job前會判斷是否有資料,如果沒有的話就不再送出資料
在本案例中具體參數含義:
第一個參數是StreamingContext執行個體,
第二個參數是zookeeper叢集資訊(接受Kafka資料的時候會從zookeeper中擷取Offset等中繼資料資訊)
第三個參數是Consumer Group
*/ 第四個參數是消費的Topic以及并發讀取Topic中Partition的線程數
Map<String,Integer> topicConsumerConcurrency = new HashMap<String,Integer>();
topicConsumerConcurrency.put("HelloKafakaFromSparkStreaming",);//這裡2個的話是指2個接受的線程
JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jsc,
"Master:2181,Worker1:2181,Worker2:2181",
"MyFirstConsumerGrou",
topicConsumerConcurrency);
/*
* 第四步:接下來就像對于RDD程式設計一樣,基于DStream進行程式設計!!!原因是Dstream是RDD産生的模闆(或者說類
* ),在SparkStreaming發生計算前,其實質是把每個Batch的Dstream的操作翻譯成RDD的操作
* 對初始的DTStream進行Transformation級别處理
* */
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>,String>(){ //如果是Scala,由于SAM裝換,可以寫成val words = lines.flatMap{line => line.split(" ")}
@Override
public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {
return Arrays.asList(tuple._2.split(" "));//将其變成Iterable的子類
}
});
// 第四步:對初始DStream進行Transformation級别操作
//在單詞拆分的基礎上對每個單詞進行執行個體計數為1,也就是word => (word ,1 )
JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word,);
}
});
//對每個單詞事例技術為1的基礎上對每個單詞在檔案中出現的總次數
JavaPairDStream<String,Integer> wordsCount = pairs.reduceByKey(new Function2<Integer,Integer,Integer>(){
/**
*
*/
private static final long serialVersionUID = L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
/*
* 此處的print并不會直接出發Job的支援,因為現在一切都是在SparkStreaming的架構控制之下的
* 對于spark而言具體是否觸發真正的JOb運作是基于設定的Duration時間間隔的
* 諸位一定要注意的是Spark Streaming應用程式要想執行具體的Job,對DStream就必須有output Stream操作
* output Stream有很多類型的函數觸發,類print,savaAsTextFile,scaAsHadoopFiles等
* 其實最為重要的一個方法是foreachRDD,因為SparkStreaming處理的結果一般都會放在Redis,DB
* DashBoard等上面,foreach主要就是用來完成這些功能的,而且可以自定義具體的資料放在哪裡!!!
* */
wordsCount.print();
// SparkStreaming 執行引擎也就是Driver開始運作,Driver啟動的時候位于一條新線程中的,當然
// 其内部有消息接受應用程式本身或者Executor中的消息
jsc.start();
jsc.close();
}
}
2、SparkStreaming on Kafka Receiver運作在叢集上的步驟及結果:
1,首先啟動zookeeper服務:
2,接下來啟動Kafka服務
3,在eclipse上觀察結果:
三:SparkStreaming on Kafka Receiver源碼解析
1,首先看一下KafkaUtils(包含zookeeper的配置等等):
2、在這裡建立了KafkaInputDStream:
3、這裡證明KafkaInputStream為consumer
4、在這裡擁有線程池(處理topic)
5,不同的接受方式(第一個為wal方式)
補充說明:
使用Spark Streaming可以處理各種資料來源類型,如:資料庫、HDFS,伺服器log日志、網絡流,其強大超越了你想象不到的場景,隻是很多時候大家不會用,其真正原因是對Spark、spark streaming本身不了解。
博文内容源自DT大資料夢工廠Spark課程。相關課程内容視訊可以參考:
百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯系QQ460507491或者微信号:DT1219477246 擷取上述資料)。