天天看點

kafka基本原理

下載下傳安裝位址:http://kafka.apache.org/downloads.html

 原文連結:http://www.jasongj.com/2015/01/02/Kafka深度解析

Kafka主要術語直覺解釋

  • Broker

    Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker

  • Topic

    每條釋出到Kafka叢集的消息都有一個類别,這個類别被稱為topic。(實體上不同topic的消息分開存儲,邏輯上一個topic的消息雖然儲存于一個或多個broker上但使用者隻需指定消息的topic即可生産或消費資料而不必關心資料存于何處)

  • Partition

    parition是實體上的概念,每個topic包含一個或多個partition,建立topic時可指定parition數量。每個partition對應于一個檔案夾,該檔案夾下存儲該partition的資料和索引檔案

  • Producer

    負責釋出消息到Kafka broker

  • Consumer

    消費消息。每個consumer屬于一個特定的consumer group(可為每個consumer指定group name,若不指定group name則屬于預設的group)。使用consumer high level API時,同一topic的一條消息隻能被同一個consumer group内的一個consumer消費,但多個consumer group可同時消費這一消息。

Kafka架構

kafka基本原理

如上圖所示,一個典型的kafka叢集中包含若幹producer(可以是web前端産生的page view,或者是伺服器日志,系統CPU、memory等),若幹broker(Kafka支援水準擴充,一般broker數量越多,叢集吞吐率越高),若幹consumer group,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式将消息釋出到broker,consumer使用pull模式從broker訂閱并消費消息。   

Topic & Partition

  Topic在邏輯上可以被認為是一個queue。每條消費都必須指定它的topic,可以簡單了解為必須指明把這條消息放進哪個queue裡。為了使得Kafka的吞吐率可以水準擴充,實體上把topic分成一個或多個partition,每個partition在實體上對應一個檔案夾,該檔案夾下存儲這個partition的所有消息和索引檔案。

  

kafka基本原理

  每個日志檔案都是“log entries”序列,每一個

log entry

包含一個4位元組整型數(值為N),其後跟N個位元組的消息體。每條消息都有一個目前partition下唯一的64位元組的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:

  message length : 4 bytes (value: 1+4+n)

  “magic” value : 1 byte

  crc : 4 bytes

  payload : n bytes

  這個“log entries”并非由一個檔案構成,而是分成多個segment,每個segment名為該segment第一條消息的offset和“.kafka”組成。另外會有一個索引檔案,它标明了每個segment下包含的

log entry

的offset範圍,如下圖所示。

kafka基本原理

  因為每條消息都被append到該partition中,是順序寫磁盤,是以效率非常高(經驗證,順序寫磁盤效率比随機寫記憶體還要高,這是Kafka高吞吐率的一個很重要的保證)。

kafka基本原理

  每一條消息被發送到broker時,會根據paritition規則選擇被存儲到哪一個partition。如果partition規則設定的合理,所有消息可以均勻分布到不同的partition裡,這樣就實作了水準擴充。(如果一個topic對應一個檔案,那這個檔案所在的機器I/O将會成為這個topic的性能瓶頸,而partition解決了這個問題)。在建立topic時可以在

$KAFKA_HOME/config/server.properties

中指定這個partition的數量(如下所示),當然也可以在topic建立之後去修改parition數量。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3      

  在發送一條消息時,可以指定這條消息的key,producer根據這個key和partition機制來判斷将這條消息發送到哪個parition。paritition機制可以通過指定producer的paritition. class這一參數來指定,該class必須實作

kafka.producer.Partitioner

接口。本例中如果key可以被解析為整數則将對應的整數與partition總數取餘,該消息會被發送到該數對應的partition。(每個parition都會有個序号) 

import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
   
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}
如果将上例中的class作為partition.class,并通過如下代碼發送20條消息(key分别為0,1,2,3)至topic2(包含4個partition)。
public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
        List messageList = new ArrayList<KeyedMessage<String, String>>();
        for(int j = 0; j < 4; j++){
            messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
        }
        producer.send(messageList);
    }
  producer.close();
}
 則key相同的消息會被發送并存儲到同一個partition裡,而且key的序号正好和partition序号相同。(partition序号從0開始,本例中的key也正好從0開始)。如下圖所示。       
