一、概述
1.1、簡介
Kafka最初是由LinkedIn公司開發,是一個分布式、分區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統。Apache Kafka也是一個開源消息系統,由Scala寫成;Kafka是一個分布式消息隊列。Kafka對消息儲存是根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka叢集有多個kafka執行個體組成,每個執行個體(server)稱為broker。無論是kafka叢集,還是consumer都依賴于zookeeper叢集儲存一些meta(生産者,消費者,broker,topic等等)資訊,來保證系統可用性。kafka主要用做存儲系統與消息系統。
Kafka主要設計目标如下:
• 以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間的通路性能。
• 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條消息的傳輸。
• 支援Kafka Server間的消息分區,及分布式消費,同時保證每個partition内的消息順序傳輸。
• 同時支援離線資料處理和實時資料處理。
• Scale out:支援線上水準擴充
(引自:https://baike.baidu.com/item/Kafka/17930165?fr=aladdin)
1.2、消息隊列簡介
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SZxUjNzEDMwEmZ3MTMjFGNmJGMmhTYxQ2MwQDMmJmYk9CX2EzLcRDMwIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL0M3Lc9CX6MHc0RHaiojIsJye.png)
消息隊列的兩種模式
1)點對點模式(一對一,消費者主動拉取資料,消息收到後消息清除)
點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求資訊,而不是将消息推送到用戶端。這個模型的特點是發送到隊列的消息被一個且隻有一個接收者接收處理,即使有多個消息監聽者也是如此。
2)釋出/訂閱模式(一對多)
釋出訂閱模型則是另一個消息傳送模型。釋出訂閱模型可以有多種不同的訂閱者,臨時訂閱者隻在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使目前訂閱者不可用,處于離線狀态。
消息隊列的優勢
1)解耦
允許你獨立的擴充或修改兩邊的處理過程,隻要確定它們遵守同樣的接口限制。
2)備援
消息隊列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丢失風險。許多消息隊列所采用的"插入-擷取-删除"範式中,在把一個消息從隊列中删除之前,需要你的處理系統明确的指出該消息已經被處理完畢,進而確定你的資料被安全的儲存直到你使用完畢。
3)擴充性
因為消息隊列解耦了你的處理過程,是以增大消息入隊和處理的頻率是很容易的,隻要另外增加處理過程即可。
4)靈活性 & 峰值處理能力
在通路量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值通路為标準來投入資源随時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵元件頂住突發的通路壓力,而不會因為突發的超負荷的請求而完全崩潰。
5)可恢複性
系統的一部分元件失效時,不會影響到整個系統。消息隊列降低了程序間的耦合度,是以即使一個處理消息的程序挂掉,加入隊列中的消息仍然可以在系統恢複後被處理。
6)順序保證
在大多使用場景下,資料處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證資料會按照特定的順序來處理。(Kafka保證一個Partition内的消息的有序性)
7)緩沖
有助于控制和優化資料流經過系統的速度,解決生産消息和消費消息的處理速度不一緻的情況。
8)異步通信
很多時候,使用者不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許使用者把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。
1.3、kafka架構
kafka屬于消息隊列的釋出/訂閱模式
架構圖
詳細架構圖
1.4、kafka術語及解釋
Producer:生産者,允許一個應用程式釋出一串流式的資料到一個或者多個Kafka topic
Consumer:消費者,允許一個應用程式訂閱一個或多個 topic ,并且對釋出給他們的流式資料進行處理
Consumer Group:每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于預設的group,Consumer Group是kafka用來實作一個topic消息的廣播(發給所有的consumer)和單點傳播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會複制(不是真的複制,是概念上的)到所有的CG,但每個partion隻會把消息發給該CG中的一個consumer。如果需要實作廣播,隻要每個consumer有一個獨立的CG就可以了。要實作單點傳播隻要所有的consumer在同一個CG。用CG還可以将consumer進行自由的分組而不需要多次發送消息到不同的topic;
Broker :一台kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic;
Topic :主題,可以了解為一個隊列
Partition:為了實作擴充性,一個非常大的topic可以分布到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列,每個partition中的資料使用多個segment檔案存儲。partition中的每條消息都會被配置設定一個有序的id(offset)。kafka隻保證按一個partition中的順序将消息發給consumer,不保證一個topic的整體(多個partition間)的順序。分區是對Topic的分布式存儲。
offset:0.9版本以前,partition的offset是由zookeeper管理,0.9版本之後,offset存在brooker本地。
Leader:每個partition有多個副本,其中有且僅有一個作為Leader,Leader是目前負責資料的讀寫的partition。
Follower:Follower跟随Leader,所有寫請求都通過Leader路由,資料變更會廣播給所有Follower,Follower與Leader保持資料同步。如果Leader失效,則從Follower中選舉出一個新的Leader。當Follower與Leader挂掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)清單中删除,重新建立一個Follower。Follower是分區的備份,備份數小于等于Broker數。
Ps:更多詳細見kafka中文文檔:http://kafka.apachecn.org/intro.html
二、叢集部署
前提:
①叢集要求半數以上節點存活,是以叢集數量最好是奇數個,最少是3個
②叢集每台伺服器必須安裝JDK
③各節點時間/時區同步
④叢集内部免密
⑤修改主機名并作本地解析
⑥已部署zk叢集
叢集規劃:在node1,node2,node3三個節點部署kafka
JDK版本 1.8
zk版本 3.4.9
kafka版本 2.11
2.1.1、解壓安裝包
tar -zxvf kafka_2.11-1.0.0.tgz -C /home/bigdata/
2.1.2、建立日志目錄
mkdir kafka_2.11-1.0.0/{kafka-logs,logs}
2.1.3、配置檔案
需作修改的配置檔案有兩個
①config/log4j.properties
将kafka日志的目錄更新為/home/bigdata/kafka_2.11-1.0.0/logs
②config/server.properties
修改以下内容
#broker的全局唯一編号,不能重複
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的現成數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=10485760
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=10485760
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka運作日志存放的路徑
log.dirs=/home/bigdata/kafka_2.11-1.0.0/kafka-logs/
#topic在目前broker上的分區個數
num.partitions=1
#用來恢複和清理data下資料的線程數量
num.recovery.threads.per.data.dir=1
#segment檔案保留的最長時間,逾時将被删除
log.retention.hours=168
#日志大小(預設1G)
log.segment.bytes=1073741824
#配置連接配接Zookeeper叢集位址
zookeeper.connect=192.168.143.140:2181,192.168.143.141:2181,192.168.143.142:2181/kafka
#連接配接zk叢集的逾時時間
zookeeper.connection.timeout.ms=30000
#向生産者和消費者宣告kafka監聽的IP和端口
listeners=PLAINTEXT://192.168.143.140:9092
advertised.listeners=PLAINTEXT://192.168.143.140:9092
#設定kafka端口
port=9092
#不允許自建主題
auto.create.topics.enable=false
#啟用日志定期删除政策
log.cleanup.policy=delete
#消息正文的最大大小,機關位元組。
message.max.bytes=10485760
#副本每次擷取的最大資料大小
replica.fetch.max.bytes=20485760
2.1.4、環境變量配置
在/etc/profile中添加環境變量
#kafka
export KAFKA_HOME=/home/bigdata/kafka_2.11-1.0.0
export PATH=$KAFKA_HOME/bin:$PATH
重讀環境變量
source /etc/profile
2.1.5、啟動叢集
JMX_PORT=9191 nohup /home/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh /home/bigdata/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &
Ps:開啟JMX輪詢
2.1.6、關閉叢集
/home/bigdata/kafka_2.11-1.0.0/bin/kafka-server-stop.sh stop
三、kafka指令行操作
3.1、建立topic
kafka-topics.sh '--create' '--zookeeper' ''192.168.143.140':2181/kafka' '--replication-factor' 1 '--partitions' 3 '--topic' test1
選項說明:
--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分區數
3.2、檢視伺服器目前所有的topic
[root@node1 bin]# kafka-topics.sh --zookeeper 192.168.143.140:2181/kafka --list
__consumer_offsets
test1
3.3、删除topic
需要server.properties中設定delete.topic.enable=true否則隻是标記删除或者直接重新開機
kafka-topics.sh --zookeeper 192.168.143.140:2181/kafka --delete --topic test1
3.4、檢視某個topic的詳情
[root@node1 kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper 192.168.143.140:2181/kafka --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
3.5、生産者發送消息
[root@node1 kafka_2.11-1.0.0]# kafka-console-producer.sh --broker-list node1:9092 --topic test1
>hello kafka
>i am eric jia.
3.6、消費者消費消息
第一種:offset存在zk叢集
[root@node1 kafka_2.11-1.0.0]# kafka-console-consumer.sh --zookeeper node1:2181/kafka --from-beginning --topic test1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello kafka
i am eric jia.
第二種:offset存在本地
[root@node1 kafka_2.11-1.0.0]# kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic test1
選項說明:
--from-beginning:會把主題中以往所有的資料都讀取出來
參考:
《尚矽谷大資料技術之Kafka》
http://kafka.apachecn.org/intro.html
https://www.cnblogs.com/qingyunzong/p/9004509.html
四、kafka-manager
Kafka-manager是目前最主流的kafka管理工具。該工具可以友善檢視叢集 主題分布情況,同時支援對 多個叢集的管理、分區平衡以及建立主題等操作。
特性:
• 管理多個叢集
• 輕松檢查群集狀态(主題,消費者,偏移,代理,副本分發,分區分發)
• 運作首選副本選舉
• 使用選項生成分區配置設定以選擇要使用的代理
• 運作分區重新配置設定(基于生成的配置設定)
• 使用可選主題配置建立主題(0.8.1.1具有與0.8.2+不同的配置)
• 删除主題(僅支援0.8.2+并記住在代理配置中設定delete.topic.enable = true)
• 主題清單現在訓示标記為删除的主題(僅支援0.8.2+)
• 批量生成多個主題的分區配置設定,并可選擇要使用的代理
• 批量運作重新配置設定多個主題的分區
• 将分區添加到現有主題
• 更新現有主題的配置
4.1、安裝
tar -zxvf kafka-manager-1.3.3.18 -C /home/bigdata/
4.2、修改配置檔案conf/application.conf
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
修改為:
kafka-manager.zkhosts="192.168.143.140:2181"
4.3、啟動
nohup /home/bigdata/kafka-manager-1.3.3.18/bin/kafka-manager -Dconfig.file=$base_dir/kafka-manager-1.3.3.18/conf/application.conf -Dhttp.port=9998 > /dev/null 2>&1 &
-Dconfig.file:指明配置檔案
-Dhttp.port:指明服務監聽的端口(也可在配置檔案中指定)
4.4、界面管理
4.4.1、建立叢集
Cluster Name:叢集名
Cluster Zookeeper Hosts:zk叢集位址
Kafka Version:kafka版本
Enable JMX Polling:是否開啟 JMX 輪訓,該部分直接影響部分 kafka broker 和 topic 監控名額名額的擷取,生效的前提是 kafka 啟動時開啟了 JMX_PORT。
JMX Auth Username:JMX認證使用者名
JMX Auth Password:JMX認證密碼
JMX with SSL :JMX使用ssl認證
Poll consumer information :是否開啟擷取消費資訊,直接影響能夠在消費者頁面和 topic 頁面檢視消費資訊
Filter out inactive consumers :過濾掉不活躍的消費者
Enable Logkafka :開始日志
Enable Active OffsetCache:是否開啟 offset 緩存,決定 kafka-manager 是否緩存住 topic 的相關偏移量。
Display Broker and Topic Size:展示broker和topic的大小。
4.4.2、JMX輪詢名額檢視
Broker監控名額
Topic監控名額