天天看點

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

Kafka簡單介紹

Kafka是由Apache軟體基金會開發的一個分布式、分區的、多副本的、多訂閱者的開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式釋出訂閱消息系統,它可以處理消費者在網站中的所有動作流資料。 這種動作(網頁浏覽,搜尋和其他使用者的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些資料通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的消息處理,也是為了通過叢集來提供實時的消息。

主要特性

  • 通過O(1)的磁盤資料結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。
  • 高吞吐量 :即使是非常普通的硬體Kafka也可以支援每秒數百萬的消息。
  • 支援Hadoop并行資料加載。
  • 釋出和訂閱消息流(類似于消息隊列或者企業級的消息系統)
  • 以容錯的、持久的方式存儲消息流
  • 當消息流到來的時候,處理消息
  • 支援Kafka Server間的消息分區,及分布式消費,同時保證每個partition内的消息順序傳輸
  • 同時支援離線資料處理和實時資料處理

應用場景

  • 資料推送
  • 作為大緩沖區使用
  • 日志收集(scribe或者nginx)
  • 服務中間件

kafka架構

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

Broker

代理,用來存儲消息,Kafka叢集中的每一個伺服器都是一個代理(Broker),消費者将從broker拉取訂閱的消息

Producer

向Kafa發送消息的程序,生産者會根據topic分布消息。生産者也負責把消息關聯到Topic上的哪一個分區。最簡單的方式從分區清單中輪流選擇。也可以根據某種算法依照權重選擇分區。算法由開發者定義。

Cousumer

Consermer執行個體可以是獨立的程序,負責訂閱和消費消息。消費者用consumerGroup來辨別自己。同一個消費組可以并發地消費多個分區的消息,同一個partition也可以由多個consumerGroup并發消費,但是在consumerGroup中一個partition隻能由一個consumer消費

CousumerGroup

Consumer Group:同一個Consumer Group中的Consumers,Kafka将相應Topic中的每個消息隻發送給其中一個Consumer

主題、分區和副本(Topic,Partition,replication)

對于每個topic,Kafka都用分區的的方法維護(如下圖)

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

Topic

消息的類别。Kafka中可以将Topic從實體上劃分成一個或多個分區(Partition),每個分區在實體上對應一個檔案夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個分區的所有消息(.log)和索引檔案(.index),這使得Kafka的吞吐率可以水準擴充。

producer在釋出消息的時候,可以為每條消息指定Key,這樣消息被發送到broker時,會根據分區算法把消息存儲到對應的分區中(一個分區存儲多個消息),如果分區規則設定的合理,那麼所有的消息将會被均勻的分布到不同的分區中,這樣就實作了負載均衡。

事件被組織并持久地存儲在Topic中,Topic類似于檔案系統中的檔案夾,事件就是該檔案夾中的檔案。Kafka中的Topic始終是多生産者和多訂閱者:一個Topic可以有零個、一個或多個生産者向其寫入事件,也可以有零個、一個或多個消費者訂閱這些事件。Topic中的事件可以根據需要随時讀取,與傳統的消息中間件不同,事件在使用後不會被删除,相反,可以通過每個Topic的配置來定義Kafka應該保留事件的時間,之後舊事件将被丢棄。Kafka的性能在資料大小方面實際上是恒定的,是以長時間存儲資料是非常好的。

Partition

對Topic中的消息做水準切分,每塊稱為一個Partition。這意味着一個Topic可以分布在多個Kafka節點上。每個分區都是一個 順序的、不可變的消息隊列, 并且可以持續的添加;分區中的消息都被分了一個序列号,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

資料的這種分布式放置對于可伸縮性非常重要,因為它允許用戶端應用程式同時從Kafka節點讀取和寫入資料。将新事件釋出到Topic時,它實際上會appended到Topic的一個Partition中。具有相同僚件key的事件将寫入同一Partition,Kafka保證給定Topic的Partition的任何使用者都将始終以與寫入時完全相同的順序讀取該分區的事件。

Replication

為了使資料具有容錯性和高可用性,每個Topic都可以有多個Replication,以便始終有多個Kafka節點具有資料副本,以防出現問題。常見的生産設定是replicationFactor為3,即始終有三份資料副本(包括一份原始資料)。此Replication在Topic的Partition級别執行。

Kafka在指定數量(通過replicationFactor)的伺服器上複制每個Topic的Partition,這允許在叢集中的某些伺服器發生故障時進行自動故障轉移,以便在出現故障時服務仍然可用。Replication的機關是Topic的Partition。在非故障條件下,Kafka中的每個Partition都有一個leader和零個或多個follower。replicationFactor是複制副本(包括leader)的總數。所有讀和寫操作都将轉到Partition的leader上。通常,有比Kafka節點多得多的Partition,并且這些Partition的leader在Kafka節點之間均勻分布。follower上的資料需要與leader的資料同步,所有資料都具有相同的偏移量和順序(當然,在任何給定時間,leader的資料末尾可能有一些尚未複制的資料)。follower會像普通Kafka消費者一樣使用來自leader的消息,并将其應用到自己的資料中。如下圖所示,三個Kafka節點上有兩個Topic(Topic 0和Topic 1),Topic 0有兩個Partition并且replicationFactor為3(紅色的Partition為leader),Topic 1有三個Partition,replicationFactor也為3(紅色的Partition為leader)。

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

消費序列

無論消息是否被消費了,Kafka叢集都會儲存所有的消息,直到它們過期 。比如我們配置一個消息的有效期為兩天,那麼消息釋出的兩天内,consumer可以消費消息,而超出時間後,消息将被丢棄以釋放空間。那麼Kafa怎麼知道消費到哪兒了呢?每個消費者都維護着一個中繼資料(偏移量,消費者的消費進度)。這個offset由消費者控制:當consumer消費消息的時候,偏移量也線性地增加。但是偏移量由消費者控制,是以消費者能夠把偏移量重置為一個更老的,以便重新讀取。此外,由于偏移量由消費者控制offset, 一個消費者的操作不會影響其它消費者對此log的處理。

這些特點使得Kafka消費者的成本很低,他們可以來取自如的消費消息,而且不影響其他消費者,這樣,我們可以用tail指令處理任何topic,而不需要改變這些消息。再說說分區。Kafka中采用分區的設計有以下好處。一是可以水準擴充,不受單台伺服器的限制;Topic擁有多個分區意味着它可以不受限的處理更多的資料。第二,分區可以作為并行處理的單元,稍後會談到這一點。

分布式

log的分區被分散的存儲在Kafka叢集中,每個伺服器隻處理配置設定到它機器上的partitions。此外,partation會根據配置備份到其他server以實作容錯性。每個分區,隻有一個server可以作為partation Leader,其他的都将作為follower。leader負責處理partation的讀寫請求,followers被動的同步資料,以便在leader挂掉的時候,更新為leader。需要知道的是,每個server都是一些分區的leader,同僚又是其他分區的flowwer,以更好地實作負載均衡,避免熱點問題。

異地同步

Kafka MirrorMaker為群集提供geo-replication支援。借助MirrorMaker,消息可以跨多個資料中心或雲區域進行複制、同步。 我們可以利用它在active/passive場景中備份和恢複; 或者在active/passive方案中将資料置于更接近使用者的位置,或資料本地化。

保證

  • 消息将會以被sent的順序添加到partition Log中
  • 消費者将先看到先加入到partition Log中的消息
  • 對于一個topic的消息,如果備份數為N,系統最多容許有N-1台失敗,而不丢失commit到log的消息

kafka的優點

Kafka作為一個消息系統與傳統的企業級消息系統相比,Kafka如何?

傳統的消息系統有兩種模型:

  • 隊列模型:一組消費者去消費server上的消息,但是一個消息隻能被一個consumer消費
  • 釋出-訂閱模型: 每個消息都會廣播到每個消費者,即每個消費者都會消費

隊列模型讓多個consumer瓜分消息并消費,這樣可以很好地水準擴充,提高性能,但和隊列模型不同的是,消息一旦被讀取就會被幹掉,是以有丢資料的風險;釋出訂閱模型會把每個消息都廣播給訂閱者,因為不友善做水準擴充。

Kafka同時具備隊列模型和釋出-訂閱模型,你不必糾結于消息系統模型的二選一

Kafka的 consumerGroup結合上這兩中模型:隊列模型-同一group中,一個messgae隻有一個consumer消費,消息多的時候,往group中添加consumer就好,容易擴充;釋出訂閱模型:同一個消息可以被廣播到不同的consumerGroup中。

kafka有更強的順序保證機制

傳統的消息隊列按序儲存消息,如果有多個消費者從隊列消費消息,server将會按照順序把消息移出隊列,不幸的是,即使server順序移出消息,但是消息到達consumers的過程是異步的,是以他們真正抵達消費者的順序與stored進隊列時不同,消息系統的解決方案是隻讓一個consumer去消費這個隊列,然而這又違背了并發。

這方面,Kafka做的更好,通過分區概念,在一個topic下,Kafka能同時滿足

  • 順序保證:同一個partition的消息隻能由group中的一個consumer消費
  • 負載均衡:由于有多個partition,是以仍然具有并發性

需要注意的是,group中的consumer數不能大于partition,否則會造成空閑

Kafka作為一個存儲系統

任何允許把釋出消息和消費消息過程解耦的消息隊列,本質上都充當了存儲系統,不同的是,Kafka是一個高性能的存儲系統:

  • 寫到kafka的資料會被寫到硬碟,并且備份,以實作容錯。Kafka允許生産者等待消息應答,直到完成備份,保證持久化了。
  • Kafka的硬碟結構可以水準擴充,無論你的server是50K還是50T,都可以很好的執行

由于重視存儲并且允許client去控制讀取位置,你還可以認為kafka是一種具備高性能,低延遲,送出日志存儲,複制,和傳播特性的分布式檔案系統

Kafka流處理

不隻是讀寫和存儲資料,Kafka的目标是能夠實時流處理。在kafka中,流處理持續擷取輸入topic的資料,進行處理加工,然後寫入輸出topic。例如,一個零售APP,接收銷售和出貨的輸入流,統計數量或調整價格後輸出。

批處理

把消息、存儲、流處理組合在一起似乎并不常見,但對于Kafka來說是非常有必要的,因為它的目标是做一個流平台。

類似于HDFS的分布式檔案系統允許存儲靜态檔案來進行批處理。這種系統能夠有效地存儲和處理曆史資料; 傳統的消息系統允許你處理訂閱topic之後才到達的topic消息。Kafka結合了這兩種能力,這種組合對于kafka作為流處理應用的平台和流資料管道的平台是至關重要的。

作為流式應用程式,Kafka通過組合存儲和低延遲訂閱,可用相同的方式處理過去和未來的資料。 這意味着一個單一的應用程式可以處理曆史記錄的資料,并且在到達最後一條記錄時不用結束,而是等待future data。

作為流資料管道,Kafka能夠訂閱實時事件,是以Kafka可作為低延遲的管道; 同時Kafka可靠存儲資料的特性使得它能夠存儲一些要求較高的資料(保證安全送達or系統因維護下線一段時間也不會丢資料)。

部署kafka服務

kafka需要依賴zookeeper做配置管理,leader選舉和服務發現,是以需要搭建zookeeper服務。

​Docker-compose部署單主機zookeeper叢集​​

拉取kafka可用鏡像

先搜尋有哪些kafka鏡像可以用

docker search kafka      
kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

我拉取Stars數目最高的鏡像

不指定版本号就是拉取最新版本鏡像。

docker pull wurstmeister/kafka      

拉取一個kafka管理器

docker pull sheepkiller/kafka-manager      

運作一個簡單的kafka容器

建立zookeeper檔案夾

mkdir -p /usr/local/zookeeper/zookeeper-1/data
mkdir -p /usr/local/zookeeper/zookeeper-1/datalog
mkdir -p /usr/local/zookeeper/zookeeper-1/logs
mkdir -p /usr/local/zookeeper/zookeeper-1/conf      

建立kafka檔案夾

mkdir -p /usr/local/kafka

mkdir -p /usr/local/kafka/kafka-1/logs
mkdir -p /usr/local/kafka/kafka-2/logs
mkdir -p /usr/local/kafka/kafka-3/logs      

賦予這個檔案夾的讀寫可執行權限(這個寫法比較暴力,實際工作不可取)

檔案所有者(Owner)、使用者組(Group)、其它使用者(Other Users)都賦予了最高權限

chmod -R 777 /usr/local/kafka
chmod -R 777 /usr/local/zookeeper      

先運作一個zookeeper

像​​Docker-compose部署單主機zookeeper叢集​​這篇部落格一樣,将一些配置檔案放到主控端的如下所示相應檔案夾下面(隻用放conf下面的哪些配置檔案就行了)

docker run -itd --name zookeeper-1 -p 2181:2181 \
-v /usr/local/zookeeper/zookeeper-1/data:/data \
-v /usr/local/zookeeper/zookeeper-1/datalog:/datalog \
-v /usr/local/zookeeper/zookeeper-1/logs:/logs \
-v /usr/local/zookeeper/zookeeper-1/conf:/conf \
 zookeeper      

再運作一個kafka

docker run -itd --name kafka-1 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=81.68.82.48:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://81.68.82.48:9092 -e KAFKA_LISTENERS=PLAINTEXT://localhost:9092 -t wurstmeister/kafka      
  • -e KAFKA_BROKER_ID=0 在kafka叢集中,每個kafka都有一個BROKER_ID來區分自己
  • -e KAFKA_ZOOKEEPER_CONNECT 配置zookeeper管理kafka的路徑
  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 把kafka的位址端口注冊給zookeeper
  • -e KAFKA_LISTENERS=PLAINTEXT 配置kafka的監聽端口

建立成功之後,可以使用docker exec 指令或者docker 可視化界面工具portainer進入容器檢視

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

可以自己進入裡面檢視檔案結構以及檔案内容

重要配置檔案詳解

1.server.properties配置檔案

屬性 預設值 描述
​​broker.id​​ 必填參數,broker的唯一辨別
log.dirs /tmp/kafka-logs Kafka資料存放的目錄。可以指定多個目錄,中間用逗号分隔,當新partition被建立的時會被存放到目前存放partition最少的目錄。
port 9092 BrokerServer 接受用戶端連接配接的端口号
zookeeper.connect null Zookeeper的連接配接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3。可以填一個或多個,為了提高可靠性,建議都填上。注意,此配置允許我們指定一個zookeeper路徑來存放此kafka叢集的所有資料,為了與其他應用叢集區分開,建議在此配置中指定本叢集存放目錄,格式為:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消費者的參數要和此參數一緻。
message.max.bytes 1000000 伺服器可以接收到的最大的消息大小。注意此參數要和consumer的maximum.message.size大小一緻,否則會因為生産者生産的消息太大導緻消費者無法消費。
num.io.threads 8 伺服器用來執行讀寫請求的IO線程數,此參數的數量至少要等于伺服器上磁盤的數量。
queued.max.requests 500 I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程将停止接收新的請求。
socket.send.buffer.bytes 100 * 1024 The SO_SNDBUFF buffer the server prefers for socket connections.
socket.receive.buffer.bytes 100 * 1024 The SO_RCVBUFF buffer the server prefers for socket connections.
socket.request.max.bytes 100 * 1024 * 1024 伺服器允許請求的最大值, 用來防止記憶體溢出,其值應該小于 Java heap size.
num.partitions 1 預設partition數量,如果topic在建立時沒有指定partition數量,預設使用此值,建議改為5
log.segment.bytes 1024 * 1024 * 1024 Segment檔案的大小,超過此值将會自動建立一個segment,此值可以被topic級别的參數覆寫。
log.roll.{ms,hours} 24 * 7 hours 建立segment檔案的時間,此值可以被topic級别的參數覆寫。
log.retention.{ms,minutes,hours} 7 days Kafka segment log的儲存周期,儲存周期超過此時間日志就會被删除。此參數可以被topic級别參數覆寫。資料量大時,建議減小此值。
log.retention.bytes -1 每個partition的最大容量,若資料量超過此值,partition資料将會被删除。注意這個參數控制的是每個partition而不是topic。此參數可以被log級别參數覆寫。
​​log.retention.check.interval.ms​​ 5 minutes 删除政策的檢查周期
auto.create.topics.enable true 自動建立topic參數,建議此值設定為false,嚴格控制topic管理,防止生産者錯寫topic。
default.replication.factor 1 預設副本數量,建議改為2。
​​replica.lag.time.max.ms​​ 10000 在此視窗時間内沒有收到follower的fetch請求,leader會将其從ISR(in-sync replicas)中移除。
replica.lag.max.messages 4000 如果replica節點落後leader節點此值大小的消息數量,leader節點就會将其從ISR中移除。
​​replica.socket.timeout.ms​​ 30 * 1000 replica向leader發送請求的逾時時間。
replica.socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests to the leader for replicating data.
replica.fetch.max.bytes 1024 * 1024 The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
​​replica.fetch.wait.max.ms​​ 500 The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
num.replica.fetchers 1 Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
fetch.purgatory.purge.interval.requests 1000 The purge interval (in number of requests) of the fetch request purgatory.
​​zookeeper.session.timeout.ms​​ 6000 ZooKeeper session 逾時時間。如果在此時間内server沒有向zookeeper發送心跳,zookeeper就會認為此節點已挂掉。 此值太低導緻節點容易被标記死亡;若太高,.會導緻太遲發現節點死亡。
​​zookeeper.connection.timeout.ms​​ 6000 用戶端連接配接zookeeper的逾時時間。
​​zookeeper.sync.time.ms​​ 2000 H ZK follower落後 ZK leader的時間。
controlled.shutdown.enable true 允許broker shutdown。如果啟用,broker在關閉自己之前會把它上面的所有leaders轉移到其它brokers上,建議啟用,增加叢集穩定性。
auto.leader.rebalance.enable true If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.
leader.imbalance.per.broker.percentage 10 The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.
leader.imbalance.check.interval.seconds 300 The frequency with which to check for leader imbalance.
offset.metadata.max.bytes 4096 The maximum amount of metadata to allow clients to save with their offsets.
​​connections.max.idle.ms​​ 600000 Idle connections timeout: the server socket processor threads close the connections that idle more than this.
num.recovery.threads.per.data.dir 1 The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
unclean.leader.election.enable true Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
delete.topic.enable false 啟用deletetopic參數,建議設定為true。
offsets.topic.num.partitions 50 The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).
offsets.topic.retention.minutes 1440 Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.
​​offsets.retention.check.interval.ms​​ 600000 The frequency at which the offset manager checks for stale offsets.
offsets.topic.replication.factor 3 The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.
offsets.topic.segment.bytes 104857600 Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.
offsets.load.buffer.size 5242880 An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.
offsets.commit.required.acks -1 The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.
​​offsets.commit.timeout.ms​​ 5000 The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

