1.重要的配置參數
- 存儲
- log.dir
- log.dirs
- topic
- auto.create.topics.enable:是否允許自動建立Topic
- unclean.leader.election.enable:是否允許Unclean Leader選舉
- auto.leader.rebalance.enable:是否允許定期進行Leader選舉
- retention.ms:規定了該Topic消息被儲存的時長。預設是7天
2.生産者消息分區機制
- Kafka的消息組織方式實際上是三級結構:主題-分區-消息
- 分區是實作負載均衡以及高吞吐量的關鍵
- 分區政策:輪訓、随機、按消息鍵保序
3.生産者壓縮算法
- 壓縮發生場景:producer端、broker端
- broker端發生壓縮的場景
- broker端配置了和producer端不一樣的壓縮算法
- broker端發生了消息格式轉化:主要是為了相容老版本的消費者程式,這個過程中會涉及消息的解壓縮和重新壓縮
- 壓縮算法
- 在吞吐量方面:LZ4 > Snappy > zstd和GZIP;
- 而在壓縮比方面,zstd > LZ4 > GZIP > Snappy
4.無消息丢失怎麼配置
1、不要使用producer.send(msg),而要使用producer.send(msg, callback)。記住,一定要使用帶有回調通知的send方法。
2、設定acks = all。
- acks是Producer的一個參數,代表了你對“已送出”消息的定義。如果設定成all,則表明所有副本Broker都要接收到消息,該消息才算是“已送出”。這是最高等級的“已送出”定義。
- 0:發出去就算成功
- 1:隻要Partition Leader接收到消息而且寫入本地磁盤了,就認為成功了,不管他其他的Follower有沒有同步過去這條消息
- all:Partition Leader接收到消息之後,還必須要求ISR清單裡跟Leader保持同步的那些Follower都要把消息同步過去,才能認為這條消息是寫入成功
- acks=all 就可以代表資料一定不會丢失了嗎? 當然不是,如果你的Partition隻有一個副本,也就是一個Leader,任何Follower都沒有,你認為acks=all有用嗎? 當然沒用了,因為ISR裡就一個Leader,他接收完消息後當機,也會導緻資料丢失。 是以說,這個acks=all,必須跟ISR清單裡至少有2個以上的副本配合使用,起碼是有一個Leader和一個Follower才可以
3、設定retries為一個較大的值。這裡的retries同樣是Producer的參數,對應前面提到的Producer自動重試。當出現網絡的瞬時抖動時,消息發送可能會失敗,此時配置了retries > 0的Producer能夠自動重試消息發送,避免消息丢失。
4、設定unclean.leader.election.enable = false。這是Broker端的參數,它控制的是哪些Broker有資格競選分區的Leader。如果一個Broker落後原先的Leader太多,那麼它一旦成為新的Leader,必然會造成消息的丢失。故一般都要将該參數設定成false,即不允許這種情況的發生。
5、設定replication.factor >= 3。這也是Broker端的參數。其實這裡想表述的是,最好将消息多儲存幾份,畢竟目前防止消息丢失的主要機制就是備援。
6、設定min.insync.replicas > 1。這依然是Broker端參數,控制的是消息至少要被寫入到多少個副本才算是“已送出”。設定成大于1可以提升消息持久性。在實際環境中千萬不要使用預設值1。
7、確定replication.factor > min.insync.replicas。如果兩者相等,那麼隻要有一個副本挂機,整個分區就無法正常工作了。我們不僅要改善消息的持久性,防止資料丢失,還要在不降低可用性的基礎上完成。推薦設定成replication.factor = min.insync.replicas + 1。
8、確定消息消費完成再送出。Consumer端有個參數enable.auto.commit,最好把它設定成false,并采用手動送出位移的方式。就像前面說的,這對于單Consumer多線程處理的場景而言是至關重要的。
5.生成者管理TCP連接配接
- KafkaProducer執行個體建立時啟動Sender線程,進而建立與bootstrap.servers中所有Broker的TCP連接配接。
- KafkaProducer執行個體首次更新中繼資料資訊之後,還會再次建立與叢集中所有Broker的TCP連接配接。
- 如果Producer端發送消息到某台Broker時發現沒有與該Broker的TCP連接配接,那麼也會立即建立連接配接。
- 如果設定Producer端connections.max.idle.ms參數大于0,則步驟1中建立的TCP連接配接會被自動關閉;如果設定該參數=-1,那麼步驟1中建立的TCP連接配接将無法被關閉,進而成為“僵屍”連接配接。
6.幂等生産者與事務生産者
- 幂等性Producer隻能保證單分區、單會話上的消息幂等性
- 事務型Producer能夠保證将消息原子性地寫入到多個分區中。這批消息要麼全部寫入成功,要麼全部失敗。另外,事務型Producer也不懼程序的重新開機。Producer重新開機回來後,Kafka依然保證它們發送消息的精确一次處理實際上即使寫入失敗,Kafka也會把它們寫入到底層的日志中,也就是說Consumer還是會看到這些消息
- 事務生産者一文讀懂 kafka 的事務機制
- transaction coordinator
- transaction marker
- transaction log
- 兩階段送出在兩階段送出協定的第一階段,transactional coordinator 更新記憶體中的事務狀态為 “prepare_commit”,并将該狀态持久化到 transaction log 中;
- 在兩階段送出協定的第二階段, coordinator 首先寫 transaction marker 标記到目标 topic 的目标 partition,這裡的 transaction marker,就是我們上文說的控制消息,控制消息共有兩種類型:commit 和 abort,分别用來表征事務已經成功送出或已經被成功終止;
- 在兩階段送出協定的第二階段, coordinator 在向目标 topic 的目标 partition 寫完控制消息後,會更新事務狀态為 “commited” 或 “abort”, 并将該狀态持久化到 transaction log 中;
7.消費者組到底是什麼
- Consumer Group是Kafka提供的可擴充且具有容錯性的消費者機制
- Consumer Group下可以有一個或多個Consumer執行個體。這裡的執行個體可以是一個單獨的程序,也可以是同一程序下的線程。在實際場景中,使用程序更為常見一些。
- Group ID是一個字元串,在一個Kafka叢集中,它辨別唯一的一個Consumer Group。
- Consumer Group下所有執行個體訂閱的主題的單個分區,隻能配置設定給組内的某個Consumer執行個體消費。這個分區當然也可以被其他的Group消費。
- Rebalance本質上是一種協定,規定了一個Consumer Group下的所有Consumer如何達成一緻,來配置設定訂閱Topic的每個分區
- 組成員數發生變更。比如有新的Consumer執行個體加入組或者離開組,抑或是有Consumer執行個體崩潰被“踢出”組。
- 訂閱主題數發生變更。Consumer Group可以使用正規表達式的方式訂閱主題,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明該Group訂閱所有以字母t開頭、字母c結尾的主題。在Consumer Group的運作過程中,你新建立了一個滿足這樣條件的主題,那麼該Group就會發生Rebalance。
- 訂閱主題的分區數發生變更。Kafka目前隻能允許增加一個主題的分區數。當分區數增加時,就會觸發訂閱該主題的所有Group開啟Rebalance。
8.消費者重平衡過程
消費者端,重平衡分為兩個步驟:分别是加入組和等待上司者消費者(Leader Consumer)配置設定方案。這兩個步驟分别對應兩類特定的請求:JoinGroup請求和SyncGroup請求
- 當組内成員加入組時,它會向協調者發送JoinGroup請求。在該請求中,每個成員都要将自己訂閱的主題上報,這樣協調者就能收集到所有成員的訂閱資訊。一旦收集了全部成員的JoinGroup請求後,協調者會從這些成員中選擇一個擔任這個消費者組的上司者。通常情況下,第一個發送JoinGroup請求的成員自動成為上司者。你一定要注意區分這裡的上司者和之前我們介紹的上司者副本,它們不是一個概念。這裡的上司者是具體的消費者執行個體,它既不是副本,也不是協調者。上司者消費者的任務是收集所有成員的訂閱資訊,然後根據這些資訊,制定具體的分區消費配置設定方案。
- 選出上司者之後,協調者會把消費者組訂閱資訊封裝進JoinGroup請求的響應體中,然後發給上司者,由上司者統一做出配置設定方案後,進入到下一步:發送SyncGroup請求。
- 在這一步中,上司者向協調者發送SyncGroup請求,将剛剛做出的配置設定方案發給協調者。值得注意的是,其他成員也會向協調者發送SyncGroup請求,隻不過請求體中并沒有實際的内容。這一步的主要目的是讓協調者接收配置設定方案,然後統一以SyncGroup響應的方式分發給所有成員,這樣組内所有成員就都知道自己該消費哪些分區了