文章目錄
-
- 第一章 概述
-
- 1.1 Kafka 的定義及特點
- 1.2 消息隊列的介紹
- 1.3 Kafka 的基礎架構
- 第二章 入門
-
- 2.1 Kafka 的安裝部署
- 2.2 Kafka 指令行操作
- 第三章 架構深入
-
- 3.1 Kafka 工作流程
- 3.2 Kafka 檔案存儲機制
- 3.3 Kafka 生産者
- 3.4 Kafka 消費者
- 3.5 Kafka 高效讀寫資料
- 3.6 Kafka 事務
- 第四章 API
- 第五章 監控
- 第六章 Flume 對接 Kafka
- 第七章 Kafka 面試題
第一章 概述
1.1 Kafka 的定義及特點
一個分布式的,基于 釋出/訂閱模式 的消息隊列(Massage Queue),主要應用于大資料實時處理領域;
Kafka 中資料是 有時效性地 儲存在 磁盤 中;
Kafka 由 Scala 編寫;
1.2 消息隊列的介紹
- 有兩種處理系統任務的方式:同步處理 和 異步處理。
- 同步處理:每一步必須等到前一步完成,才可執行。
- 異步處理:分批次處理,前一步未完成也可以開始下一步。
- 消息隊列為異步處理,使用消息隊列的好處:
- 解耦:隻要遵循相同接口,則可以獨立擴充或修改不同處理過程,提高可恢複性和健壯性。
- 緩沖:有助于控制和優化資料經過系統的速度,解決生産者和消費者速度不一緻的問題。衍生靈活性和削峰能力,以低成本應對突變的任務量。
- 異步通信:使用者需求,有時不需要立即處理,而是存儲在隊列中,需要時使用,如郵箱。
- 消息隊列的兩種模式:
-
點對點:
》消費者主動拉取資料,收到後隊 Queue 即删除該資料,無法消費曾消費過的資料;
》Queue 支援多個消費者,但是對于一個消息,隻能有一個消費者;
》消費者主動拉取資料,生産者不好控制推送速度。
-
釋出訂閱:
》消費者消費後的資料不會從 Queue 中删除;
》生産者生産消息到 topic 中,所有訂閱該 topic 的消費者都會收到該消息;
》隊列主動推送/消費者主動拉取,兩種傳遞模式。需要長期維持輪詢。
-
1.3 Kafka 的基礎架構
- 生産:
- ProducerA 生産消息,此消息屬于TopicA,被分為若幹分區,發送至 KafkaCluster 中的一個節點 Broker1 的TopicA.Partition0.Leader,消息的另一部分被發往 Broker2 節點的 TopicA.Partition1。
- 一個 broker 可以容納多個 topic;邏輯層的 Topic。
- 一個 topic 可分為多個 partition,partition 是一個有序隊列,實作擴充性;實體層的 Partition。
-
每個分區有對于的 Leader 和 Follower
Leader 是 producer 發送資料的對象、Comsumer 消費資料的對象,一個 Leader 對應多個 Follower;
Follower 用于存儲所屬分區的副本,實時從 Leader 中同步資料,Leader 故障時多個 Follower 選一個成為新的 Leader。
- 消費:
- 一個消費者組 CG 邏輯上是一個訂閱者,消費一個 Topic,消費者組間互不影響。
- 一個分區隻能由一個組中的一個消費者消費,一個消費者可以消費該組所屬 Topic 的多個分區,是以,當一個組中的消費者多餘該組對應 Topic 的 Partition 數時,無意義;而當兩者相等時,消費率最高。
- 每個消費者都屬于一個 CG。
- 消費者組中的成員,分别從 KafkaCluster 的不同節點拉取 Topic 的不同分區。
- 一個 CG 中的一個消費者,可以訂閱不同的 Topic。
- offset 與 Zookeeper
- Kafka 叢集和消費者都會向 ZK 注冊。0.9 版本前 offset 存在 ZK,0.9 及之後 offset 存在 KafkaCluster,因為消費者消費速度非常快,若加之于 ZK 的連接配接通信,這樣的高并發對 ZK 和 Kafka都不好。offset 用于存儲目前消費的資料在整體中的偏移量;
- offset 主題分為 50 個分區,副本數隻有一份;
- Kafka 叢集的 brokers 和 comsumers 都會向 Zookeeper 注冊自己。
- Replica
- 為保證資料的可靠性,每個分區都有若幹 Replica;
- Kafka 中的副本數不能超過目前的可用的 broker 數,多餘的副本會放在同一個節點上;
- 而 HDFS 中如果副本數大于 workers 數量,實際副本數為 workers 數量。
- 分區
- MapReduce 中的分區是為提高 ReduceTask 的并發度,提高計算效率;
- Hive 中的分區是為了查詢時,減少讀取的資料量,提高查詢效率;
- Kafka 中的分區是為了提高某個 Topic 的負載能力,提高叢集的負載均衡,提高資料傳輸并發度,和可擴充性(提高 brokers 的水準擴充,也可看作并發度的擴充)
-
有了分區,為何還要 Segment ?
若沒 Segment,一個 Partition 對應一個檔案,檔案會持續增大。Data Purge 定期執行時,需要把目前的和舊資料一并删除,然後建立一個塊新的檔案(實體位置);這不符合 Kafka 對資料在實體層順序寫入的優化初衷。而引入 Segment 後,每次 Data Purge 隻需把舊的 Segment 删除,保證目前資料在實體層始終是順序寫入,提高效率。
第二章 入門
2.1 Kafka 的安裝部署
- 下載下傳
-
官方下載下傳位址
http://kafka.apache.org/downloads.html
-
版本選擇
kafka_2.11-2.4.1.tgz
-
-
叢集規劃
---- hadoop102 ----|---- hadoop103 ----|---- hadoop104 ----
-------- zk ---------|--------- zk --------|-------- zk --------
------ kafka --------|------- kafka -------|------ kafka -------
- 安裝部署
# 解壓安裝包 tar -zxvf /opt/software/kafka_2.11-2.4.1.tgz -C /opt/module/ # 修改安裝目錄 mv kafka_2.11-2.4.1 kafka # 建立日志檔案夾,注意這裡也會存儲 topic 資料 cd $KAFKA_HOME; mkdir logs # 修改配置檔案,配置如下 vim $KAFKA_HOME/config/server.properties
# broker 的全局唯一編号,不能重複 broker.id=102 # 删除 topic 功能使能,目前版本預設 true,且隐藏此項 delete.topic.enable=true # 處理網絡請求的線程數量 num.network.threads=3 # 用來處理磁盤 IO 的線程數量 num.io.threads=8 # 發送套接字的緩沖區大小 socket.send.buffer.bytes=102400 # 接收套接字的緩沖區大小 socket.receive.buffer.bytes=102400 # 請求套接字的緩沖區大小 100M socket.request.max.bytes=104857600 # Kafka 運作日志存放路徑 log.dirs=/opt/module/kafka/logs # topic 在目前 broker 上的分區個數 num.partitions=1 # 用來恢複和清理 data 下資料的線程數量 num.recovery.threads.per.data.dir=1 # segment 檔案保留的最長時間,逾時将被删除 1week log.retention.hours=168 # 必須配置項 Zookeeper 叢集位址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
# 配置環境變量 sudo vim /etc/profile.d/my_env.sh # KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:KAFKA_HOME/bin # 立即生效環境變量 source /etc/profile # 其他伺服器 - 分發 /opt/module/Kafka - 配置環境變量 - 修改 server.properties 中 broker.id
- 啟動和關閉
- 先啟動 Zookeeper,然後啟動 Kafka
# 各個節點分别啟動: kafka-server-start.sh -daemon config/server.properties -daemon 背景啟動 # 各個節點分别關閉: kafka-server-stop.sh stop
- 啟動腳本
#!/bin/bash case $1 in "start") for i in hadoop102 hadoop103 hadoop104 do echo "============== $i Kafka ==============" ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties done ;; "stop") for i in hadoop102 hadoop103 hadoop104 do echo "============== $i Kafka ==============" ssh $i /opt/module/kafka/bin/kafka-server-stop.sh done ;; *) echo "Input Args Error..." ;; esac
-
關閉:
注意:關閉 Kafka 叢集的操作有一定的持續時間,如果在此時關閉了 Zookeeper,此時沒有關閉成功的 Kafka 服務隻能用 kill -9 來關閉了。
- 先啟動 Zookeeper,然後啟動 Kafka
2.2 Kafka 指令行操作
- 檢視所有 Topic
# 其中 hadoop102:9092 表示服務入口,可寫其他節點,可寫多個保證連接配接可靠性; # --bootstrap-server 表示資料偏移量存儲在 Kafka 中;如果參數是 --zookeeper 則資料偏移量存儲在 zookeeper 中;0.9 及以上版本 Kafka 為 前者; kafka-topics.sh --bootstrap-server hadoop102:9092 --list
- 建立 Topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 1 --topic 主題名 --replication-factor 副本數 --partitions 分區數 --topic 主題名
- 删除 Topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic 主題名
- 生産消息
# --broker-list 指定接收資料的服務端; kafka-console-producer.sh --broker-list hadoop102:9092 --topic 主題名 >hello world >hello kafka
- 消費消息
# 消費從此刻開始的新資料 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic 主題名 # 消費指定主題的所有資料 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic 主題名
- 檢視指定 Topic 詳情
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic 主題名
- 修改分區數
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic 主題名 --partitions 修改後分區數
第三章 架構深入
3.1 Kafka 工作流程
1. 生産者向 Kafka 叢集中已有的 Topic 發送資料;
2. Topic 中資訊被分成多幹分區,每一條接收到的資料都存在 Partition 下的檔案中,并不斷追加到對應檔案末端,每條資料都有自己的 offset
3. 每個分區的 Leader 将資料副本發送至 Follower;
4. 消費者從指定主題的分區中消費資料,并記錄自己消費到的資料的 offset,以便出錯恢複時,從上一次的位置繼續消費。
3.2 Kafka 檔案存儲機制
- 由于生産者生産的消息會不斷追加到 log 檔案末尾,為防止 log 檔案過大導緻資料定位效率低下,Kafka 采用 分片 和 索引 機制,将每個 partition 分為多個 segment。
- 每個 segment 對應兩個檔案—— .index 和 .log 檔案。
- 一個分區對應一個檔案夾。檔案夾命名規則為:主題名+分區号。例如,first 這個主題有三個分區,其對應的檔案夾為:first-0,first-1,first-2;本文配置的資料存儲在 /opt/module/kafka/logs
00000000000000000000.index 00000000000000000000.log 00000000000000170410.index 00000000000000170410.log 00000000000000239430.index 00000000000000239430.log
- index 和 log 檔案以目前的 segmant 中的第一條消息的 offset 命名
3.3 Kafka 生産者
- 分區政策
-
為什麼要分區?
》友善資料在叢集中的擴充:每個 Partition 可以調節以适應所在的機器,整個叢集可以适應任意大小的資料;
》可以提高資料讀寫的并發度:以 Partition 的 Leader 為讀寫機關。
- 分區規則
// producer 推送的資料被封裝成一個 ProducerRecord 對象 // ProducerRecord 類的構造器 ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value, @Nullable Iterable<Header> headers) /* 1. 指明 parition 的情況下,直接将指明的值作為 partition 的值; 2. 沒指明 partition 但有 key,将 key 的 hash 值與 topic 的分區數進行求餘運算得 parition 值; 3. partition 與 key 都沒有指定,Kafka 采用 StickyPartition(粘性分區器),随機選擇一個分區,并盡可能的一直使用該分區,待該分區的 batch 資料批已滿或者超過時間間隔,Kafka 再随機一個分區進行寫入。 */
-
- 資料可靠性保證
-
生産者 pull 資料到 Leader 的可靠性保證
》每個 partition 收到 producer 的資料後,需要回複 ack;
》producer 收到目前消息的 ack 之後,才會繼續發送,否則重複發送;
》確定有 Follower 與 Leader 同步完成後,Leader 才回複 ack;這樣能確定 Leader 挂掉 Follower 替代 Leader 時沒有資料丢失;
》多少個 Follower 同步完成 Leader 回複 ack?方案1:半數以上;方案2:all
-
Topic Partition 儲存資料的可靠性
》Follower 副本同步
方案 特點 半數以上 延遲低 all 延遲高 m 個 broker 的叢集,需要 n 台節點的容錯能力,假設副本數為 x
—方案1:最壞的情況,n 台節點挂掉,并丢失 n 個副本,此時若要傳回 ack 則需要有 x/2<n+1 個 副本已經同步成功,即配置 x>= 2n+1 時,才能保證等于或低于 n 台節點挂掉,仍能使 ack 順利傳回,資料繼續發送;這種方案資料大量備援;
—方案2:最壞的情況,n 台節點挂掉,并丢失 n 個副本,此時若要傳回 ack 則需要 x 個副本全部同步成功,即配置任意副本數,都無法使 ack 順利傳回,資料無法繼續發送;
— Kafka 既沒有選擇 方案1,也沒有選擇方案2;
》ISR
》ack 應答級别
對于可靠性要求不高的資料,比如 前端埋點的資料能夠容忍資料的少量丢失,沒必要等 ISR 中的 Follower 全部接受成功。是以 Kafka 提供了三種可靠性級别,使用者根據對資料可靠性和延遲的要求,做出相應的配置。
-
- Exactly Once