2.producer.properties配置檔案

屬性 預設值 描述
metadata.broker.list 啟動時producer查詢brokers的清單,可以是叢集中所有brokers的一個子集。注意,這個參數隻是用來擷取topic的元資訊用,producer會從元資訊中挑選合适的broker并與之建立socket連接配接。格式是:host1:port1,host2:port2。
request.required.acks 參見3.2節介紹
​​request.timeout.ms​​ 10000 Broker等待ack的逾時時間,若等待時間超過此值,會傳回用戶端錯誤資訊。
producer.type sync 同步異步模式。async表示異步,sync表示同步。如果設定成異步模式,可以允許生産者以batch的形式push資料,這樣會極大的提高broker性能,推薦設定為異步。
serializer.class kafka.serializer.DefaultEncoder 序列号類,.預設序列化成 byte[] 。
key.serializer.class Key的序列化類,預設同上。
partitioner.class kafka.producer.DefaultPartitioner Partition類,預設對key進行hash。
compression.codec none 指定producer消息的壓縮格式,可選參數為: “none”, “gzip” and “snappy”。關于壓縮參見4.1節
compressed.topics null 啟用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那麼壓縮僅對本參數指定的topic有效,若本參數為空,則對所有topic有效。
message.send.max.retries 3 Producer發送失敗時重試次數。若網絡出現問題,可能會導緻不斷重試。
​​retry.backoff.ms​​ 100 Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
​​topic.metadata.refresh.interval.ms​​ 600 * 1000 The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed
​​queue.buffering.max.ms​​ 5000 啟用異步模式時,producer緩存消息的時間。比如我們設定成1000時,它會緩存1秒的資料再一次發送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
queue.buffering.max.messages 10000 采用異步模式時producer buffer 隊列裡最大緩存的消息數量,如果超過這個數值,producer就會阻塞或者丢掉消息。
​​queue.enqueue.timeout.ms​​ -1 當達到上面參數值時producer阻塞等待的時間。如果值設定為0,buffer隊列滿時producer不會阻塞,消息直接被丢掉。若值設定為-1,producer會被阻塞,不會丢消息。
batch.num.messages 200 采用異步模式時,一個batch緩存的消息數量。達到這個數量值時producer才會發送消息。
send.buffer.bytes 100 * 1024 Socket write buffer size
​​client.id​​ “” The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