kafka基本原理

  對于傳統的message queue而言,一般會删除已經被消費的消息,而Kafka叢集會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有資料(實際上也沒必要),是以Kafka提供兩種政策去删除舊資料。一是基于時間,二是基于partition檔案大小。例如可以通過配置

$KAFKA_HOME/config/server.properties

,讓Kafka删除一周前的資料,也可通過配置讓Kafka在partition檔案超過1GB時删除舊資料,如下所示。

  ############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to 
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs 
#can then be marked for log compaction.
log.cleaner.enable=false      

  這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與檔案大小無關,是以這裡删除檔案與Kafka性能無關,選擇怎樣的删除政策隻與磁盤以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata資訊–目前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息後線性增加這個offset。當然,consumer也可将offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,是以Kafka broker是無狀态的,它不需要标記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group隻有一個consumer能消費某一條消息,是以也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。 

Replication & Leader election

  Kafka從0.8開始提供partition級别的replication,replication的數量可在

$KAFKA_HOME/config/server.properties

中配置。

default.replication.factor = 1      

  該 Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有一定影響的,但極大的增強了可用性。預設情況下,Kafka的replication數量為1。  每個partition都有一個唯一的leader,所有的讀寫操作都在leader上完成,leader批量從leader上pull資料。一般情況下partition的數量大于等于broker的數量,并且所有partition的leader均勻分布在broker上。follower上的日志和其leader上的完全一樣。 

  和大部分分布式系統一樣,Kakfa處理失敗需要明确定義一個broker是否alive。

  對于Kafka而言,Kafka存活包含兩個條件,

    一是它必須維護與Zookeeper的session(這個通過Zookeeper的heartbeat機制來實作)。

    二是follower必須能夠及時将leader的writing複制過來,不能“落後太多”。

leader會track“in sync”的node list。如果一個follower當機,或者落後太多,leader将把它從”in sync” list中移除。這裡所描述的“落後太多”指follower複制的消息落後于leader後的條數超過預定值,該值可在

$KAFKA_HOME/config/server.properties

中配置

#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000

#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000      

  需要說明的是,Kafka隻解決”fail/recover”,不處理“Byzantine”(“拜占庭”)問題。 

  一條消息隻有被“in sync” list裡的所有follower都從leader複制過去才會被認為已送出。這樣就避免了部分資料被寫進了leader,還沒來得及被任何follower複制就當機了,而造成資料丢失(consumer無法消費這些資料)。而對于producer而言,它可以選擇是否等待消息commit,這可以通過

request.required.acks

