天天看點

Kafka消息隊列 入門到精通 看這一篇就夠了

文章目錄

    • 第一章 概述
      • 1.1 Kafka 的定義及特點
      • 1.2 消息隊列的介紹
      • 1.3 Kafka 的基礎架構
    • 第二章 入門
      • 2.1 Kafka 的安裝部署
      • 2.2 Kafka 指令行操作
    • 第三章 架構深入
      • 3.1 Kafka 工作流程
      • 3.2 Kafka 檔案存儲機制
      • 3.3 Kafka 生産者
      • 3.4 Kafka 消費者
      • 3.5 Kafka 高效讀寫資料
      • 3.6 Kafka 事務
    • 第四章 API
    • 第五章 監控
    • 第六章 Flume 對接 Kafka
    • 第七章 Kafka 面試題

第一章 概述

1.1 Kafka 的定義及特點

​ 一個分布式的,基于 釋出/訂閱模式 的消息隊列(Massage Queue),主要應用于大資料實時處理領域;

​ Kafka 中資料是 有時效性地 儲存在 磁盤 中;

​ Kafka 由 Scala 編寫;

1.2 消息隊列的介紹

  1. 有兩種處理系統任務的方式:同步處理 和 異步處理。
  2. 同步處理:每一步必須等到前一步完成,才可執行。
  3. 異步處理:分批次處理,前一步未完成也可以開始下一步。
  4. 消息隊列為異步處理,使用消息隊列的好處:
    • 解耦:隻要遵循相同接口,則可以獨立擴充或修改不同處理過程,提高可恢複性和健壯性。
    • 緩沖:有助于控制和優化資料經過系統的速度,解決生産者和消費者速度不一緻的問題。衍生靈活性和削峰能力,以低成本應對突變的任務量。
    • 異步通信:使用者需求,有時不需要立即處理,而是存儲在隊列中,需要時使用,如郵箱。
  5. 消息隊列的兩種模式:
    • 點對點:

      》消費者主動拉取資料,收到後隊 Queue 即删除該資料,無法消費曾消費過的資料;

      》Queue 支援多個消費者,但是對于一個消息,隻能有一個消費者;

      》消費者主動拉取資料,生産者不好控制推送速度。

    • 釋出訂閱:

      ​ 》消費者消費後的資料不會從 Queue 中删除;

      ​ 》生産者生産消息到 topic 中,所有訂閱該 topic 的消費者都會收到該消息;

      ​ 》隊列主動推送/消費者主動拉取,兩種傳遞模式。需要長期維持輪詢。

1.3 Kafka 的基礎架構

Kafka消息隊列 入門到精通 看這一篇就夠了
  • 生産:
    1. ProducerA 生産消息,此消息屬于TopicA,被分為若幹分區,發送至 KafkaCluster 中的一個節點 Broker1 的TopicA.Partition0.Leader,消息的另一部分被發往 Broker2 節點的 TopicA.Partition1。
    2. 一個 broker 可以容納多個 topic;邏輯層的 Topic。
    3. 一個 topic 可分為多個 partition,partition 是一個有序隊列,實作擴充性;實體層的 Partition。
    4. 每個分區有對于的 Leader 和 Follower

      Leader 是 producer 發送資料的對象、Comsumer 消費資料的對象,一個 Leader 對應多個 Follower;

      Follower 用于存儲所屬分區的副本,實時從 Leader 中同步資料,Leader 故障時多個 Follower 選一個成為新的 Leader。

  • 消費:
    1. 一個消費者組 CG 邏輯上是一個訂閱者,消費一個 Topic,消費者組間互不影響。
    2. 一個分區隻能由一個組中的一個消費者消費,一個消費者可以消費該組所屬 Topic 的多個分區,是以,當一個組中的消費者多餘該組對應 Topic 的 Partition 數時,無意義;而當兩者相等時,消費率最高。
    3. 每個消費者都屬于一個 CG。
    4. 消費者組中的成員,分别從 KafkaCluster 的不同節點拉取 Topic 的不同分區。
    5. 一個 CG 中的一個消費者,可以訂閱不同的 Topic。
  • offset 與 Zookeeper
    1. Kafka 叢集和消費者都會向 ZK 注冊。0.9 版本前 offset 存在 ZK,0.9 及之後 offset 存在 KafkaCluster,因為消費者消費速度非常快,若加之于 ZK 的連接配接通信,這樣的高并發對 ZK 和 Kafka都不好。offset 用于存儲目前消費的資料在整體中的偏移量;
    2. offset 主題分為 50 個分區,副本數隻有一份;
    3. Kafka 叢集的 brokers 和 comsumers 都會向 Zookeeper 注冊自己。
  • Replica
    1. 為保證資料的可靠性,每個分區都有若幹 Replica;
    2. Kafka 中的副本數不能超過目前的可用的 broker 數,多餘的副本會放在同一個節點上;
    3. 而 HDFS 中如果副本數大于 workers 數量,實際副本數為 workers 數量。
  • 分區
    1. MapReduce 中的分區是為提高 ReduceTask 的并發度,提高計算效率;
    2. Hive 中的分區是為了查詢時,減少讀取的資料量,提高查詢效率;
    3. Kafka 中的分區是為了提高某個 Topic 的負載能力,提高叢集的負載均衡,提高資料傳輸并發度,和可擴充性(提高 brokers 的水準擴充,也可看作并發度的擴充)
    4. 有了分區,為何還要 Segment ?

      若沒 Segment,一個 Partition 對應一個檔案,檔案會持續增大。Data Purge 定期執行時,需要把目前的和舊資料一并删除,然後建立一個塊新的檔案(實體位置);這不符合 Kafka 對資料在實體層順序寫入的優化初衷。而引入 Segment 後,每次 Data Purge 隻需把舊的 Segment 删除,保證目前資料在實體層始終是順序寫入,提高效率。

