天天看點

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

kafka搭建叢集,我這裡就不寫了

https://www.cnblogs.com/wangxiaoheng/articles/10000791.html

我的環境是搭建了3台kafka的叢集:

下面是kafka的一些指令操作:

一、使用./kafka-topics.sh指令

1、使用下面的指令可以檢視,有那些指令

./kafka-topics.sh --list --zookeeper localhost:2181
           

2、建立topic

./kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --partitions 3 --replication-factor 2
           

這個是建立主題是:topic1,分區:3個;副本數是:2個

2.1、執行:./kafka-topics.sh --create --zookeeper localhost:2181 --topic topic2 --partitions 4 --replication-factor 2

這個是建立分區為4個,能建立成功

2.2、執行:./kafka-topics.sh --create --zookeeper localhost:2181 --topic topic2 --partitions 2 --replication-factor 4

建立4個副本失敗,這是因為我的叢集機器總共就隻有3個

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

3、檢視topic的詳情

./kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
           
kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

4、删除topic

./kafka-topics.sh --delete --zookeeper localhost:2181 --topic -kafkafrist
           
kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

下面那句話提示我們,删除topic:-kafkafrist的時候,如果沒有delete.topic.enable=true的時候,是沒有真正的删除

二、使用./kafka-console-producer.sh 指令

1、生産,這樣就可以

./kafka-console-producer.sh --topic topic1 --broker-list localhost:9092
           
kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

加上--from-begining可以從頭開始消費 ,

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

2、消費

./kafka-console-consumer.sh --topic topic1 --bootstrap-server localhost:9093
           

 會建立消費者的分區,預設是50個分區,均勻的分布在叢集節點上

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

加上--from-begining可以從頭開始消費 ,但是這個消息隻會存儲168個小時,是在kafka的配置檔案裡面設定的,把消費者的消費消息存儲在kafka伺服器上。

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

消費方式有兩種:

0.9版本及之後offset存儲本地,0.9版本之前offset存儲在ZK 

 ./kafka-console-consumer.sh --topic topic1 --zookeeper localhost:2181(這種方式可能在以後不支援了)

下面除了zookeeper都是kafka的資料:

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

三、kafka檔案存儲

3.1、存儲

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

由于生産者生産的消息會不斷追加到log檔案末尾,為防止log檔案過大導緻資料定位消息低下,kafka采取了分片和索引機制,将每個partition分為多個segment。每個segment對應兩個檔案--“.index”檔案和".log"檔案。這些檔案位于一個檔案夾下,該檔案夾的命名規則為:topic名稱+分區序号。例如,topic1的分區,則其對應的檔案夾為topic1-0,topic1-1。

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

index和log檔案以目前segment的第一條消息的offset命名,下圖為index檔案和log。(我們都是通過二分查找的方式找到索引的所對應的位址,和檔案大小,能過很快的擷取到消息)

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

“index”檔案存儲大量的索引消息,“log”檔案存儲大量的資料,索引檔案中的中繼資料指向對應資料檔案中的message的偏移量位址。

3.2、分區政策

分區的原因:

  • 1、友善在叢集中擴充,每個partition可以通過調整以适應它所在的機器,而一個topic又可以又多個partition組成,是以整個叢集就可以适應任意大小的資料了;
  • 2、可以提高并發,因為可以以partition為機關讀寫。
kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

分區的原則:

我們需要将producer發送的資料封裝成一個producerRecord對象。

  • 1、指明partition的情況下,直接将指明的值直接作為partition值;
  • 2、沒有指明partition值但又key的情況下,将key的hash值與topic的partition數進行取餘得到partition值;
  • 3、即沒有partition值又沒有key值的情況下,第一次條用時随機生成一個整數(後面每次調用在這個整數上自增),将這個值與topic可用的partition總數取餘得到partition值,也就是常說的round-robin算法(輪詢)。

key官方解釋:

A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the

 record is being received and an offset that points to the record in a Kafka partition.

我的了解

kafka0.9用戶端都以record為一條消息,進行發送,record包含一個鍵值對,分區和topic名。key像map中的key,隻是一條record的一個傳遞屬性,可有可無,你可以靈活的使用它,也可不使用。

3.3、資料可靠性保證,怎麼保證?

為保證producer發送的資料,能可靠的發送到指定的topic,topic的每個partition收到producer發送的資料後,都需要向producer發送ack(acknowledgement确認收到),如果producer收到ack,就會進行下一輪的發送,否則重新發送資料。

3.3.1、副本資料同步政策:

方案 有點 缺點
半數以上完成同步,就發送ack 延遲低 選舉新的leader時,容忍n台節點的故障,需要2n+1個副本
全部完成同步,才發送ack 選舉新的leader時,容忍n台節點的故障,需要n+1個副本 延遲高

Kafka選擇了第二種方案,原因如下:

  • 1、同樣為了容忍n台節點的故障,第一種方案需要2n+1個副本,而第二種隻需要n+1個副本,而kafka的每個分區都有大量的資料,第一種方案會造成大量資料的備援。
  • 2、雖然第二種方案的網絡延遲會比較高,但網絡延遲對kafka的影響較小。

3.3.2、ISR機制

采用第二種方案之後,設想一下場景:leader收到資料,所有follower都開始同步資料,擔憂一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。這個問題怎麼解決呢?

leader維護了一個動态的in-sync replica set(ISR),意味和leader保持同步的follower集合。當ISR中的follower完成資料的同步之後,leader就會給follower發送ack。如果follower長時間未向leader同步資料,則該follower将被踢出ISR,該時間閥值由replica.lag.time.max.ms參數設定。leader發生故障之後,就會從ISR中選舉新的leader。

3.3.3、ack應答機制

對應某些不太重要的資料,對資料的可靠性要求不是提高,能夠容忍資料的少量丢失,是以沒必要等ISR中的follower全部接受成功。

是以kafka為使用者提供了三種可靠性級别,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks參數配置:

acks:

  • 0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一個接收到還沒有寫入磁盤就已經傳回,當broker故障時可能丢失資料;
  • 1:producer等待broker的ack,partition的leader落盤成功後傳回ack,如果在follower同步成功之前leader故障,那麼将會丢失資料;
  • kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲
  • -1(all):producer等待broker的ack,partition的leader和follower(這個是ISR的follower)全部落盤成功後才傳回ack。但是如果在follower同步完成後,broker發送ack之前,leader發送故障,那麼會造成資料重複。
  • kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲

3.3.4、資料一緻性問題

kafka的一些簡單操作一、使用./kafka-topics.sh指令二、使用./kafka-console-producer.sh 指令三、kafka檔案存儲
  • LEO(Log End Offset):指的是每個副本最大的offset
  • HW(High Watermark):指的是消費者能見到的最大的offset,ISR隊列中最小的LEO。(保證消費者的資料一緻性問題)

1、follower故障

follower發生故障後會被臨時踢出ISR,待該follower恢複後,follower會讀取本地磁盤記錄的上次的HW,并将log檔案高于HW的部分截取掉,從HW開始向leader進行同步。等到follower的LEO大于等于該partition的HW,即follower追上leader之後,就可以重新加入ISR了。

2、leader故障

leader發生故障之後,會從ISR中選出一個新的leader之後,為保證多個副本之間的資料一緻性,其餘的follower會将各自的log檔案高于HW的部分截掉,然後從新的leader同步資料。

⚠️注意:這隻能保證副本之間的資料一緻性,并不能保證資料不丢失或者不重複。

Exactly Once語義

将伺服器的ack級别設定為-1,可以保證producer到server之間不會丢失資料,即at least once語義。相對的,将伺服器ack級别設定為0,可以保證生産者每條消息隻會被發送一次,即at most once語義。

at least once可以保證資料不丢失,但是不能保證資料不重複;相對的atleast once可以保證資料不重複,但是不能保證資料的不丢失。但是,對于一些非常重要的資訊,比如說交易資料,下遊資料消費者要求資料即不重複也不丢失,即exactly語義。在0.11版本之前的kafka,對此是無能為力的,隻能保證資料不丢失,再在下遊消費者對資料做全局去重。對于多個下遊應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大的影響。

0.11版本的kafka,引入了一項重大特性:幂等性。所謂的幂等性就是指producer不論向server發送多少次重複資料,server端都隻會持久化一條。幂等性結合At Least Once 語義,就構成了Kafka的Exactly Once 語義。即:

At Least Once + 幂等性=exactly Once

要開啟幂等性,隻需要将producer的參數enable.idompotence設定為true即可。kafka的幂等性實作其實就是将原來下遊需要的去重放在了資料上遊。開啟幂等性的producer在初始化的時候會被配置設定一個PID,發往同一個partition的消息會附帶sequence Number。而Broker端會對<PID,partition ,SeqNumber>做緩存,當具有相同主鍵的消息送出時,Broker隻會持久化一條。

但是PID重新開機就會變化,同時不同的partition也具有不同主鍵,是以幂等性無法保證跨分區回話的Exactly Once。