來設定。這種機制確定了隻要“in sync” list有一個或以上的flollower,一條被commit的消息就不會丢失。

  這裡的複制機制即不是同步複制,也不是單純的異步複制。事實上,同步複制要求“活着的”follower都複制完,這條消息才會被認為commit,這種複制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步複制方式下,follower異步的從leader複制資料,資料隻要被leader寫入log就被認為已經commit,這種情況下如果follwer都落後于leader,而leader突然當機,則會丢失資料。而Kafka的這種使用“in sync” list的方式則很好的均衡了確定資料不丢失以及吞吐率。follower可以批量的從leader複制資料,這樣極大的提高複制性能(批量寫磁盤),極大減少了follower與leader的差距(前文有說到,隻要follower落後leader不太遠,則被認為在“in sync” list裡)。

  上文說明了Kafka是如何做replication的,另外一個很重要的問題是當leader當機了,怎樣在follower中選舉出新的leader。因為follower可能落後許多或者crash了,是以必須確定選擇“最新”的follower作為新的leader。一個基本的原則就是,如果leader不在了,新的leader必須擁有原來的leader commit的所有消息。這就需要作一個折衷,如果leader在标明一條消息被commit前等待更多的follower确認,那在它die之後就有更多的follower可以作為新的leader,但這也會造成吞吐率的下降。

  一種非常常用的選舉leader的方式是“majority vote”(“少數服從多數”),但Kafka并未采用這種方式。這種模式下,如果我們有2f+1個replica(包含leader和follower),那在commit之前必須保證有f+1個replica複制完消息,為了保證正确選出新的leader,fail的replica不能超過f個。因為在剩下的任意f+1個replica裡,至少有一個replica包含有最新的所有消息。這種方式有個很大的優勢,系統的latency隻取決于最快的幾台server,也就是說,如果replication factor是3,那latency就取決于最快的那個follower而非最慢那個。majority vote也有一些劣勢,為了保證leader election的正常進行,它所能容忍的fail的follower個數比較少。如果要容忍1個follower挂掉,必須要有3個以上的replica,如果要容忍2個follower挂掉,必須要有5個以上的replica。也就是說,在生産環境下為了保證較高的容錯程度,必須要有大量的replica,而大量的replica又會在大資料量下導緻性能的急劇下降。這就是這種算法更多用在Zookeeper這種共享叢集配置的系統中而很少在需要存儲大量資料的系統中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal,但是它的資料存儲并沒有使用這種expensive的方式。

  實際上,leader election算法非常多,比如Zookeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的leader election算法更像微軟的PacificA算法。

  Kafka在Zookeeper中動态維護了一個ISR(in-sync replicas) set,這個set裡的所有replica都跟上了leader,隻有ISR裡的成員才有被選為leader的可能。在這種模式下,對于f+1個replica,一個Kafka topic能在保證不丢失已經ommit的消息的前提下容忍f個replica的失敗。在大多數使用場景中,這種模式是非常有利的。事實上,為了容忍f個replica的失敗,majority vote和ISR在commit前需要等待的replica數量是一樣的,但是ISR需要的總的replica的個數幾乎是majority vote的一半。

  雖然majority vote與ISR相比有不需等待最慢的server這一優勢,但是Kafka作者認為Kafka可以通過producer選擇是否被commit阻塞來改善這一問題,并且節省下來的replica和磁盤使得ISR模式仍然值得。

  上文提到,在ISR中至少有一個follower時,Kafka可以確定已經commit的資料不丢失,但如果某一個partition的所有replica都挂了,就無法保證資料不丢失了。這種情況下有兩種可行的方案:

  • 等待ISR中的任一個replica“活”過來,并且選它作為leader
  • 選擇第一個“活”過來的replica(不一定是ISR中的)作為leader

  這就需要在可用性和一緻性當中作出一個簡單的平衡。如果一定要等待ISR中的replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有replica都無法“活”過來了,或者資料都丢失了,這個partition将永遠不可用。選擇第一個“活”過來的replica作為leader,而這個replica不是ISR中的replica,那即使它并不保證已經包含了所有已commit的消息,它也會成為leader而作為consumer的資料源(前文有說明,所有讀寫都由leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以後的版本中,Kafka支援使用者通過配置選擇這兩種方式中的一種,進而根據不同的使用場景選擇高可用性還是強一緻性。

  上文說明了一個parition的replication過程,然爾Kafka叢集需要管理成百上千個partition,Kafka通過round-robin的方式來平衡partition進而避免大量partition集中在了少數幾個節點上。同時Kafka也需要平衡leader的分布,盡可能的讓所有partition的leader均勻分布在不同broker上。另一方面,優化leadership election的過程也是很重要的,畢竟這段時間相應的partition處于不可用狀态。一種簡單的實作是暫停當機的broker上的所有partition,并為之選舉leader。實際上,Kafka選舉一個broker作為controller,這個controller通過watch Zookeeper檢測所有的broker failure,并負責為所有受影響的parition選舉leader,再将相應的leader調整指令發送至受影響的broker,過程如下圖所示。

kafka基本原理

  這樣做的好處是,可以批量的通知leadership的變化,進而使得選舉過程成本更低,尤其對大量的partition而言。如果controller失敗了,幸存的所有broker都會嘗試在Zookeeper中建立/controller->{this broker id},如果建立成功(隻可能有一個建立成功),則該broker會成為controller,若建立不成功,則該broker會等待新controller的指令。  

kafka基本原理

Consumer group

  (本節所有描述都是基于consumer hight level API而非low level API)。

  每一個consumer執行個體都屬于一個consumer group,每一條消息隻會被同一個consumer group裡的一個consumer執行個體消費。(不同consumer group可以同時消費同一條消息)

kafka基本原理

  很多傳統的message queue都會在消息被消費完後将消息删除,一方面避免重複消費,另一方面可以保證queue的長度比較少,提高效率。而如上文所将,Kafka并不删除已消費的消息,為了實作傳統message queue消息隻被消費一次的語義,Kafka保證保證同一個consumer group裡隻有一個consumer會消費一條消息。與傳統message queue不同的是,Kafka還允許不同consumer group同時消費同一條消息,這一特性可以為消息的多元化處理提供了支援。實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時線上處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時将資料實時備份到另一個資料中心,隻需要保證這三個操作所使用的consumer在不同的consumer group即可。下圖展示了Kafka在Linkedin的一種簡化部署。

kafka基本原理

  為了更清晰展示Kafka consumer group的特性,筆者作了一項測試。建立一個topic (名為topic1),建立一個屬于group1的consumer執行個體,并建立三個屬于group2的consumer執行個體,然後通過producer向topic1發送key分别為1,2,3r的消息。結果發現屬于group1的consumer收到了所有的這三條消息,同時group2中的3個consumer分别收到了key為1,2,3的消息。如下圖所示。

kafka基本原理

Consumer Rebalance

  (本節所講述内容均基于Kafka consumer high level API)

  Kafka保證同一consumer group中隻有一個consumer會消費某條消息,實際上,Kafka保證的是穩定狀态下每一個consumer執行個體隻會消費某一個或多個特定partition的資料,而某個partition的資料隻會被某一個特定的consumer執行個體所消費。這樣設計的劣勢是無法讓同一個consumer group裡的consumer均勻消費資料,優勢是每個consumer不用都跟大量的broker通信,減少通信開銷,同時也降低了配置設定難度,實作也更簡單。另外,因為同一個partition裡的資料是有序的,這種設計可以保證每個partition裡的資料也是有序被消費。

  如果某consumer group中consumer數量少于partition數量,則至少有一個consumer會消費多個partition的資料,如果consumer的數量與partition數量相同,則正好一個consumer消費一個partition的資料,而如果consumer的數量多于partition的數量時,會有部分consumer無法消費該topic下任何一條消息。

  如下例所示,如果topic1有0,1,2共三個partition,當group1隻有一個consumer(名為consumer1)時,該 consumer可消費這3個partition的所有資料。

kafka基本原理

  增加一個consumer(consumer2)後,其中一個consumer(consumer1)可消費2個partition的資料,另外一個consumer(consumer2)可消費另外一個partition的資料。

kafka基本原理

  再增加一個consumer(consumer3)後,每個consumer可消費一個partition的資料。consumer1消費partition0,consumer2消費partition1,consumer3消費partition2

kafka基本原理

  再增加一個consumer(consumer4)後,其中3個consumer可分别消費一個partition的資料,另外一個consumer(consumer4)不能消費topic1任何資料。

kafka基本原理

  此時關閉consumer1,剩下的consumer可分别消費一個partition的資料。

kafka基本原理

  接着關閉consumer2,剩下的consumer3可消費2個partition,consumer4可消費1個partition。

kafka基本原理

  再關閉consumer3,剩下的consumer4可同時消費topic1的3個partition。

kafka基本原理

  consumer rebalance算法如下:   

  • Sort PT (all partitions in topic T)
  • Sort CG(all consumers in consumer group G)
  • Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
  • Remove current entries owned by Ci from the partition owner registry
  • Assign partitions from iN to (i+1)N-1 to consumer Ci
  • Add newly assigned partitions to the partition owner registry

  目前consumer rebalance的控制政策是由每一個consumer通過Zookeeper完成的。具體的控制方式如下:

  • Register itself in the consumer id registry under its group.
  • Register a watch on changes under the consumer id registry.
  • Register a watch on changes under the broker id registry.
  • If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
  • Force itself to rebalance within in its consumer group.

      在這種政策下,每一個consumer或者broker的增加或者減少都會觸發consumer rebalance。因為每個consumer隻負責調整自己所消費的partition,為了保證整個consumer group的一緻性,是以當一個consumer觸發了rebalance時,該consumer group内的其它所有consumer也應該同時觸發rebalance。

  目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。但該方式有不利的方面:

  • Herd effect

      任何broker或者consumer的增減都會觸發所有的consumer的rebalance

  • Split Brain

      每個consumer分别單獨通過Zookeeper判斷哪些partition down了,那麼不同consumer從Zookeeper“看”到的view就可能不一樣,這就會造成錯誤的reblance嘗試。而且有可能所有的consumer都認為rebalance已經完成了,但實際上可能并非如此。

  根據Kafka官方文檔,Kafka作者正在考慮在還未釋出的0.9.x版本中使用中心協調器(coordinator)。大體思想是選舉出一個broker作為coordinator,由它watch Zookeeper,進而判斷是否有partition或者consumer的增減,然後生成rebalance指令,并檢查是否這些rebalance在所有相關的consumer中被執行成功,如果不成功則重試,若成功則認為此次rebalance成功(這個過程跟replication controller非常類似,是以我很奇怪為什麼當初設計replication controller時沒有使用類似方式來解決consumer rebalance的問題)。

消息Deliver guarantee

  通過上文介紹,想必讀者已經明白了producer和consumer是如何工作的,以及Kafka是如何做replication的,接下來要讨論的是Kafka如何確定消息在producer和consumer之間傳輸。有這麼幾種可能的delivery guarantee:

  • At most once

     消息可能會丢,但絕不會重複傳輸
  • At least one

     消息絕不會丢,但可能會重複傳輸
  • Exactly once

     每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是使用者所想要的。 
    • Kafka的delivery guarantee semantic非常直接。當producer向broker發送消息時,一旦這條消息被commit,因為replication的存在,它就不會丢。
    • 但是如果producer發送資料給broker後,遇到的網絡問題而造成通信中斷,那producer就無法判斷該條消息是否已經commit。這一點有點像向一個自動生成primary key的資料庫表中插入資料。雖然Kafka無法确定網絡故障期間發生了什麼,但是producer可以生成一種類似于primary key的東西,發生故障時幂等性的retry多次,這樣就做到了

      Exactly one

      。截止到目前(Kafka 0.8.2版本,2015-01-25),這一feature還并未實作,有希望在Kafka未來的版本中實作。(是以目前預設情況下一條消息從producer和broker是確定了

      At least once

      ,但可通過設定producer異步發送實作

      At most once

      )。

      接下來讨論的是消息從broker到consumer的delivery guarantee semantic。(僅針對Kafka consumer high level API)。consumer在從broker讀取消息後,可以選擇commit,該操作會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同。當然可以将consumer設定為autocommit,即consumer一旦讀到資料立即自動commit。如果隻讨論這一讀取消息的過程,那Kafka是確定了

      Exactly once

      。但實際上實際使用中consumer并非讀取完資料就結束了,而是要進行進一步處理,而資料處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
    • 讀完消息先commit再處理消息。這種模式下,如果consumer在commit後還沒來得及處理消息就crash了,下次重新開始工作後就無法讀到剛剛已送出而未處理的消息,這就對應于

      At most once

    • 讀完消息先處理再commit。這種模式下,如果處理完了消息在commit之前consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應于

      At least once

      。在很多情況使用場景下,消息都有一個primary key,是以消息的處理往往具有幂等性,即多次處理這一條消息跟隻處理一次是等效的,那就可以認為是

      Exactly once

      。(人個感覺這種說法有些牽強,畢竟它不是Kafka本身提供的機制,而且primary key本身不保證操作的幂等性。而且實際上我們說delivery guarantee semantic是讨論被處理多少次,而非處理結果怎樣,因為處理方式多種多樣,我們的系統不應該把處理過程的特性–如是否幂等性,當成Kafka本身的feature)
    • 如果一定要做到

      Exactly once

      ,就需要協調offset和實際操作的輸出。精典的做法是引入兩階段送出。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支援兩階段送出。比如,consumer拿到資料後可能把資料放到HDFS,如果把最新的offset和資料本身一起寫到HDFS,那就可以保證資料的輸出和offset的更新要麼都完成,要麼都不完成,間接實作

      Exactly once

      。(目前就high level API而言,offset是存于Zookeeper中的,無法存于HDFS,而low level API的offset是由自己去維護的,可以将之存于HDFS中)

        總之,Kafka預設保證

      At least once

      ,并且允許通過設定producer異步送出來實作

      At most once

      。而

      Exactly once

      要求與目标存儲系統協作,幸運的是Kafka提供的offset可以使用這種方式非常直接非常容易。