3.consumer.properties配置檔案

屬性 預設值 描述
​​group.id​​ Consumer的組ID,相同goup.id的consumer屬于同一個組。
zookeeper.connect Consumer的zookeeper連接配接串,要和broker的配置一緻。
​​consumer.id​​ null 如果不設定會自動生成。
​​socket.timeout.ms​​ 30 * 1000 網絡請求的socket逾時時間。實際逾時時間由max.fetch.wait + ​​socket.timeout.ms​​ 确定。
vsocket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests.
fetch.message.max.bytes 1024 * 1024 查詢topic-partition時允許的最大消息大小。consumer會為每個partition緩存此大小的消息到記憶體,是以,這個參數可以控制consumer的記憶體使用量。這個值應該至少比server允許的最大消息大小大,以免producer發送的消息大于consumer允許的消息。
num.consumer.fetchers 1 The number fetcher threads used to fetch data.
auto.commit.enable true 如果此值設定為true,consumer會周期性的把目前消費的offset值儲存到zookeeper。當consumer失敗重新開機之後将會使用此值作為新開始消費的值。
​​auto.commit.interval.ms​​ 60 * 1000 Consumer送出offset值到zookeeper的周期。
queued.max.message.chunks 2 用來被consumer消費的message chunks 數量, 每個chunk可以緩存fetch.message.max.bytes大小的資料量。
​​auto.commit.interval.ms​​ 60 * 1000 Consumer送出offset值到zookeeper的周期。
queued.max.message.chunks 2 用來被consumer消費的message chunks 數量, 每個chunk可以緩存fetch.message.max.bytes大小的資料量。
fetch.min.bytes 1 The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
​​fetch.wait.max.ms​​ 100 The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.
​​rebalance.backoff.ms​​ 2000 Backoff time between retries during rebalance.
​​refresh.leader.backoff.ms​​ 200 Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
auto.offset.reset largest What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer
​​consumer.timeout.ms​​ -1 若在指定時間内沒有消息消費,consumer将會抛出異常。
exclude.internal.topics true Whether messages from internal topics (such as offsets) should be exposed to the consumer.
​​zookeeper.session.timeout.ms​​ 6000 ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
​​zookeeper.connection.timeout.ms​​ 6000 The max time that the client waits while establishing a connection to zookeeper.
​​zookeeper.sync.time.ms​​ 2000 How far a ZK follower can be behind a ZK leader

