Apache Kafka是開源分布式高并發消息中間件,支援每秒百萬級消息并發,在網際網路高并發架構:雙11、電商秒殺搶購、網絡直播、IOT大資料采集、聊天App、導航等高并發架構中大量使用。

生産環境一般要求搭建Kafka叢集。Java開發Kafka叢集需要注意參數的詳細配置,Kafka參數的含義在配置叢集的時候非常重要,尤其是關系性能和叢集的參數。下面我們一起來看看Kafka的詳細參數。
Kafka伺服器基礎配置
broker的身份ID。必須為每個代理設定一個唯一的整數。叢集配置時候非常重要。不能重複。
broker.id = 1
Kafka套接字伺服器配置
Kafka伺服器套接字伺服器偵聽的位址。 如果未配置,就會使用java.net.InetAddress.getCanonicalHostName()位址。格式:
#listeners = listener_name:// host_name:port
例如:
listeners = PLAINTEXT://localhost:9092
listeners = PLAINTEXT://localhost:9091
Broker向生産者和消費者通告的主機名和端口。如果沒有設定,它将使用“偵聽器”的值。否則,它将使用該值從java.net.InetAddress.getCanonicalHostName()傳回的位址。
#advertised.listeners = PLAINTEXT://host.name:9092
Kafka偵聽器名稱映射到安全協定,預設為它們是相同名稱
#listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
伺服器用于從網絡接收請求并向網絡發送響應消息的線程數
num.network.threads = 3
伺服器用于處理請求的線程數,可能包括磁盤I / O需要的線程.
num.io.threads = 8
套接字伺服器使用的發送緩沖區(SO_SNDBUF)大小,位元組。
socket.send.buffer.bytes = 102400
套接字伺服器使用的接收緩沖區(SO_RCVBUF)大小,位元組。
socket.receive.buffer.bytes = 102400
套接字伺服器将接受的請求的最大大小(防止OOM記憶體溢出)大小,位元組。
socket.request.max.bytes = 104857600
Kafka日志配置
逗号分隔的目錄清單,用于存儲日志檔案
log.dirs =。/日志
每個主題的預設日志分區數。更多分區允許更大并行處理消息,但這也會導緻更多的檔案
num.partitions = 1
在啟動時用于日志恢複和在關閉時重新整理的每個資料目錄檔案夾需要的線程數。
#對于資料目錄檔案夾位于RAID陣列中的情況,建議增加此線程數值。
num.recovery.threads.per.data.dir = 1
Kafka内部主題配置
Topic主題的消費者的group中繼資料複制因子,"__consumer_offsets" 和"__transaction_state" 。
offsets.topic.replication.factor = 1
transaction.state.log.replication.factor = 1
transaction.state.log.min.isr = 1
Kafka日志重新整理政策配置
Kafka消息立即寫入檔案系統,但預設情況下我們隻有fsync()來緩慢延遲地同步作業系統緩存消息到磁盤上。 以下配置參數控制将消息資料重新整理到磁盤過程。這裡有一些重要的權衡:
1、持久性:如果Kafka不使用複制,則可能會丢失未重新整理的資料。
2、延遲:當Kafka重新整理确實發生時,非常大的重新整理間隔可能會導緻延遲峰值,因為會有大量資料需要重新整理到磁盤,間隔太久緩沖消息越多。
3、吞吐量:Flush通常是最昂貴的操作,并且小的Flush間隔可能導緻過多的磁盤IO操作搜尋。
下面以下設定允許配置重新整理政策以在一段時間後或每N條消息(或兩者)重新整理資料。這可以在全局範圍内配置完成,也可以針對每個主題的單獨配置。
#強制重新整理資料到磁盤之前要接受的消息數,10000消息時批量刷盤
#log.flush.interval.messages = 10000
#強制重新整理之前消息可以在日志中停留的最長時間 1000毫秒
#log.flush.interval.ms = 1000
Kafka日志儲存政策
以下配置參數控制Kafka日志段的處理政策,我們可以設定為在一段時間後或在累積給定大小後删除日志資料段。隻要滿足一下任意條件,就會删除一個日志段。删除總是從日志的末尾開始。
log log的最小時間長度,超過删除日志。
#由于時間可以删除的日志檔案的最小年齡 168小時
log.retention.hours = 168
基于大小的日志保留政策。 除非剩餘的段大小在log.retention.bytes之下,否則将從日志中删除段。 功能獨立于log.retention.hours限制。
#log.retention.bytes = 1073741824
#日志段檔案的最大大小。達到此大小時,将建立新的日志段。
log.segment.bytes = 1073741824
#檢查日志段以檢視是否可以删除日志段的時間間隔
#保留政策
log.retention.check.interval.ms = 300000
Zookeeper參數設定
Zookeeper連接配接字元串。預設單台設定,叢集需要多台,逗号分隔的主機:端口,每個對應一個Zookeeper。例如“127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。
#所有kafka znodes的根目錄。叢集設定模式
zookeeper.connect =localhost:2181,localhost:2182,localhost:2183
連接配接到zookeeper的逾時時間(以毫秒為機關)
zookeeper.connection.timeout.ms = 6000
kafka Group協調配置
以下配置指定GroupCoordinator将延遲初始消費者重新平衡的時間(以毫秒為機關)。
#當新成員加入組時,重新平衡延遲是group.initial.rebalance.delay.ms的值,最多max.poll.interval.ms。
預設值為3秒。我們将此參數設定為0,友善開發和測試。但是生産環境中預設值推薦3秒更合适,因為這有助于避免在應用程式啟動期間不必要且可能很昂貴的重新平衡過程,減少系統資源的消耗。
group.initial.rebalance.delay.ms = 0
在叢集和優化情況下需要了解每個參數的确切含義,對于Kafka叢集的設定,需要配置多個Zookeeper位址。預設的日志清理、垃圾回收、連接配接池、線程模型都是非常重要的因素。
使用最新的Java Spring Boot 2.x版本連接配接Kafka需要在配置檔案中修改位址參數:
spring.kafka.consumer.group-id=myGroup
spring.kafka.bootstrap-servers=localhost:9091,localhost:9092,localhost:9093
參考文檔
http://kafka.apache.org/8、阿裡巴巴Java群超過2900人
直播位址:Java技術進階群
進群方式:釘釘掃碼入群
阿裡巴巴MongoDB群