1、這裡簡單記錄一下 kafka的簡單操作指令
建立Topic
$ bin/kafka-topics.sh --create --topic make2 --zookeeper make.spark.com:2181/kafka_10 --replication-factor 3 --partitions 3 --config max.message.bytes=12800000 --config flush.messages=1 --config segment.bytes=10240--partitions 5 --replication-factor 3
檢視目前kafka叢集中Topic的情況
$ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --list
檢視Topic的詳細資訊
$ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --describe --topic make2
修改Topic資訊
$ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --config max.message.bytes=128000
$ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --delete-config max.message.bytes
$ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --partitions 10
$ bin/kafka-topics.sh --zookeeper make.spark.com:2181/kafka_10 --alter --topic make1 --partitions 3 ## 分區數量隻允許增加,不允許減少
删除Topic(簡單的删除,隻是标記删除)
$bin/kafka-topics.sh --delete --topic make1 --zookeeper make.spark.com:2181/kafka_10
## Note: This will have no impact if delete.topic.enable is not set to true.## 預設情況下,删除是标記删除,沒有實際删除這個Topic;如果運作删除Topic,兩種方式:
方式一:通過delete指令删除後,手動将本地磁盤以及zk上的相關topic的資訊删除即可 ls /kafka/brokers/topics
方式二:配置server.properties檔案,給定參數delete.topic.enable=true,表示允許進行Topic的删除
注意:一般來說,topic建立了之後就不要随意的删除和修改資訊
測試Kafka叢集的消息傳遞功能
1. 啟動服務
2. 啟動資料生産者
$ bin/kafka-console-producer.sh --broker-list make.spark.com:9092,make.spark.com:9093,make.spark.com:9094 --topic make1
3. 啟動資料消費者
$ bin/kafka-console-consumer.sh --topic make1 --zookeeper make.spark.com:2181/kafka_10
## 不接收consumer啟動前kafka中的資料
$ bin/kafka-console-consumer.sh --topic make1--zookeeper make.spark.com:2181/kafka_10
## 從頭開始接收kafka的資料(全部都接收)
$ bin/kafka-console-consumer.sh --topic make1--zookeeper make.spark.com:2181/kafka_10 --from-beginning
2、這裡為我建立createDirectStream的具體代碼和測試
首先我們建立一個object -> PropertiesUtil_ka_str 放在我們的工具類下面 代碼如下
package Utils
import java.util.Properties
/**
* Properties的工具類
*
* Created by make on 2018-08-07 23:30
*/
object PropertiesUtil_ka_str {
/**
*
* 擷取配置檔案Properties對象
*
* @author make
* @return java.util.Properties
*/
def getProperties() :Properties = {
val properties = new Properties()
//讀取源碼中resource檔案夾下的ka_str.properties配置檔案
val reader = getClass.getResourceAsStream("/ka_str.properties")
properties.load(reader)
properties
}
/**
*
* 擷取配置檔案中key對應的字元串值
*
* @author make
* @return java.util.Properties
*/
def getPropString(key : String) : String = {
getProperties().getProperty(key)
}
/**
*
* 擷取配置檔案中key對應的整數值
*
* @author make
* @return java.util.Properties
*/
def getPropInt(key : String) : Int = {
getProperties().getProperty(key).toInt
}
/**
*
* 擷取配置檔案中key對應的布爾值
*
* @author make
* @return java.util.Properties
*/
def getPropBoolean(key : String) : Boolean = {
getProperties().getProperty(key).toBoolean
}
}
我們的配置檔案ka_str.properties如下
# kafka configs
kafka.bootstrap.servers=make.spark.com:9092,make.spark.com:9093,make.spark.com:9094
kafka.topic.source=kafka_stream_01
#kafka.topic.sink=spark-sink-test
kafka.group.id=kafka_stream
然後為我們的 kafka_stream.scala的代碼 如下
package spark_stream
import Utils.{PropertiesUtil, PropertiesUtil_ka_str, SparkUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.LocationStrategies._
object kafka_stream {
def main(args: Array[String]): Unit = {
val sc =SparkUtil.createSparkContext(true,"kafka_stream")
val ssc = new StreamingContext(sc,Seconds(5))
//設定相關參數
val topics: String = PropertiesUtil_ka_str.getPropString("kafka.topic.source")
val brokers = PropertiesUtil_ka_str.getPropString("kafka.bootstrap.servers")
//具體寫法參照 http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
val topicarr = topics.split(",")
val kafkaParams: Map[String, Object] = Map[String,Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> PropertiesUtil.getPropString("kafka.group.id"),
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val kafka_streamDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
//Subscribe的構造函數為三個參數,但是可以省略offsets 源碼可以看到
Subscribe[String,String](topicarr,kafkaParams))
//最後的格式為((offset,partition,value),1),這樣的資料類型
// 可以看到每條資料的偏移量和所在的分區
val resDStream: DStream[((Long, Int, String), Int)] = kafka_streamDStream.map(line =>
(line.offset(), line.partition(), line.value())).flatMap(t =>{
t._3.split(" ").map(word => (t._1,t._2,word))
})
.map(k => ((k._1,k._2,k._3),1))
.reduceByKey(_ + _)
resDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
3、我們在kafka 上建立一個和我們配置檔案相同的 topic 以及對應的 生産者 如下
bin/kafka-topics.sh --create --topic kafka_stream_01 --zookeeper make.spark.com:2181/kafka_10 --replication-factor 2 --partitions 3
bin/kafka-console-producer.sh --broker-list make.spark.com:9092,make.spark.com:9093,make.spark.com:9094 --topic kafka_stream_01
然後啟動我們的程式,可以看到我們的,實時處理的資料結果,達到了我們要求,可以看到偏移量,以及所在的分區

以上,初識kafka