1.下載下傳Kafka
2.啟動服務端
Kafka需要用到ZooKeepr,是以需要先啟動一個ZooKeepr服務端,如果沒有單獨的ZooKeeper服務端,可以使用Kafka自帶的腳本快速啟動一個單節點ZooKeepr執行個體
啟動Kafka服務端執行個體
3.建立一個Kafka Topic
建立一個名為test的topic,這個topic隻有一個分區和一個副本
建立以後就可以檢視了
另外,除了手動建立topics,可以配置Brokers自動建立topics
4.發送一些消息
Kafka帶有一個指令行工具kafka-console-producer.sh,可以從一個檔案或者标準輸入讀入資料然後以消息的方式發送到Kafka叢集。預設每行将被單獨作為消息發送。
5.啟動一個消費者
Kafka也自帶一個指令行工具用以消費Kafka叢集的消息
producer和consumer分别開兩個Linu終端,producer端輸入一些内容,就可以看到consumer端會實時顯示producer輸入的内容
6.建立一個multi-broker叢集
以上我們建立運作了單個Kafka broker,但是對于Kafka,單個broker隻是Kafka叢集的的一個成員,下面我們将擴充Kafka叢集到三個broker執行個體
主要修改幾個參數
broker.id
listeners
log.dirs
broker.id是一個Kafka broker執行個體在叢集中的唯一屬于,每個執行個體需要不同。
現在建立一個具有三個複制成員的topic
檢視叢集中每個broker的狀态
先列出所有分區的概要資訊,一個分區顯示一行
Leader 負責給定分區的所有讀寫操作,每個叢集節點會是所有分區中随機選取的分區的leader
Replicas 列出目前分區的複制節點,不管這些節點是否是Leader甚至是否目前存活
Isr in-sync複制集的子集,列出目前存活并且趕上leader的叢集節點
向新的topic發送一些消息
輸入一些消息
再開一個終端消費這些消息
可以看到輸入的消息很快就别消費掉了
現在測試搭建的Kafka Cluster的容錯能力
Leader:0 表示Broker 0 是leader,Broker 1和Broker 2是replica
現在我們把Broker 0 給kill 掉
再檢視叢集節點狀态
可以看到Leader已經切換到Broker 2了,Broker 0已經不在Isr這個子集中了,Replicas還是三個成員
再次執行消費者
可以看到之前生産者發送的消息依然可以被消費,雖然負責消息寫入的leader已經挂掉了
7.使用Kfaka Connect導入導出資料
從Linux終端讀入資料然後将資料輸出到終端友善了解Kafka,接下來使用Kafka從其他資料來源導入資料,然後導出資料到其他系統中去。對于很多系統來說,可以使用Kafka Connect來導入導出資料來取代重新編寫一些自定義代碼。Kafka Connect是一個Kafka自帶的工具用于導入導出資料。
先向一個檔案中寫入一些資料
然後,啟動兩個Kafka Connector以standalone模式運作
connect-standalone.properties
connect-file-source.properties
connect-file-sink.properties
一旦啟動Kafka Connect程序,source connector會從test.txt中讀入資料,然後将這些資料推送給connect-test這個topic,sink connector會從這個topic讀出資料并寫入資料到test.sink.txt檔案
消息是存儲在connect-test這個topic中的
connector的終端先不要斷開,繼續向test.txt檔案中追加幾行,看看console-consumer是否會有重新整理
可以看到test.sink.txt這個檔案也多了一行
8.Use Kafka Streams to process data
Kafka Streams是Kafka用于實時流資料處理和分析Kafka brokers中存儲的資料的用戶端庫。
輸出
參考文檔:
http://kafka.apache.org/documentation.html#quickstart