天天看點

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

最近做的項目,通過資料庫的log日志将資料庫某些千萬量級的表(這些表需要聯表查詢)資料同步到elasticsearch中,以減輕資料庫的查詢壓力,其中以kafka作為消息中間件,以下是做該項目過程中對kafka的一些整理。

一、中間件

中間件,用于業務對于資料的時效性要求并不是特别高,有削峰填谷、解耦之功效。特别是中間件可以實作發送端和消費端的解耦,讓消息的發送端異步發送消息,并迅速傳回,可以極大提高系統整體的性能。中間件本質,就是發送、存儲、消費,其實可以自己封裝一個中間件,比如用并發包中的ArrayBlockingQueue,LinkedBlockingQueue,用put(T t)寫入資訊,用take(T t)拉取資訊,這兩個方法都有阻塞功能。FIFO隊列,其中LinkedBlockingQueue效率更高。

選取中間件時,根據業務對于功能、性能、可靠性、可用性、生态性的需要,選用流行中間件,可以借助搜尋引擎很快解決Bug,并且流行中間件的版本更疊及時,可以更快的擷取更高的性能,和更多的功能。

二、kafka基本介紹

1、基本概念

需要了解producer,consumer,groupId,broker,topic,partition,segment的概念,如下圖。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

2、版本名

kafka_2.10-0.8.2.jar,2.10是指Scala版本,0.8.2是指kafka版本。

3、核心功能

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。
  • Producer API允許程式釋出資料流到一個到多個Kafka topic。
  • Consumer API允許程式訂閱一個到多個topic,并且進行消費。
  • Streams API允許程式作為一個資料流處理,将一個或多個topic中輸入的資料進行消費,并生産資料流到一個或多個topics中。
  • Connector API,可以通過Connector管理Kafka和另一個系統之間的資料複制,比如去捕獲關系型資料庫中的任意改變到一個表中。

4、topic介紹

topic(不同的業務資料,分流到不同的topic進行處理)

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

                                   \\

                                     \\

topic是基于zk建立的,實 \\ 際上同一topic下的partition是按如下分布在各個伺服器上的(可以設定replicas的個數,此圖partition黑色為leader,紅色為 \\ 副本folower)。

                                         \    \\    /

                                              \||/

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

kafka對與zookeeper是強依賴的,是以zookeeper作為基礎的,即使不做叢集,也需要zk的支援。以下是kafka中必須要填寫的配置檔案,id為在zk中注冊的brokerid,後者為要注冊到的zookeeper的host和port。

broker.id=0
zookeeper.connect=localhost:2181
           

zk說白了,就是一個節點服務系統,至于用這個節點做什麼,做單活、開關鎖還是做檢測伺服器存活狀态,都是業務代碼根據這個節點做的一些邏輯處理。以下是kafka預設在zk中的節點層級結構:

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

5、partition介紹

partion可以看作一個有序的隊列,裡面的資料是儲存在硬碟中的,追加式的。partition的作用就是提供分布式的擴充,一個topic可以有許多partions,多個partition可以并行處理資料,是以可以處理相當量的資料。隻有partition的leader才會進行讀寫操作,folower僅進行複制,用戶端是感覺不到的。下圖把kafka叢集看成一個kakfa服務,僅顯示leader。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

1)offset概念

每一條資料都有一個offset,是每一條資料在該partition中的唯一辨別。各個consumer控制和設定其在該partition下消費到offset位置,這樣下次可以以該offset位置開始進行消費。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

各個consumer的offset位置預設是在某一個broker當中的topic中儲存的(為防止該broker宕掉無法擷取offset資訊,可以配置在每個broker中都進行儲存,配置檔案中配置)

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
           

2)replicas的同步時機。

假如有N個replicas,其中一個replica為leader,其他都為follower,leader處理partition的所有讀寫請求,于此同時,follower會被動定期的去複制leader上的資料。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

3)ISR介紹

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

leader會追蹤和維護ISR中所有follower的滞後狀态。如果滞後太多(數量滞後和時間滞後兩個次元,replica.lag.time.max.ms和replica.lag.max.message可配置),leader會把該replica從ISR中移除。被移除ISR的replica一直在追趕leader。如下圖,leader寫入資料後并不會commit,隻有ISR清單中的所有folower同步之後才會commit,把滞後的follower移除ISR主要是避免寫消息延遲。設定ISR主要是為了broker宕掉之後,重新選舉partition的leader從ISR清單中選擇。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

滞後情況:新增副本,GC挂起,follower失效,I/O瓶頸。

6、producer介紹

