天天看點

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

本博文講述的内容主要包括:

1,SparkStreaming on Kafka Receiver 工作原理機制

2,SparkStreaming on Kafka Receiver案例實戰

3,SparkStreaming on Kafka Receiver源碼解析

一:SparkStreaming on Kafka Receiver 簡介:

1、Spark-Streaming擷取kafka資料的兩種方式-Receiver與Direct的方式,可以從代碼中簡單了解成Receiver方式是通過zookeeper來連接配接kafka隊列,Direct方式是直接連接配接到kafka的節點上擷取資料了。

2、基于Receiver的方式:

這種方式使用Receiver來擷取資料。Receiver是使用Kafka的高層次Consumer API來實作的。receiver從Kafka中擷取的資料都是存儲在Spark Executor的記憶體中的,然後Spark Streaming啟動的job會去處理那些資料。

然而,在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。

補充說明:

(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以,在KafkaUtils.createStream()中,提高partition的數量,隻會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理資料的并行度。

(2)、可以建立多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收資料。

(3)、如果基于容錯的檔案系統,比如HDFS,啟用了預寫日志機制,接收到的資料都會被複制一份到預寫日志中。是以,在KafkaUtils.createStream()中,設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER。

SparkStreaming on Kafka Receiver 工作原理圖如下所示:

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

二、SparkStreaming on Kafka Receiver案例實戰:

1、在進行SparkStreaming on Kafka Receiver案例的環境前提:

(1)spark 安裝成功,spark 1.6.0(local方式除外)

(2)zookeeper 安裝成功

(3)kafka 安裝成功

(4)啟動叢集和zookeeper和kafka

在這裡我采用local的方式進行試驗,代碼如下:

public class SparkStreamingOnKafkaReceiver {

    public static void main(String[] args) {
/*      第一步:配置SparkConf:
        1,至少兩條線程因為Spark Streaming應用程式在運作的時候至少有一條線程用于
        不斷地循環接受程式,并且至少有一條線程用于處理接受的資料(否則的話有線程用于處理資料,随着時間的推移記憶體和磁盤都會
        不堪重負)
        2,對于叢集而言,每個Executor一般肯定不止一個線程,那對于處理SparkStreaming
        應用程式而言,每個Executor一般配置設定多少Core比較合适?根據我們過去的經驗,5個左右的Core是最佳的(一個段子配置設定為奇數個Core表現最佳,例如3個,5個,7個Core等)
*/      
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparStreamingOnKafkaReceiver");
//      SparkConf conf = new //SparkConf().setMaster("spark://Master:7077").setAppName("        //SparStreamingOnKafkaReceiver");
/*      第二步:建立SparkStreamingContext,
        1,這個是SparkStreaming應用春香所有功能的起始點和程式排程的核心
        SparkStreamingContext的建構可以基于SparkConf參數也可以基于持久化的SparkStreamingContext的内容
//      來恢複過來(典型的場景是Driver崩潰後重新啟動,由于SparkStreaming具有連續7*24
    小時不間斷運作的特征,是以需要Driver重新啟動後繼續上一次的狀态,此時的狀态恢複需要基于曾經的Checkpoint))
    2,在一個Sparkstreaming 應用程式中可以建立若幹個SparkStreaming對象,使用下一個SparkStreaming
        之前需要把前面正在運作的SparkStreamingContext對象關閉掉,由此,我們擷取一個重大的啟發
        我們獲得一個重大的啟發SparkStreaming也隻是SparkCore上的一個應用程式而已,隻不過SparkStreaming架構想運作的話需要
*/      spark工程師寫業務邏輯
        @SuppressWarnings("resource")
        JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds());

/*      第三步:建立SparkStreaming輸入資料來源input Stream
        1,資料輸入來源可以基于File,HDFS,Flume,Kafka-socket等
        2,在這裡我們指定資料來源于網絡Socket端口,SparkStreaming連接配接上該端口并在運作時候一直監聽
        該端口的資料(當然該端口服務首先必須存在,并且在後續會根據業務需要不斷地資料産生當然對于SparkStreaming
        應用程式的而言,有無資料其處理流程都是一樣的);
        3,如果經常在每個5秒鐘沒有資料的話不斷地啟動空的Job其實會造成排程資源的浪費,因為并沒有資料發生計算
        是以實際的企業級生成環境的代碼在具體送出Job前會判斷是否有資料,如果沒有的話就不再送出資料
    在本案例中具體參數含義:
        第一個參數是StreamingContext執行個體,
        第二個參數是zookeeper叢集資訊(接受Kafka資料的時候會從zookeeper中擷取Offset等中繼資料資訊)
        第三個參數是Consumer Group
*/      第四個參數是消費的Topic以及并發讀取Topic中Partition的線程數

        Map<String,Integer> topicConsumerConcurrency = new HashMap<String,Integer>();
        topicConsumerConcurrency.put("HelloKafakaFromSparkStreaming",);//這裡2個的話是指2個接受的線程

        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jsc,
                "Master:2181,Worker1:2181,Worker2:2181",
                "MyFirstConsumerGrou",
                topicConsumerConcurrency);
    /*
     * 第四步:接下來就像對于RDD程式設計一樣,基于DStream進行程式設計!!!原因是Dstream是RDD産生的模闆(或者說類
     * ),在SparkStreaming發生計算前,其實質是把每個Batch的Dstream的操作翻譯成RDD的操作
     * 對初始的DTStream進行Transformation級别處理
     * */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>,String>(){ //如果是Scala,由于SAM裝換,可以寫成val words = lines.flatMap{line => line.split(" ")}

            @Override
            public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {

                return Arrays.asList(tuple._2.split(" "));//将其變成Iterable的子類
            }
        });
