天天看點

flume+kafka+spark streaming(持續更新)

kafka

kafka中文教程

Kafka是一種高吞吐量的分布式釋出訂閱消息系統,它可以處理消費者規模的網站中的所有動作流資料。 kafka的設計初衷是希望作為一個統一的資訊收集平台,能夠實時的收集回報資訊,并需要能夠支撐較大的資料量,且具備良好的容錯能力.

Apache kafka是消息中間件的一種。

一 、術語介紹

Broker

Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker。

Topic

每條釋出到Kafka叢集的消息都有一個類别,這個類别被稱為Topic。(實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存于一個或多個broker上但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處)。每個topic都具有這兩種模式:(隊列:消費者組(consumer group)允許同名的消費者組成員瓜分處理;釋出訂閱:允許你廣播消息給多個消費者組(不同名))。

Partition

Partition是實體上的概念,每個Topic包含一個或多個Partition.

Producer

負責釋出消息到Kafka broker,比如flume采集機就是Producer。

Consumer

消息消費者,向Kafka broker讀取消息的用戶端。比如Hadoop機器就是Consumer。

Consumer Group

每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于預設的group)。

二、使用場景

1、Messaging

對于一些正常的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的”事務性”“消息傳輸擔保(消息确認機制)”“消息分組”等企業級特性;kafka隻能使用作為”正常”的消息系統,在一定程度上,尚未確定消息的發送與接收絕對可靠(比如,消息重發,消息發送丢失等)

2、Websit activity tracking

kafka可以作為”網站活性跟蹤”的最佳工具;可以将網頁/使用者操作等資訊發送到kafka中.并實時監控,或者離線統計分析等

3、Log Aggregation

kafka的特性決定它非常适合作為”日志收集中心”;application可以将記錄檔”批量”“異步”的發送到kafka叢集中,而不是儲存在本地或者DB中;kafka可以批量送出消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.

4、它應用于2大類應用

建構實時的流資料管道,可靠地擷取系統和應用程式之間的資料。

建構實時流的應用程式,對資料流進行轉換或反應。

三、分布式

Log的分區被分布到叢集中的多個伺服器上。每個伺服器處理它分到的分區。 根據配置每個分區還可以複制到其它伺服器作為備份容錯。 每個分區有一個leader,零或多個follower。Leader處理此分區的所有的讀寫請求,而follower被動的複制資料。如果leader當機,其它的一個follower會被推舉為新的leader。 一台伺服器可能同時是一個分區的leader,另一個分區的follower。 這樣可以平衡負載,避免所有的請求都隻讓一台或者某幾台伺服器處理。

四、消息處理順序

Kafka保證消息的順序不變。 在這一點上Kafka做的更好,盡管并沒有完全解決上述問題。 Kafka采用了一種分而治之的政策:分區。 因為Topic分區中消息隻能由消費者組中的唯一一個消費者處理,是以消息肯定是按照先後順序進行處理的。但是它也僅僅是保證Topic的一個分區順序處理,不能保證跨分區的消息先後處理順序。 是以,如果你想要順序的處理Topic的所有消息,那就隻提供一個分區。

五、安裝

kafka安裝和啟動

六、Key和Value

Kafka是一個分布式消息系統,Producer生産消息并推送(Push)給Broker,然後Consumer再從Broker那裡取走(Pull)消息。Producer生産的消息就是由Message來表示的,對使用者來講,它就是鍵-值對。

kafka會根據傳進來的key計算其分區,但key可以不傳,可以為null,空的話,producer會把這條消息随機的發送給一個partition。

flume+kafka+spark streaming(持續更新)

MessageSet用來組合多條Message,它在每條Message的基礎上加上了Offset和MessageSize,其結構是:

MessageSet => [Offset MessageSize Message]
           

它的含義是MessageSet是個數組,數組的每個元素由三部分組成,分别是Offset,MessageSize和Message,它們的含義分别是:

flume+kafka+spark streaming(持續更新)

七、小例子

1.啟動ZooKeeper

進入kafka目錄,加上daemon表示在背景啟動,不占用目前的指令行視窗。

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

如果要關閉,下面這個

bin/zookeeper-server-stop.sh