還有幾篇部落格也介紹的很清楚:

​Kafka的配置檔案較長的描述​​kafka 配置檔案參數詳解

Kafka最佳實踐配置項

服務端必要參數

  • zookeeper.connect:必配參數,建議在kafka叢集的每台執行個體都配置所有的zk節點
  • broker.id:必配參數。叢集節點的标示符,不得重複,取值範圍0~n
  • log.dirs:不要使用預設的“/tmp/kafka-logs”,因為/tmp目錄的性質沒法保證資料的持久性

服務端推薦參數

  • advertised.host.name:注冊到zk供使用者使用的主機名
  • advertised.port:注冊到zk供使用者使用的服務端口
  • num.partitions:建立topic時的預設partition數量,預設是1
  • default.replication.factor:自動建立topic的預設副本數量,建議至少修改為2
  • min.insync.replicasISR:送出生成者請求的最小副本數,建議至少2~3個
  • unclean.leader.election.enable:是否允許不具備ISR資格的replicas被選舉為leader,建議設定為否,除非能夠允許資料的丢失
  • controlled.shutdown.enable:在kafka收到stop指令或者異常終止時,允許自動同步資料,建議開啟

可動态調整的參數

  • unclean.leader.election.enable:不嚴格的leader選舉,有助于叢集健壯,但是存在資料丢失風險。
  • min.insync.replicas:如果同步狀态的副本小于該值,伺服器将不再接受request.required.acks為-1或all的寫入請求。
  • max.message.bytes:單條消息的最大長度。如果修改了該值,那麼replica.fetch.max.bytes和消費者的fetch.message.max.bytes也要跟着修改。
  • cleanup.policy:生命周期終結資料的處理,預設删除。
  • flush.messages:強制重新整理寫入的最大緩存消息數。
  • flush.ms:強制重新整理寫入的最大等待時長。