第二章 入門

2.1 Kafka 的安裝部署

  • 下載下傳
    1. 官方下載下傳位址

      http://kafka.apache.org/downloads.html

    2. 版本選擇

      kafka_2.11-2.4.1.tgz

  • 叢集規劃

    ---- hadoop102 ----|---- hadoop103 ----|---- hadoop104 ----

    -------- zk ---------|--------- zk --------|-------- zk --------

    ------ kafka --------|------- kafka -------|------ kafka -------

  • 安裝部署
    # 解壓安裝包
    tar -zxvf /opt/software/kafka_2.11-2.4.1.tgz -C /opt/module/
    
    # 修改安裝目錄
    mv kafka_2.11-2.4.1 kafka
    
    # 建立日志檔案夾,注意這裡也會存儲 topic 資料
    cd $KAFKA_HOME; mkdir logs
    
    # 修改配置檔案,配置如下
    vim $KAFKA_HOME/config/server.properties 
               
    # broker 的全局唯一編号,不能重複
    broker.id=102
    
    # 删除 topic 功能使能,目前版本預設 true,且隐藏此項
    delete.topic.enable=true
    
    # 處理網絡請求的線程數量
    num.network.threads=3
    
    # 用來處理磁盤 IO 的線程數量
    num.io.threads=8
    
    # 發送套接字的緩沖區大小
    socket.send.buffer.bytes=102400
    
    # 接收套接字的緩沖區大小
    socket.receive.buffer.bytes=102400
    
    # 請求套接字的緩沖區大小 100M
    socket.request.max.bytes=104857600
    
    # Kafka 運作日志存放路徑
    log.dirs=/opt/module/kafka/logs
    
    # topic 在目前 broker 上的分區個數
    num.partitions=1
    
    # 用來恢複和清理 data 下資料的線程數量
    num.recovery.threads.per.data.dir=1
    
    # segment 檔案保留的最長時間,逾時将被删除 1week
    log.retention.hours=168
    
    # 必須配置項 Zookeeper 叢集位址
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
               
    # 配置環境變量
    sudo vim /etc/profile.d/my_env.sh
    
    # KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:KAFKA_HOME/bin
    
    # 立即生效環境變量
    source /etc/profile
    
    # 其他伺服器
    - 分發 /opt/module/Kafka
    - 配置環境變量
    - 修改 server.properties 中 broker.id
               
  • 啟動和關閉
    1. 先啟動 Zookeeper,然後啟動 Kafka
      # 各個節點分别啟動:
      	kafka-server-start.sh -daemon config/server.properties
      	-daemon 背景啟動
      
      # 各個節點分别關閉:
      	kafka-server-stop.sh stop
                 
    2. 啟動腳本
      #!/bin/bash
      case $1 in 
      "start")
      	for i in hadoop102 hadoop103 hadoop104
      	do
          	echo "============== $i Kafka =============="
          	ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
          done
      ;;
      "stop")
      	for i in hadoop102 hadoop103 hadoop104
      	do
      		echo "============== $i Kafka =============="
      		ssh $i /opt/module/kafka/bin/kafka-server-stop.sh
      	done
      ;;
      *)
      	echo "Input Args Error..."
      ;;
      esac
                 
    3. 關閉:

      注意:關閉 Kafka 叢集的操作有一定的持續時間,如果在此時關閉了 Zookeeper,此時沒有關閉成功的 Kafka 服務隻能用 kill -9 來關閉了。

2.2 Kafka 指令行操作

  • 檢視所有 Topic
    # 其中 hadoop102:9092 表示服務入口,可寫其他節點,可寫多個保證連接配接可靠性;
    # --bootstrap-server 表示資料偏移量存儲在 Kafka 中;如果參數是 --zookeeper 則資料偏移量存儲在 zookeeper 中;0.9 及以上版本 Kafka 為 前者;
    kafka-topics.sh --bootstrap-server hadoop102:9092 --list
               
  • 建立 Topic
    kafka-topics.sh  --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 1 --topic 主題名
    
    --replication-factor 副本數
    --partitions 分區數
    --topic 主題名
               
  • 删除 Topic
    kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic 主題名
               
  • 生産消息
    # --broker-list 指定接收資料的服務端;
    	kafka-console-producer.sh --broker-list hadoop102:9092 --topic 主題名
    >hello world
    >hello kafka
               
  • 消費消息
    # 消費從此刻開始的新資料
    kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic 主題名
    
    # 消費指定主題的所有資料
    kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic 主題名
               
  • 檢視指定 Topic 詳情
    kafka-topics.sh	--bootstrap-server hadoop102:9092 --describe --topic 主題名
               
  • 修改分區數
    kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic 主題名 --partitions 修改後分區數
               