ZooKeeper 的端口号是2181,輸入jps檢視程序号是QuorumPeerMain

2.啟動kafka

在server.properties中加入,第一個是保證你删topic可以删掉,第二個不然的話就報topic找不到的錯誤:

delete.topic.enable=true

listeners=PLAINTEXT://localhost:9092

然後:

bin/kafka-server-start.sh -daemon config/server.properties

如果要關閉,下面這個

bin/kafka-server-stop.sh

Kafka的端口号是9092,輸入jps檢視程序号是Kafka

3.建立一個主題(topic)

建立一個名為“test”的Topic,隻有一個分區和一個備份:

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

建立好之後,可以通過運作以下指令,檢視已建立了哪些topic:

bin/kafka-topics.sh –list –zookeeper localhost:2181

檢視具體topic的資訊:

bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test

4.發送消息

啟動kafka生産者:

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

5.接收消息

新開一個指令行視窗,啟動kafka消費者:

bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning

6.最後

在producer視窗中輸入消息,可以在consumer視窗中顯示:

flume+kafka+spark streaming(持續更新)
flume+kafka+spark streaming(持續更新)

spark streaming

spark中文學習指南

Spark Streaming是一種建構在Spark上的實時計算架構,它擴充了Spark處理大規模流式資料的能力。

Spark Streaming的優勢在于:

能運作在100+的結點上,并達到秒級延遲。

使用基于記憶體的Spark作為執行引擎,具有高效和容錯的特性。

能內建Spark的批處理和互動查詢。

為實作複雜的算法提供和批處理類似的簡單接口。

首先,Spark Streaming把實時輸入資料流以時間片Δt (如1秒)為機關切分成塊。Spark Streaming會把每塊資料作為一個RDD,并使用RDD操作處理每一小塊資料。每個塊都會生成一個Spark Job處理,最終結果也傳回多塊。

在Spark Streaming中,則通過操作DStream(表示資料流的RDD序列)提供的接口,這些接口和RDD提供的接口類似。

正如Spark Streaming最初的目标一樣,它通過豐富的API和基于記憶體的高速計算引擎讓使用者可以結合流式處理,批處理和互動查詢等應用。是以Spark Streaming适合一些需要曆史資料和實時資料結合分析的應用場合。當然,對于實時性要求不是特别高的應用也能完全勝任。另外通過RDD的資料重用機制可以得到更高效的容錯處理。

當一個上下文(context)定義之後,你必須按照以下幾步進行操作:

定義輸入源;

準備好流計算指令;

利用streamingContext.start()方法接收和處理資料;

處理過程将一直持續,直到streamingContext.stop()方法被調用。

可以利用已經存在的SparkContext對象建立StreamingContext對象:

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds())
           

視窗函數

對于spark streaming中的視窗函數,參見:

視窗函數解釋

對非(K,V)形式的RDD 視窗化reduce:

1.reduceByWindow(reduceFunc, windowDuration, slideDuration)

2.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

對(K,V)形式RDD 按Key視窗化reduce:

1.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)

2.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc)

從效率來說,應選擇帶有invReduceFunc的方法。

可以通過在多個RDD或者批資料間重用連接配接對象做更進一步的優化。開發者可以保有一個靜态的連接配接對象池,重複使用池中的對象将多批次的RDD推送到外部系統,以進一步節省開支:

dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })
           

spark執行時間是少了,但資料庫壓力比較大,會一直占資源。

小例子:

package SparkStreaming

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object Spark_streaming_Test {
  def main(args: Array[String]): Unit = {
    //local[2]表示在本地建立2個working線程
    //當運作在本地,如果你的master URL被設定成了“local”,這樣就隻有一個核運作任務。這對程式來說是不足的,因為作為receiver的輸入DStream将會占用這個核,這樣就沒有剩餘的核來處理資料了。
    //是以至少得2個核,也就是local[2]
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //時間間隔是1秒
    val ssc = new StreamingContext(conf, Seconds())
    //有滑動視窗時,必須有checkpoint
    ssc.checkpoint("F:\\checkpoint")
    //DStream是一個基類
    //ssc.socketTextStream() 将建立一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 将監聽伺服器 9999 端口
    //ssc.socketTextStream()将 new 出來一個 DStream 具體子類 SocketInputDStream 的執行個體。
    val lines = ssc.socketTextStream("192.168.1.66", , StorageLevel.MEMORY_AND_DISK_SER)
    //    val lines = ssc.textFileStream("F:\\scv")
    val words = lines.flatMap(_.split(" ")) // DStream transformation
    val pairs = words.map(word => (word, )) // DStream transformation
    //    val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
    //每隔3秒鐘,計算過去5秒的詞頻,顯然一次計算的内容與上次是有重複的。如果不想重複,把2個時間設為一樣就行了。
    //    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(5), Seconds(3))
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(), Seconds())
    windowedWordCounts.filter(x => x._2 != ).print()
    //    wordCounts.print() // DStream output,列印每秒計算的詞頻
    //需要注意的是,當以上這些代碼被執行時,Spark Streaming僅僅準備好了它要執行的計算,實際上并沒有真正開始執行。在這些轉換操作準備好之後,要真正執行計算,需要調用如下的方法
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //在StreamingContext上調用stop()方法,也會關閉SparkContext對象。如果隻想僅關閉StreamingContext對象,設定stop()的可選參數為false
    //一個SparkContext對象可以重複利用去建立多個StreamingContext對象,前提條件是前面的StreamingContext在後面StreamingContext建立之前關閉(不關閉SparkContext)
    ssc.stop()
  }
}
           

1.啟動

start-dfs.sh

start-yarn.sh

flume+kafka+spark streaming(持續更新)

2.終端輸入:

nc -lk 9999

然後在IEDA中運作spark程式。由于9999端口中還沒有寫東西,是以運作是下圖:

flume+kafka+spark streaming(持續更新)

隻有時間,沒有列印出東西。然後在終端輸入下面的東西,也可以從其他地方複制進來。

hello world

hello hadoop

hadoop love

love cat

cat love rabbit

這時,IDEA的控制台就輸出下面的東西。

flume+kafka+spark streaming(持續更新)

3.下面運作帶時間視窗的,注意如果加了時間視窗就必須有checkpoint

輸入下面的,不要一次全輸入,一次輸個幾行。

checkpoint

hello world

hello hadoop

hadoop love

love cat

cat love rabbit

ni hao a

hello world

hello hadoop

hadoop love

love cat

cat love rabbit

hello world

hello hadoop

hadoop love

love cat

cat love rabbit

先是++–的那種:

flume+kafka+spark streaming(持續更新)
flume+kafka+spark streaming(持續更新)
flume+kafka+spark streaming(持續更新)
flume+kafka+spark streaming(持續更新)

再然後是不++–的那種:

flume+kafka+spark streaming(持續更新)
flume+kafka+spark streaming(持續更新)

++–的那種是因為把過去的RDD也帶進來計算了,是以出現了0這個情況,為了避免這種情況隻能在列印前過濾掉0的再列印。而沒有++–的那種情況是不需要這樣做的。

Checkpointing

在容錯、可靠的檔案系統(HDFS、s3等)中設定一個目錄用于儲存checkpoint資訊。就可以通過streamingContext.checkpoint(checkpointDirectory)方法來做。

預設的間隔時間是批間隔時間的倍數,最少10秒。它可以通過dstream.checkpoint來設定。需要注意的是,随着 streaming application 的持續運作,checkpoint 資料占用的存儲空間會不斷變大。是以,需要小心設定checkpoint 的時間間隔。設定得越小,checkpoint 次數會越多,占用空間會越大;如果設定越大,會導緻恢複時丢失的資料和進度越多。一般推薦設定為 batch duration 的5~10倍。

package streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2017/3/12.
  */

object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context") //如果沒有出現這句話,說明StreamingContext是從checkpoint裡面加載的
    val outputFile = new File(outputPath) //輸出檔案的目錄
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds()) //時間間隔是1秒
    ssc.checkpoint(checkpointDirectory) //設定一個目錄用于儲存checkpoint資訊

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
    val windowedWordCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(), Seconds())
    windowedWordCounts.checkpoint(Seconds())//一般推薦設定為 batch duration 的5~10倍,即StreamingContext的第二個參數的5~10倍
    windowedWordCounts.print()
    Files.append(windowedWordCounts + "\n", outputFile, Charset.defaultCharset())
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != ) {
      System.exit()
    }
    val ip = args()
    val port = args().toInt
    val checkpointDirectory = args()
    val outputPath = args()
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
           