用戶端配置:

  • Producer用戶端:ack、壓縮、同步生産 vs 異步生産、批處理大小(異步生産)
  • Consumer用戶端方面主要考慮:partition數量及擷取消息的大小

Kafka 叢集相關知識

首先,我們需要了解Kafka叢集的一些機制:

  • Kafka是天然支援叢集的,哪怕是一個節點實際上也是叢集模式
  • Kafka叢集依賴于Zookeeper進行協調,并且在早期的Kafka版本中很多資料都是存放在Zookeeper的
  • Kafka節點隻要注冊到同一個Zookeeper上就代表它們是同一個叢集的
  • Kafka通過brokerId來區分叢集中的不同節點

Kafka的叢集拓撲圖如下:

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

Kafka叢集中的幾個角色:

  • Broker:一般指Kafka的部署節點
  • Leader:用于處理消息的接收和消費等請求,也就是說producer是将消息push到leader,而consumer也是從leader上去poll消息
  • Follower:主要用于備份消息資料,一個leader會有多個follower

Kafka副本集

關于Kafka的副本集:

  • Kafka副本集是指将日志複制多份,我們知道Kafka的資料是存儲在日志檔案中的,這就相當于資料的備份、備援
  • Kafka可以通過配置設定預設的副本集數量
  • Kafka可以為每個Topic設定副本集,是以副本集是相對于Topic來說的