第三章 架構深入

3.1 Kafka 工作流程

1. 生産者向 Kafka 叢集中已有的 Topic 發送資料;
	2. Topic 中資訊被分成多幹分區,每一條接收到的資料都存在 Partition 下的檔案中,并不斷追加到對應檔案末端,每條資料都有自己的 offset
	3. 每個分區的 Leader 将資料副本發送至 Follower;
	4. 消費者從指定主題的分區中消費資料,并記錄自己消費到的資料的 offset,以便出錯恢複時,從上一次的位置繼續消費。
           

3.2 Kafka 檔案存儲機制

Kafka消息隊列 入門到精通 看這一篇就夠了
  1. 由于生産者生産的消息會不斷追加到 log 檔案末尾,為防止 log 檔案過大導緻資料定位效率低下,Kafka 采用 分片 和 索引 機制,将每個 partition 分為多個 segment。
  2. 每個 segment 對應兩個檔案—— .index 和 .log 檔案。
  3. 一個分區對應一個檔案夾。檔案夾命名規則為:主題名+分區号。例如,first 這個主題有三個分區,其對應的檔案夾為:first-0,first-1,first-2;本文配置的資料存儲在 /opt/module/kafka/logs
    00000000000000000000.index
    00000000000000000000.log
    00000000000000170410.index
    00000000000000170410.log
    00000000000000239430.index
    00000000000000239430.log
               
  4. index 和 log 檔案以目前的 segmant 中的第一條消息的 offset 命名
Kafka消息隊列 入門到精通 看這一篇就夠了

3.3 Kafka 生産者

  • 分區政策
    1. 為什麼要分區?

      》友善資料在叢集中的擴充:每個 Partition 可以調節以适應所在的機器,整個叢集可以适應任意大小的資料;

      ​ 》可以提高資料讀寫的并發度:以 Partition 的 Leader 為讀寫機關。

    2. 分區規則
      // producer 推送的資料被封裝成一個 ProducerRecord 對象
      
      // ProducerRecord 類的構造器
      ProducerRecord(@NotNull String topic, Integer partition, Long timestamp, String key, String value, @Nullable Iterable<Header> headers)
      
      /*
      1. 指明 parition 的情況下,直接将指明的值作為 partition 的值;
      2. 沒指明 partition 但有 key,将 key 的 hash 值與 topic 的分區數進行求餘運算得 parition 值;
      3. partition 與 key 都沒有指定,Kafka 采用 StickyPartition(粘性分區器),随機選擇一個分區,并盡可能的一直使用該分區,待該分區的 batch 資料批已滿或者超過時間間隔,Kafka 再随機一個分區進行寫入。
      */
                 
  • 資料可靠性保證
    1. 生産者 pull 資料到 Leader 的可靠性保證

      》每個 partition 收到 producer 的資料後,需要回複 ack;

      》producer 收到目前消息的 ack 之後,才會繼續發送,否則重複發送;

      》確定有 Follower 與 Leader 同步完成後,Leader 才回複 ack;這樣能確定 Leader 挂掉 Follower 替代 Leader 時沒有資料丢失;

      》多少個 Follower 同步完成 Leader 回複 ack?方案1:半數以上;方案2:all

    2. Topic Partition 儲存資料的可靠性

      》Follower 副本同步

      方案 特點
      半數以上 延遲低
      all 延遲高

      m 個 broker 的叢集,需要 n 台節點的容錯能力,假設副本數為 x

      —方案1:最壞的情況,n 台節點挂掉,并丢失 n 個副本,此時若要傳回 ack 則需要有 x/2<n+1 個 副本已經同步成功,即配置 x>= 2n+1 時,才能保證等于或低于 n 台節點挂掉,仍能使 ack 順利傳回,資料繼續發送;這種方案資料大量備援;

      —方案2:最壞的情況,n 台節點挂掉,并丢失 n 個副本,此時若要傳回 ack 則需要 x 個副本全部同步成功,即配置任意副本數,都無法使 ack 順利傳回,資料無法繼續發送;

      — Kafka 既沒有選擇 方案1,也沒有選擇方案2;

      》ISR

      》ack 應答級别

      對于可靠性要求不高的資料,比如 前端埋點的資料能夠容忍資料的少量丢失,沒必要等 ISR 中的 Follower 全部接受成功。是以 Kafka 提供了三種可靠性級别,使用者根據對資料可靠性和延遲的要求,做出相應的配置。

  • Exactly Once

3.4 Kafka 消費者

3.5 Kafka 高效讀寫資料

3.6 Kafka 事務

第四章 API

第五章 監控

第六章 Flume 對接 Kafka

第七章 Kafka 面試題

繼續閱讀