//      第四步:對初始DStream進行Transformation級别操作
        //在單詞拆分的基礎上對每個單詞進行執行個體計數為1,也就是word => (word ,1 )
        JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,);
            }

        });
        //對每個單詞事例技術為1的基礎上對每個單詞在檔案中出現的總次數

         JavaPairDStream<String,Integer> wordsCount = pairs.reduceByKey(new Function2<Integer,Integer,Integer>(){

            /**
             * 
             */
            private static final long serialVersionUID = L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1 + v2;
            }
        });
        /*
         * 此處的print并不會直接出發Job的支援,因為現在一切都是在SparkStreaming的架構控制之下的
         * 對于spark而言具體是否觸發真正的JOb運作是基于設定的Duration時間間隔的
         * 諸位一定要注意的是Spark Streaming應用程式要想執行具體的Job,對DStream就必須有output Stream操作
         * output Stream有很多類型的函數觸發,類print,savaAsTextFile,scaAsHadoopFiles等
         * 其實最為重要的一個方法是foreachRDD,因為SparkStreaming處理的結果一般都會放在Redis,DB
         * DashBoard等上面,foreach主要就是用來完成這些功能的,而且可以自定義具體的資料放在哪裡!!!
         * */
         wordsCount.print();

//       SparkStreaming 執行引擎也就是Driver開始運作,Driver啟動的時候位于一條新線程中的,當然
//       其内部有消息接受應用程式本身或者Executor中的消息
         jsc.start();
         jsc.close();
    }


}
           

2、SparkStreaming on Kafka Receiver運作在叢集上的步驟及結果:

1,首先啟動zookeeper服務:


2,接下來啟動Kafka服務


3,在eclipse上觀察結果:
           

三:SparkStreaming on Kafka Receiver源碼解析

1,首先看一下KafkaUtils(包含zookeeper的配置等等):

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰
Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

2、在這裡建立了KafkaInputDStream:

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

3、這裡證明KafkaInputStream為consumer

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

4、在這裡擁有線程池(處理topic)

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

5,不同的接受方式(第一個為wal方式)

Spark streaming基于kafka 以Receiver方式擷取資料 原理和案例實戰

補充說明:

使用Spark Streaming可以處理各種資料來源類型,如:資料庫、HDFS,伺服器log日志、網絡流,其強大超越了你想象不到的場景,隻是很多時候大家不會用,其真正原因是對Spark、spark streaming本身不了解。

博文内容源自DT大資料夢工廠Spark課程。相關課程内容視訊可以參考:

百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯系QQ460507491或者微信号:DT1219477246 擷取上述資料)。

繼續閱讀