一個Topic的副本集可以分布在多個Broker中,當一個Broker挂掉了,其他的Broker上還有資料,這就提高了資料的可靠性,這也是副本集的主要作用。

我們都知道在Kafka中的Topic隻是個邏輯概念,實際存儲資料的是Partition,是以真正被複制的也是Partition。如下圖:

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

關于副本因子:

  • 副本因子其實決定了一個Partition的副本數量,例如副本因子為1,則代表将Topic中的所有Partition按照Broker的數量複制一份,并分布到各個Broker上

副本配置設定算法如下:

  • 将所有N Broker和待配置設定的i個Partition排序
  • 将第i個Partition配置設定到第(i mod n)個Broker上
  • 将第i個Partition的第j個副本配置設定到第((i + j) mod n)個Broker上

Kafka節點故障原因及處理方式

Kafka節點(Broker)故障的兩種情況:

  • Kafka節點與Zookeeper心跳未保持視為節點故障
  • 當follower的消息落後于leader太多也會視為節點故障

Kafka對節點故障的處理方式:

  • Kafka會對故障節點進行移除,是以基本不會因為節點故障而丢失資料
  • Kafka的 語義擔保也很大程度上避免了資料丢失
  • Kafka會對消息進行叢集内平衡,減少消息在某些節點熱度過高

