天天看點

spark2.2.0 kafka 0.10.2.1的createDirectStream第一次嘗試

1、這裡簡單記錄一下 kafka的簡單操作指令

     建立Topic

     $ bin/kafka-topics.sh --create --topic make2 --zookeeper make.spark.com:2181/kafka_10 --replication-factor 3 --partitions 3 --config max.message.bytes=12800000 --config flush.messages=1 --config segment.bytes=10240--partitions 5 --replication-factor 3 

     檢視目前kafka叢集中Topic的情況

     $ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --list

     檢視Topic的詳細資訊

     $ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --describe --topic make2

     修改Topic資訊

    $ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --config max.message.bytes=128000

     $ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --delete-config max.message.bytes

     $ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --partitions 10 

    $ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --partitions 3 ## 分區數量隻允許增加,不允許減少

     删除Topic(簡單的删除,隻是标記删除)

    $bin/kafka-topics.sh --delete --topic make1 --zookeeper make.spark.com:2181/kafka_10

    ## Note: This will have no impact if delete.topic.enable is not set to true.## 預設情況下,删除是标記删除,沒有實際删除這個Topic;如果運作删除Topic,兩種方式:

方式一:通過delete指令删除後,手動将本地磁盤以及zk上的相關topic的資訊删除即可  ls /kafka/brokers/topics

方式二:配置server.properties檔案,給定參數delete.topic.enable=true,表示允許進行Topic的删除

注意:一般來說,topic建立了之後就不要随意的删除和修改資訊

測試Kafka叢集的消息傳遞功能

1. 啟動服務

2. 啟動資料生産者

$ bin/kafka-console-producer.sh --broker-list make.spark.com:9092,make.spark.com:9093,make.spark.com:9094 --topic make1

3. 啟動資料消費者

$ bin/kafka-console-consumer.sh --topic make1 --zookeeper make.spark.com:2181/kafka_10

## 不接收consumer啟動前kafka中的資料

$ bin/kafka-console-consumer.sh --topic make1--zookeeper make.spark.com:2181/kafka_10

## 從頭開始接收kafka的資料(全部都接收)

$ bin/kafka-console-consumer.sh --topic make1--zookeeper make.spark.com:2181/kafka_10 --from-beginning 

2、這裡為我建立createDirectStream的具體代碼和測試

首先我們建立一個object -> PropertiesUtil_ka_str  放在我們的工具類下面 代碼如下

package Utils

import java.util.Properties

/**
  * Properties的工具類
  *
  * Created by make on 2018-08-07 23:30
  */
object PropertiesUtil_ka_str {

  /**
    *
    * 擷取配置檔案Properties對象
    *
    * @author make 
    * @return java.util.Properties
    */
  def getProperties() :Properties = {
    val properties = new Properties()
    //讀取源碼中resource檔案夾下的ka_str.properties配置檔案
    val reader = getClass.getResourceAsStream("/ka_str.properties")
    properties.load(reader)
    properties
  }

  /**
    *
    * 擷取配置檔案中key對應的字元串值
    *
    * @author make 
    * @return java.util.Properties
    */
  def getPropString(key : String) : String = {
    getProperties().getProperty(key)
  }

  /**
    *
    * 擷取配置檔案中key對應的整數值
    *
    * @author make 
    * @return java.util.Properties
    */
  def getPropInt(key : String) : Int = {
    getProperties().getProperty(key).toInt
  }

  /**
    *
    * 擷取配置檔案中key對應的布爾值
    *
    * @author make 
    * @return java.util.Properties
    */
  def getPropBoolean(key : String) : Boolean = {
    getProperties().getProperty(key).toBoolean
  }

}
           

我們的配置檔案ka_str.properties如下

# kafka configs
kafka.bootstrap.servers=make.spark.com:9092,make.spark.com:9093,make.spark.com:9094
kafka.topic.source=kafka_stream_01
#kafka.topic.sink=spark-sink-test
kafka.group.id=kafka_stream
           

然後為我們的 kafka_stream.scala的代碼 如下

package spark_stream

import Utils.{PropertiesUtil, PropertiesUtil_ka_str, SparkUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.LocationStrategies._

object kafka_stream {
  def main(args: Array[String]): Unit = {
    val sc =SparkUtil.createSparkContext(true,"kafka_stream")

    val ssc = new StreamingContext(sc,Seconds(5))


    //設定相關參數

   val  topics: String = PropertiesUtil_ka_str.getPropString("kafka.topic.source")
   val brokers = PropertiesUtil_ka_str.getPropString("kafka.bootstrap.servers")

    //具體寫法參照 http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
    val topicarr = topics.split(",")
    val kafkaParams: Map[String, Object] = Map[String,Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> PropertiesUtil.getPropString("kafka.group.id"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    val kafka_streamDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      //Subscribe的構造函數為三個參數,但是可以省略offsets 源碼可以看到
      Subscribe[String,String](topicarr,kafkaParams))
    //最後的格式為((offset,partition,value),1),這樣的資料類型 
    // 可以看到每條資料的偏移量和所在的分區
    val resDStream: DStream[((Long, Int, String), Int)] = kafka_streamDStream.map(line =>
      (line.offset(), line.partition(), line.value())).flatMap(t =>{
        t._3.split(" ").map(word => (t._1,t._2,word))
    })
      .map(k => ((k._1,k._2,k._3),1))
      .reduceByKey(_ + _)

    resDStream.print()

    ssc.start()
    ssc.awaitTermination()

  }


}
           

3、我們在kafka 上建立一個和我們配置檔案相同的 topic 以及對應的 生産者   如下

bin/kafka-topics.sh --create --topic kafka_stream_01 --zookeeper make.spark.com:2181/kafka_10 --replication-factor 2 --partitions 3

bin/kafka-console-producer.sh --broker-list make.spark.com:9092,make.spark.com:9093,make.spark.com:9094 --topic kafka_stream_01

然後啟動我們的程式,可以看到我們的,實時處理的資料結果,達到了我們要求,可以看到偏移量,以及所在的分區

spark2.2.0 kafka 0.10.2.1的createDirectStream第一次嘗試

以上,初識kafka

繼續閱讀