send(String topic, Integer partition, Long timestamp, K key, V data)
           
  • producer在發送消息的時候,必須指定topic和data,可以選擇指定partion、key、timestamp,其中時間戳有兩種方式,CreateTime和LogAppendTime,前者是用戶端設定時間,後者是broker在消息寫入log時設定的時間。如果為null,用的是System.currentTimeMillis()。如果同時不指定partition和key,那麼就用round-bin決定發送到哪個partition。
  • 用戶端會定時的取Broker的topic、partition、replicas等中繼資料資訊,producer持有kafka節點的metadata資訊,通過該資訊建立ProducerPool,每次發送資訊會根據要發送哪個Partition,來選擇相應的Producer執行個體,Rpc連接配接。

7、consumer介紹

以下針對springBoot內建的kafka

@KafkaListener(topics = {"cache-music-user"},groupId="zwhUser",containerFactory = "batchAbleFactory")

public void consumeBatch(List<ConsumerRecord<String,String>> recordList, Acknowledgment acknowledgment) throws InterruptedException {

        ...

        方法體

        ...

}
           
  • consumer如何知道自己應該拉取哪一個partition。cordinator(某一個Kafka的broker)在配置設定consumer的時候,會選舉consumer leader,後者配置設定每一個consumer要連接配接的broker,topic,partition,然後上報cordinator。然後consumer會根據自己被配置設定的partion去拉取資料。
  • 批量讀取和單資料讀取,ack機制。
  • 如果poll()時間逾時,那麼broker會認為consumer挂掉了,會踢掉該consumer。cordinator重新配置設定consumer。有時逾時會抛異常,不過也會重新配置設定consumer。
  • consumer的groupId機制。對于一個groupId中的consumer來說,一個partition隻能由一個consumer來消費。即不可能多個consumer消費1個partition。如下:
  • kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

consumer可以在不同的機器中。

三、延伸

1、kafka重新選舉

  • KafkaController的作用。Kafka叢集中多個broker,有一個會被選舉為controller leader,負責管理整個叢集中分區和副本的狀态,比如partition的leader 副本故障,由controller 負責為該partition重新選舉新的leader 副本;當檢測到ISR清單發生變化,有controller通知叢集中所有broker更新其MetadataCache資訊;或者增加某個topic分區的時候也會由controller管理分區的重新配置設定工作
  • KafkaController建立節點的方式去選舉,作為leader,任何follower挂了,zk會感覺到并通過Controller注冊的Wather去通知Controller去重新選舉。而leader挂了,zk會感覺到,會通過Wather機制通知每一個broker去競争Master。而ReplicaManager每個broker都有,是接受Contrloller的請求,對本服務上的partition進行管理的。

2、效率高的原因

因為kafka的資料都是存儲在硬碟中,甚至有的公司将kafka其作為資料庫使用,既然資料是基于硬碟的,那麼為何kafka還是能夠擁有如此高的吞吐量呢?

1)硬碟的索引功能。二分查找法。

分區:找到響應的分區

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

分段:根據檔案segment的命名可以确認要查找的offset或timestamp在哪個檔案中。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

稀疏索引:快速确定要找的offset在哪個記憶體位址的附近。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

2)I/O優化

普通程式I/O需要把Disk中的資訊複制到系統環境記憶體(步驟1),再複制到kafka應用環境記憶體(步驟2),然後步驟3,步驟4到Socket通過網絡發出,重複複制文本,I/O消耗大。

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

kafka則不一樣:

kafka基本原理介紹,以及重新選舉,replica複制機制,isr等。

3、kafka和rabbitMq的對比。

kafka是一種高吞吐量的分布式釋出訂閱消息系統。和rabbitMq各占半臂江山。以下是對比:

kafka rabbitMq
書寫語言 Java和Scala Erlang
消息協定 自定義通信協定 AMQP/MQTT/STOMP等協定
消息過濾 topic和partition進行 交換機路由
消息堆積 磁盤式堆積 記憶體式堆積
消息傳遞模式 典型的Pub/Sub模式 典型的P2P模式
消費模式 典型的Pull Push+Pull
消息回朔 通過offset和timestamp 消費即删除
流量控制 對producer和consumer進行主動設定 Credit—Based算法,作用producer
消息順序 支援單分區級别的順序性 單線程發送、消費
QPs 單機維持數十萬,甚至達到百萬 單機萬級别

性能來說,kafka的吞吐量較大。kafka易于向外擴充,所有的producer、broker、consumer無需停機都可以即刻擴充機器。資訊将全部的資訊持久化到硬碟上,生産和消費互不影響,很靈活。功能來說,kafka适用于日志,事實上kafka本身就是LinkIn公司開發用于日志系統的,是以其檔案叫做log。使用者跟蹤管道,對使用者網頁行為的記錄跟蹤,用于離線資料分析或者做報表。大資料分析。