Kafka Leader選舉機制簡介

Kafka叢集之Leader選舉:

  • 如果有接觸過其他一些分布式元件就會了解到大部分元件都是通過投票選舉來在衆多節點中選舉出一個leader,但在Kafka中沒有采用投票選舉來選舉leader
  • Kafka會動态維護一組Leader資料的副本(ISR)
  • Kafka會在ISR中選擇一個速度比較快的設為leader

    “巧婦難為無米之炊”:Kafka有一種無奈的情況,就是ISR中副本全部當機。對于這種情況,Kafka預設會進行unclean leader選舉。Kafka提供了兩種不同的方式進行處理:

  • 等待ISR中任一Replica恢複,并選它為Leader

    等待時間較長,會降低可用性,或ISR中的所有Replica都無法恢複或者資料丢失,則該Partition将永不可用

  • 選擇第一個恢複的Replica為新的Leader,無論它是否在ISR中

    并未包含所有已被之前Leader Commit過的消息,是以會造成資料丢失,但可用性較高

Leader選舉配置建議:

  • 禁用unclean leader選舉
  • 手動設定最小ISR

docker-compose搭建kafka

不了解docker-compose的話可以簡單看看這篇

​Docker-compose 安裝與基本使用​​ 我這裡就把日志的存儲挂載出來

如後續搭建叢集運作遇到錯誤,多半是空格縮進的原因

version: '3.9'
# 配置kafka叢集
# container services下的每一個子配置都對應一個節點的docker container

# 給kafka叢集配置一個網絡,網絡名為kafka-net
networks:
  kafka-net:
    name: kafka-net
    driver: bridge
    