優化

1.資料接收的并行水準

建立多個輸入DStream并配置它們可以從源中接收不同分區的資料流,進而實作多資料流接收。是以允許資料并行接收,提高整體的吞吐量。

val numStreams = 
val kafkaStreams = ( to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
           

多輸入流或者多receiver的可選的方法是明确地重新配置設定輸入資料流(利用inputStream.repartition()),在進一步操作之前,通過叢集的機器數配置設定接收的批資料。

2.任務序列化

運作kyro序列化任何可以減小任務的大小,進而減小任務發送到slave的時間。

val conf = new SparkConf().setAppName("analyse_domain_day").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           

3.設定合适的批間隔時間(即批資料的容量)

批處理時間應該小于批間隔時間。如果時間間隔是1秒,但處理需要2秒,則處理趕不上接收,待處理的資料會越來越多,最後就嘣了。

找出正确的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低資料速率來測試你的應用程式。為了驗證你的系統是否能滿足資料處理速率,你可以通過檢查端到端的延遲值來判斷(可以在Spark驅動程式的log4j日志中檢視”Total delay”或者利用StreamingListener接口)。如果延遲維持穩定,那麼系統是穩定的。如果延遲持續增長,那麼系統無法跟上資料處理速率,是不穩定的。你能夠嘗試着增加資料處理速率或者減少批容量來作進一步的測試。

DEMO

spark流操作kafka有兩種方式:

一種是利用接收器(receiver)和kafaka的高層API實作。

一種是不利用接收器,直接用kafka底層的API來實作(spark1.3以後引入)。

相比基于Receiver方式有幾個優點:

1、不需要建立多個kafka輸入流,然後Union他們,而使用DirectStream,spark Streaming将會建立和kafka分區一樣的RDD的分區數,而且會從kafka并行讀取資料,Spark的分區數和Kafka的分區數是一一對應的關系。

2、第一種實作資料的零丢失是将資料預先儲存在WAL中,會複制一遍資料,會導緻資料被拷貝兩次:一次是被Kafka複制;另一次是寫入到WAL中。

Direct的方式是會直接操作kafka底層的中繼資料資訊,這樣如果計算失敗了,可以把資料重新讀一下,重新處理。即資料一定會被處理。拉資料,是RDD在執行的時候直接去拉資料。

3、Receiver方式讀取kafka,使用的是高層API将偏移量寫入ZK中,雖然這種方法可以通過資料儲存在WAL中保證資料的不對,但是可能會因為sparkStreaming和ZK中儲存的偏移量不一緻而導緻資料被消費了多次。

第二種方式不采用ZK儲存偏移量,消除了兩者的不一緻,保證每個記錄隻被Spark Streaming操作一次,即使是在處理失敗的情況下。如果想更新ZK中的偏移量資料,需要自己寫代碼來實作。

由于直接操作的是kafka,kafka就相當于你底層的檔案系統。這個時候能保證嚴格的事務一緻性,即一定會被處理,而且隻會被處理一次。

首先去maven的官網上下載下傳jar包

spark-streaming_2.10-1.6.2.jar

spark-streaming-kafka_2.10-1.6.2.jar

我的Scala是2.10的,spark是1.6.0的,下載下傳的spark.streaming和kafka版本要與之對應,spark-streaming_2.10-1.6.2.jar中2.10是Scala版本号,1.6.2是spark版本号。當然下載下傳1.6.1也行。

需要添加 kafka-clients-0.8.2.1.jar以及kafka_2.10-0.8.2.1.jar

這裡的2.10是Scala版本号,0.8.2.1是kafka的版本号。就下這個版本,别的版本不對應,會出錯。

在kafka的配置檔案裡面:

delete.topic.enable=true

host.name=192.168.1.66

zookeeper.connect=192.168.1.66:2181

我這裡寫主機名的話,各種報錯,是以幹脆就寫IP位址了。

啟動kafka以及ZK的步驟和kafka 1-2是一樣的。

進入/kafka_2.10-0.8.2.1 建立一個主題:

bin/kafka-topics.sh –create –zookeeper 192.168.1.66:2181 –replication-factor 1 –partitions 1 –topic test

啟動一個生産者:

bin/kafka-console-producer.sh –broker-list 192.168.1.66:9092 –topic test

在自己的電腦上運作spark程式後,在指令行輸入:

flume+kafka+spark streaming(持續更新)

在控制台會顯示:

flume+kafka+spark streaming(持續更新)
package SparkStreaming

//TopicAndPartition是對 topic和partition的id的封裝的一個樣例類
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import kafka.serializer.StringDecoder

object SparkStreaming_Kafka_Test {

  val kafkaParams = Map(
    //kafka broker的IP加端口号,這個是必須的
    "metadata.broker.list" -> "192.168.1.66:9092",
    // "group.id" -> "group1",
    /*此配置參數表示當此groupId下的消費者,
     在ZK中沒有offset值時(比如新的groupId,或者是zk資料被清空),
     consumer應該從哪個offset開始消費.largest表示接受接收最大的offset(即最新消息),
     smallest表示最小offset,即從topic的開始位置消費所有消息.*/
    "auto.offset.reset" -> "smallest"
  )

  val topicsSet = Set("test")

  //  val zkClient = new ZkClient("xxx:2181,xxx:2181,xxx:2181",Integer.MAX_VALUE,,ZKStringSerializer)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming_Kafka_Test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds())
    ssc.checkpoint("F:\\checkpoint")
    /*
    KafkaUtils.createDirectStream[
       [key的資料類型], [value的資料類型], [key解碼的類], [value解碼的類] ](
       streamingContext, [Kafka配置的參數,是一個map], [topics的集合,是一個set])
       */
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2) //取value
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

           

偏移量僅僅被ssc儲存在checkpoint中,消除了zk和ssc偏移量不一緻的問題。是以說checkpoint就已經可以保證容錯性了。

如果需要把偏移量寫入ZK,首先在工程中建立一個包:org.apache.spark.streaming.kafka,然後建一個KafkaCluster類:

package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map {
      kv => kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  def setConsumerOffsetMetadata(groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
           

然後在主函數中:

// 手動更新ZK偏移量,使得基于ZK偏移量的kafka監控工具可以使用
    messages.foreachRDD(rdd => {
      // 先處理消息
      val lines = rdd.map(_._2) //取value
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
      wordCounts.foreach(println)
      // 再更新offsets
      //spark内部維護kafka偏移量資訊是存儲在HasOffsetRanges類的offsetRanges中
      //OffsetRange 包含資訊有:topic名字,分區Id,開始偏移,結束偏移。
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到該 rdd 對應 kafka 的消息的 offset
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        val topicAndPartition = TopicAndPartition("test", offsets.partition)
        val o = kc.setConsumerOffsets("group1", Map((topicAndPartition, offsets.untilOffset)),)
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })
           

下面是用kafka的API自己寫一個程式讀取檔案,作為kafka的生産者,需要将Scala和kafka的所有的jar包都導入,lib檔案夾下面的都導入進去。

如果沒有2台電腦,可以開2個開發環境,IDEA作為消費者,eclipse作為生産者。

生産者代碼如下:

package spark_streaming_kafka_test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MakeRealtimeDate extends Thread {

    private Producer<Integer, String> producer;

    public MakeRealtimeDate() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "192.168.1.66:2181");
        props.put("metadata.broker.list", "192.168.1.66:9092");
        ProducerConfig pc = new ProducerConfig(props);
        producer = new Producer<Integer, String>(pc);
    }

    public void run() {
        while (true) {
            File file = new File("C:\\Users\\Administrator\\Desktop\\wordcount.txt");
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            String lineTxt = null;
            try {
                while ((lineTxt = reader.readLine()) != null) {
                    System.out.println(lineTxt);
                    producer.send(new KeyedMessage<Integer, String>("test", lineTxt));
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        new MakeRealtimeDate().start();
    }

}
           

先啟動之前寫的sparkstreaming消費者統計單詞個數的程式,然後再啟動我們現在寫的這個生産者程式,最後就會在IDEA的控制台中看到實時結果。

繼續閱讀