services:
  #zookeeper我就配置一個,能運作使用就行,怕伺服器資源不太夠用
  zookeeper-1:
    image: zookeeper
    container_name: zookeeper
    restart: always
    # 配置docker container和主控端的端口映射
    ports:
        - 2181:2181
        - 8081:8080
    # 将docker container上的路徑挂載到主控端上 實作主控端和docker container的資料共享
    volumes:
        - "/usr/local/zookeeper/zookeeper-1/data:/data"
        - "/usr/local/zookeeper/zookeeper-1/datalog:/datalog"
        - "/usr/local/zookeeper/zookeeper-1/logs:/logs"
        - "/usr/local/zookeeper/zookeeper-1/conf:/conf"
    # 配置docker container的環境變量
    environment:
        # 目前zk執行個體的id
        ZOO_MY_ID: 1
        # 整個zk叢集的機器、端口清單
        ZOO_SERVERS: server.1=zookeeper-1:2888:3888 

    command: ["zkServer.sh", "start-foreground"]
    
    networks:
        - kafka-net

        
  kafka-1:
    image: wurstmeister/kafka
    container_name: kafka-1
    restart: always
    # 配置docker container和主控端的端口映射  8083端口是後期部署kafka connect所需要的端口
    ports:
        - 9092:9092
        - 8084:8083
    # plugins 是我友善加入kafka connector 依賴所設檔案夾,不使用connect可以不設定
    # /opt/kafka/plugins 是配置connector的時候指定的容器内部檔案夾路徑
    volumes:
        - "/usr/local/kafka/kafka-1/logs:/kafka"
        - "/usr/local/kafka/plugins:/opt/kafka/plugins"

    # 配置docker container的環境變量
    environment:
      KAFKA_ADVERTISED_HOST_NAME: IP                   ## 修改:主控端IP
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9092    ## 修改:主控端IP
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper-1
    networks:
        - kafka-net

  kafka-2:
    image: wurstmeister/kafka
    container_name: kafka-2
    restart: always
    # 配置docker container和主控端的端口映射
    ports:
        - 9093:9092
        - 8085:8083
    
    volumes:
        - "/usr/local/kafka/kafka-2/logs:/kafka"
        - "/usr/local/kafka/plugins:/opt/kafka/plugins"

    # 配置docker container的環境變量
    environment:
      KAFKA_ADVERTISED_HOST_NAME: IP                   ## 修改:主控端IP
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9093    ## 修改:主控端IP
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181"
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper-1
    networks:
        - kafka-net
  
  kafka-3:
    image: wurstmeister/kafka
    container_name: kafka-3
    restart: always
    # 配置docker container和主控端的端口映射
    ports:
        - 9094:9092
        - 8086:8083
        
    volumes:
        - "/usr/local/kafka/kafka-3/logs:/kafka"
        - "/usr/local/kafka/plugins:/opt/kafka/plugins"

    # 配置docker container的環境變量
    environment:
      KAFKA_ADVERTISED_HOST_NAME: IP                   ## 修改:主控端IP
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://IP:9094    ## 修改:主控端IP
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181"
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper-1
    networks:
        - kafka-net

  kafka-manager:
    image: sheepkiller/kafka-manager              ## 鏡像:開源的web管理kafka叢集的界面
    container_name: kafka-manager
    restart: always
    environment:
        ZK_HOSTS: IP:2181                  ## 修改:主控端IP
    ports:
      - "9001:9000"                               ## 暴露端口
    networks:
        - kafka-net      

将docker-compose.yml檔案拷貝到/usr/local/kafka這個檔案夾下面。

啟動docker-compose的kafka叢集

在docker-compose.yml檔案所在的地方運作

-d 背景運作

docker-compose up -d      

檢視運作情況,如果都為UP狀态,則為正常運作

docker ps -a      
kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

開放端口

我在部署zookeeper的時候是開放了zookeeper哪些服務的端口了的,是以我這裡隻用開放kafka和kafka-manager的端口

firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --permanent --add-port=9093/tcp
firewall-cmd --permanent --add-port=9094/tcp
//8084,8085,8086是 kafka connect rest服務的端口号
firewall-cmd --permanent --add-port=8084/tcp
firewall-cmd --permanent --add-port=8085/tcp
firewall-cmd --permanent --add-port=8086/tcp

firewall-cmd --permanent --add-port=9001/tcp      

重新開機防火牆(修改配置後要重新開機防火牆)

firewall-cmd --reload      

伺服器控制台的防火牆端口同樣也需要開放,便于外界通路伺服器,如果伺服器部署了安全組,則安全組也要開放這些端口。(據我所知,至少阿裡雲是設定了安全組的,需要安全組開放這些端口)

登入kafka-manager管理界面

http://伺服器IP:9001/      

添加叢集,因為kafka是使用zookeeper做配置管理的,是以填寫zookeeper的位址

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

可以檢視這個叢集的資訊

kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

測試

建立一個topic

docker exec kafka-1 kafka-topics.sh --create --topic xt --partitions 1 --bootstrap-server localhost:9092 --replication-factor 1      
kafka的簡單介紹以及docker-compose部署單主機Kafka叢集

擷取topic清單

docker exec kafka-1 kafka-topics.sh --list --bootstrap-server localhost:9092      

檢視指定topic描述

docker exec kafka-1 kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic xt      

References:

  • ​​https://zhuanlan.zhihu.com/p/43843796​​
  • ​​https://www.jianshu.com/p/e8c29cba9fae​​
  • ​​https://blog.51cto.com/zero01/2509825​​
  • ​​https://blog.51cto.com/zero01/2501495​​

繼續閱讀