消息系統的分類:
- Peer-to-Peer
- 一般基于pull或者polling接受消息
- 發送到隊列的消息被一個而且僅僅一個接收者所接收,即使有多個接收者在在同一個隊列中偵聽同一個消息
- 既支援異步的消息傳送方式,也支援請求/應答傳送方式

- 釋出/訂閱
- 釋出到一個主題的消息,可以被多個接收者訂閱
- 釋出/訂閱即可基于push消費資料,也可基于pull或者polling消費資料
- 解耦能力比p2p模型更強
消息系統使用的場景:
解耦:各系統之間使用消息系統這個統一的借口交換資料,不需要了解彼此的存在
備援:部分消息系統具有消息持久化的能力,可以避免消息在處理前丢失的風險
擴充:消息系統是統一的資料借口,各系統可以獨立擴充
峰值處理能力:消息系統可頂住峰值流量,業務系統可以根據處理能力從消息系統中擷取并處理對應量的請求
可恢複性:系統中部分元件失效并不影響整個系統,它恢複後仍然可以從消息系統中擷取并處理資料
異步通信:在不需要立即處理請求的場景下,可以将請求放入消息系統中,合适的時候在處理
常用的消息系統
RabbitMQ
Redis
ZeroMQ
ActiveMQ
Kafka
MetaQ
kafka介紹
kafka是一種分布式的,基于釋出/訂閱的消息系統。
原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和營運資料處理管道(Pipeline)的基礎。現在它已被多家不同類型的公司 作為多種類型的資料管道和消息系統使用。
kafka的架構組成
Broker(經紀人):kafka叢集包含一個或者多個伺服器,這些伺服器被稱作broker
Topic(主題):每條發送到kafka叢集的消息都有一個類别,這個類别叫做topic
Partition(分片):partition是實體概念,一個topic被分成了一個或者多個partition
Producer(生産者):負責釋出消息到kafka broker
Consumer(消費者):消息消費者,向kafka broker讀取資料的用戶端
Consumer:每個consumer屬于一個特定的consumer group(可為每個consumer指定group name,若不指定則屬于預設的group)
kafka拓撲結構
如圖:kafka叢集
- 若幹Producer(可以使web前端産生的page view,或者伺服器的日志)
- 若幹broker(kafka支援水準擴充,一般broker數量越多,叢集吞吐量越高)
- 若幹consumer group以及一個zookeeper叢集。kafka通過zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。
- producer使用push模式将消息釋出到broker,consumer使用pull模式從broker訂閱并消費。
topic & partition
topic在邏輯上被認為是一個queue,每條消息必須指定它的topic。而為了
提高kafka的吞吐率,把topic分成多個partition,每一個partition在實體上對應一個檔案夾,該檔案夾下存儲這個partition的所有消息和索引檔案。因為每條消息都被append到該Partition中,屬于順序寫磁盤,是以效率非常高
a topic
zookeeper的介紹
zookeeper是一個高性能分布式應用協調服務
- naming server
- 配置管理
- leader election
- 服務發現
- 同步
- group server
- barrier
- 分布式隊列
- 兩階段送出
zookeeper的工作方式
- zookeeper叢集包括一個leader和多個follower
- 所有的follower都可以提供讀服務
- 所有的寫服務都會被轉發到leader上
- client和server通過NIO通信
- 全局串行化所有的寫操作
- 保證同一用戶端的指令被FIFO執行
- 保證消息通知的FIFO
Zab協定--廣播模式
zookeeper是通過Zab協定進行資料通信的,過程如下:
- leader收到寫服務時,将所有的更新(成為proposal),順序發給follower
- follower在收到proposal之後将proposal寫入磁盤,寫入成功傳回ACK到leader
- 當leader收到半數以上的follower發來的ACK時,向所有的follower發送commit消息,并在本地commit該消息(即使消息寫入到了磁盤,但是沒有commit,使用者是看不到的)
注:每個proposal都會有一個唯一的單調遞增的proposal ID,即zxid
Zab協定--恢複模式
進入恢複模式:當leader當機或者失去大多數follwer後進入恢複模式
結束恢複模式:新的leader選舉出來後,且大多數follower已經和leader完成可資料同步後,恢複模式結束
恢複過程:
- 發現叢集中被commit的proposal的最大zxid
- 建立新的epoch,進而保證之前的Leader不能再commit新的Proposal
- 叢集中大部分節點都commit過前一個Leader commit過的消息,而新的Leader是被大部分節點所支援的,是以被之前Leader commit過的Proposal不會丢失,至少被一個節點所儲存
- 新Leader會與所有Follower通信,進而保證大部分節點都擁有最新的資料
kafka安裝
下載下傳kafka源碼包
wget http://apache.fayea.com/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar xf kafka_2.12-0.10.2.0.tgz -C /usr/local/
ln -s /usr/local/kafka_2.12-0.10.2.0 /usr/local/kafka
cd /usr/local/kafka
注:kafka軟體包中帶有zookeeper,如果伺服器上沒有安裝zookeeper,可以用kafka自帶的zookeeper
測試kafka是否可用:
1.建立一個topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1
檢視資訊
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
- 開啟zookeeper,終端輸出./bin/zookeeper-server-start.sh config/zookeeper.properties
- 在另一個終端開啟kafka,終端輸出./bin/kafka-server-start.sh config/server.properties
-
檢視兩個服務是否開啟[root@node11 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 44477 root 103u IPv6 2312661 0t0 TCP *:XmlIpcRegSvc (LISTEN)
java 44477 root 107u IPv6 2312663 0t0 TCP localhost:27895->localhost:XmlIpcRegSvc (ESTABLISHED)
java 44477 root 108u IPv6 2312664 0t0 TCP localhost:XmlIpcRegSvc->localhost:27895 (ESTABLISHED)
[root@node11 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 44220 root 86u IPv6 2312278 0t0 TCP *:eforward (LISTEN)
java 44220 root 87u IPv6 2312652 0t0 TCP localhost:eforward->localhost:38549 (ESTABLISHED)
java 44477 root 85u IPv6 2312651 0t0 TCP localhost:38549->localhost:eforward (ESTABLISHED)
- 在另一個終端,開啟kafka的consumer,等待接收topic為test1的資料./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1
-
在另一個終端開啟一個producer,往test1中發送資料./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
hello kafka #發送一個資料
- 在consumer終端檢視,是否收到消息
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1
hello kafka
kafka安裝成功
注:zookeeper在安裝
wget http://apache.fayea.com/zookeeper/current/zookeeper-3.4.10.tar.gz
tar xf zookeeper-3.4.10.tar.gz -C /usr/local/
cd /usr/local/
ln -s /usr/local/zookeeper-3.4.10/ zookeeper
kafka叢集和zookeeper叢集安裝
裝置:
centos6.8
192.168.163.11 node11
192.168.163.30 node30
192.168.163.50 node50
主機名互相解析
安裝zookeeper叢集
在三個節點上安裝zookeeper
wget http://apache.fayea.com/zookeeper/current/zookeeper-3.4.10.tar.gz
tar xf zookeeper-3.4.10.tar.gz -C /usr/local/
cd /usr/local/
ln -s /usr/local/zookeeper-3.4.10/ zookeeper
複制配置檔案
cd /usr/local/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
在各節點配置檔案中添加叢集節點資訊
vim zoo.cfg
server.1=192.168.163.11:2888:3888
server.2=192.168.163.30:2888:3888
server.3=192.168.163.50:2888:3888
# 節點可以是ip或者主機名
在各節點的資料目錄中建立myid檔案,并将自己的id号寫在裡面,節點之間的id号不能相同。在叢集中myid越大,成為上司的幾率越大。
[root@node11 zookeeper]# cat /tmp/zookeeper/myid
1
[root@node50 zookeeper]# cat /tmp/zookeeper/myid
3
[root@node30 zookeeper]# cat /tmp/zookeeper/myid
2
在各節點啟動zookeeper
./bin/zkServer.sh start conf/zoo.cfg
檢視狀态,node11和node30是跟随者,node50是上司者
node11
[root@node11 zookeeper]# ./bin/zkServer.sh status conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Mode: follower
node30
[root@node30 zookeeper]# ./bin/zkServer.sh status conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Mode: follower
node50
[root@node50 zookeeper]# ./bin/zkServer.sh status conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Mode: leader
測試叢集,在node11上長建立一個值,在其他節點檢視
node11
[root@node11 zookeeper]# ./bin/zkCli.sh -server 192.168.163.11:2181
# 進入叢集
[zk: 192.168.163.11:2181(CONNECTED) 0] create /one hello
# 建立一個/noe并指派hello
在node30和node50上檢視,如果能檢視到表叢集安裝成功
[root@node30 zookeeper]# ./bin/zkCli.sh -server 192.168.163.30:2181
[zk: 192.168.163.30:2181(CONNECTED) 2] get /one
hello
安裝kafka叢集
在各節點上安裝jdk,kafka,安裝過程略
修改配置檔案,添加zookeeper叢集的各節點。修改各節點的broker.id,各節點的id不能相同。
vim config/server.properties
zookeeper.cnotallow=192.168.163.11:2181,192.168.163.30:2181,192.168.163.50:2181
broker.id=
# 唯一性,不相同
listeners=plaintext://192.168.163.50:9092
# 監聽在自己的ip:port上
advertised.listeners=plaintext://192.168.163.50:9092
# 箭筒在自己的ip:port上
delete.topic.enable=true
# 開啟後可以手動删除一些無用的topic
num.partitinotallow=3
設定在不指定partition數量時的預設數量
# log.dirs=/tmp/kafka-logs
指定kafka接收資料的存放位置
vim config/consumer.properties
zookeeper.cnotallow="192.168.163.11:2181,192.168.163.30:2181,192.168.163.50:2181"
# 添加所有的zookeeper叢集節點
group.id=test-consumer-group
# 指定組id,可選項
啟動kafka
./bin/kafka-server-start.sh -daemon config/server.properties
檢視啟動狀态:
node11
[root@node11 zookeeper]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5724 root 100u IPv6 35329 0t0 TCP *:XmlIpcRegSvc (LISTEN)
java 5724 root 119u IPv6 36244 0t0 TCP node11:42792->node50:XmlIpcRegSvc (ESTABLISHED)
node30
[root@node30 kafka]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 8828 root 83w IPv6 58610 0t0 TCP node30:XmlIpcRegSvc->node50:14370 (ESTABLISHED)
java 8828 root 100u IPv6 56969 0t0 TCP *:XmlIpcRegSvc (LISTEN)
java 8828 root 107u IPv6 57247 0t0 TCP node30:17443->node30:XmlIpcRegSvc (ESTABLISHED)
java 8828 root 108u IPv6 57248 0t0 TCP node30:XmlIpcRegSvc->node30:17443 (ESTABLISHED)
node50
[root@node50 zookeeper]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 15513 root 100u IPv6 52247 0t0 TCP *:XmlIpcRegSvc (LISTEN)
java 15513 root 107u IPv6 53213 0t0 TCP node50:XmlIpcRegSvc->node11:42792 (ESTABLISHED)
java 15513 root 118u IPv6 54849 0t0 TCP node50:14370->node30:XmlIpcRegSvc (ESTABLISHED)
測試kafka叢集
在node11上建立一個topic
[root@node11 kafka]# ./bin/kafka-topics.sh --zookeeper 192.168.163.11:2181 --create --topic nginxlog --partitions 3 --replication-factor 2
Created topic "nginxlog".
[root@node11 kafka]# ./bin/kafka-topics.sh --zookeeper 192.168.163.11:2181 --topic nginxlog --describe
Topic:nginxlog PartitionCount:3 ReplicationFactor:2 Configs:
Topic: nginxlog Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: nginxlog Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2
Topic: nginxlog Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
在node30上往kafka叢集發送消息
[root@node30 kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.163.30:9092 --topic weblog
hello
在node11上開啟consumer接受消息
[root@node11 kafka]# ./bin/kafka-console-consumer.sh --zookeeper 192.168.163.11:2181 --topic weblog
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello
到此,kafka叢集安裝成功
kafka-manager
管理kafka的工具,可以在網頁上管理叢集
功能:
- 管理多個kafka叢集
- 便捷的檢查kafka叢集狀态(topics,brokers,備份分布情況,分區分布情況)
- 選擇你要運作的副本
- 基于目前分區狀況進行
- 可以選擇topic配置并建立topic(0.8.1.1和0.8.2的配置不同)
- 删除topic(隻支援0.8.2以上的版本并且要在broker配置中設定delete.topic.enable=true)
- Topic list會指明哪些topic被删除(在0.8.2以上版本适用)
- 為已存在的topic增加分區
- 為已存在的topic更新配置
- 在多個topic上批量重分區
- 在多個topic上批量重分區(可選partition broker位置)
安裝sbt 用來編譯
curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
mv bintray-sbt-rpm.repo /etc/yum.repos.d/
yum install sbt
下載下傳源碼包,編譯,時間會很長
git clone https://github.com/yahoo/kafka-manager.git
cd kafka-manager
sbt clean dist
編譯成功後,會在target/universal下生成一個zip包
解壓并修改檔案配置
unzip kafka-manager-1.3.0.4.zip
vim kafka-manager-1.3.0.4/conf/application.conf
将application.conf中的kafka-manager.zkhosts的值設定為你的zk位址
啟動,預設端口号是9000
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port 9001 &
kafka配置檔案參數
server.properties配置檔案
Property | Default | Description |
broker.id | 每個broker都可以用一個唯一的非負整數id進行辨別;這個id可以作為broker的“名字”,并且它的存在使得broker無須混淆consumers就可以遷移到不同的host/port上。你可以選擇任意你喜歡的數字作為id,隻要id是唯一的即可。 | |
log.dirs | /tmp/kafka-logs | kafka存放資料的路徑。這個路徑并不是唯一的,可以是多個,路徑之間隻需要使用逗号分隔即可;每當建立新partition時,都會選擇在包含最少partitions的路徑下進行。 |
port | 6667 | server接受用戶端連接配接的端口 |
zookeeper.connect | null | ZooKeeper連接配接字元串的格式為:hostname:port,此處hostname和port分别是ZooKeeper叢集中某個節點的host和port;為了當某個host宕掉之後你能通過其他ZooKeeper節點進行連接配接,你可以按照一下方式制定多個hosts:host,name1:port1, hostname2:port2, hostname3:port3. ZooKeeper 允許你增加一個“chroot”路徑,将叢集中所有kafka資料存放在特定的路徑下。當多個Kafka叢集或者其他應用使用相同ZooKeeper叢集時,可以使用這個方式設定資料存放路徑。這種方式的實作可以通過這樣設定連接配接字元串格式,如下所示: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 這樣設定就将所有kafka叢集資料存放在/chroot/path路徑下。注意,在你啟動broker之前,你必須建立這個路徑,并且consumers必須使用相同的連接配接格式。 |
message.max.bytes | 1000000 | server可以接收的消息最大尺寸。重要的是,consumer和producer有關這個屬性的設定必須同步,否則producer釋出的消息對consumer來說太大。 |
num.network.threads | 3 | server用來處理網絡請求的網絡線程數目;一般你不需要更改這個屬性。 |
num.io.threads | 8 | server用來處理請求的I/O線程的數目;這個線程數目至少要等于硬碟的個數。 |
background.threads | 4 | 用于背景處理的線程數目,例如檔案删除;你不需要更改這個屬性。 |
queued.max.requests | 500 | 在網絡線程停止讀取新請求之前,可以排隊等待I/O線程處理的最大請求個數。 |
host.name | null | broker的hostname;如果hostname已經設定的話,broker将隻會綁定到這個位址上;如果沒有設定,它将綁定到所有接口,并釋出一份到ZK |
advertised.host.name | null | 如果設定,則就作為broker 的hostname發往producer、consumers以及其他brokers |
advertised.port | null | 此端口将給與producers、consumers、以及其他brokers,它會在建立連接配接時用到; 它僅在實際端口和server需要綁定的端口不一樣時才需要設定。 |
socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF 緩存大小,server進行socket 連接配接所用 |
socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF緩存大小,server進行socket連接配接時所用 |
socket.request.max.bytes | 100 * 1024 * 1024 | server允許的最大請求尺寸; 這将避免server溢出,它應該小于Java heap size |
num.partitions | 1 | 如果建立topic時沒有給出劃分partitions個數,這個數字将是topic下partitions數目的預設數值。 |
log.segment.bytes | 1014*1024*1024 | topic partition的日志存放在某個目錄下諸多檔案中,這些檔案将partition的日志切分成一段一段的;這個屬性就是每個檔案的最大尺寸;當尺寸達到這個數值時,就會建立新檔案。此設定可以由每個topic基礎設定時進行覆寫。 檢視 the per-topic configuration section |
log.roll.hours | 24 * 7 | 即使檔案沒有到達log.segment.bytes,隻要檔案建立時間到達此屬性,就會建立新檔案。這個設定也可以有topic層面的設定進行覆寫; 檢視the per-topic configuration section |
log.cleanup.policy | delete | |
log.retention.minutes log.retention.hours | 7 days | 每個日志檔案删除之前儲存的時間。預設資料儲存時間對所有topic都一樣。 log.retention.minutes 和 log.retention.bytes 都是用來設定删除日志檔案的,無論哪個屬性已經溢出。 這個屬性設定可以在topic基本設定時進行覆寫。 檢視the per-topic configuration section |
log.retention.bytes | -1 | 每個topic下每個partition儲存資料的總量;注意,這是每個partitions的上限,是以這個數值乘以partitions的個數就是每個topic儲存的資料總量。同時注意:如果log.retention.hours和log.retention.bytes都設定了,則超過了任何一個限制都會造成删除一個段檔案。 注意,這項設定可以由每個topic設定時進行覆寫。 檢視the per-topic configuration section |
log.retention.check.interval.ms | 5 minutes | 檢查日志分段檔案的間隔時間,以确定是否檔案屬性是否到達删除要求。 |
log.cleaner.enable | false | 當這個屬性設定為false時,一旦日志的儲存時間或者大小達到上限時,就會被删除;如果設定為true,則當儲存屬性達到上限時,就會進行log compaction。 |
log.cleaner.threads | 1 | 進行日志壓縮的線程數 |
log.cleaner.io.max.bytes.per.second | None | 進行log compaction時,log cleaner可以擁有的最大I/O數目。這項設定限制了cleaner,以避免幹擾活動的請求服務。 |
log.cleaner.io.buffer.size | 500*1024*1024 | log cleaner清除過程中針對日志進行索引化以及精簡化所用到的緩存大小。最好設定大點,以提供充足的記憶體。 |
log.cleaner.io.buffer.load.factor | 512*1024 | 進行log cleaning時所需要的I/O chunk尺寸。你不需要更改這項設定。 |
log.cleaner.io.buffer.load.factor | 0.9 | log cleaning中所使用的hash表的負載因子;你不需要更改這個選項。 |
log.cleaner.backoff.ms | 15000 | 進行日志是否清理檢查的時間間隔 |
log.cleaner.min.cleanable.ratio | 0.5 | 這項配置控制log compactor試圖清理日志的頻率(假定log compaction是打開的)。預設避免清理壓縮超過50%的日志。這個比率綁定了備份日志所消耗的最大空間(50%的日志備份時壓縮率為50%)。更高的比率則意味着浪費消耗更少,也就可以更有效的清理更多的空間。這項設定在每個topic設定中可以覆寫。 檢視the per-topic configuration section。 |
log.cleaner.delete.retention.ms | 1day | 儲存時間;儲存壓縮日志的最長時間;也是用戶端消費消息的最長時間,和log.retention.minutes的差別在于一個控制未壓縮資料,一個控制壓縮後的資料;會被topic建立時的指定時間覆寫。 |
log.index.size.max.bytes | 10*1024*1024 | 每個log segment的最大尺寸。注意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要産生新的log segment。 |
log.index.interval.bytes | 4096 | 當執行一次fetch後,需要一定的空間掃描最近的offset,設定的越大越好,一般使用預設值就可以 |
log.flush.interval.messages | Long.MaxValue | log檔案“sync”到磁盤之前累積的消息條數。因為磁盤IO操作是一個慢操作,但又是一個“資料可靠性”的必要手段,是以檢查是否需要固化到硬碟的時間間隔。需要在“資料可靠性”與“性能”之間做必要的權衡,如果此值過大,将會導緻每次“發sync”的時間過長(IO阻塞),如果此值過小,将會導緻“fsync”的時間較長(IO阻塞),如果此值過小,将會導緻”發sync“的次數較多,這也就意味着整體的client請求有一定的延遲,實體server故障,将會導緻沒有fsync的消息丢失。 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查是否需要fsync的時間間隔 |
log.flush.interval.ms | Long.MaxValue | 僅僅通過interval來控制消息的磁盤寫入時機,是不足的,這個數用來控制”fsync“的時間間隔,如果消息量始終沒有達到固化到磁盤的消息數,但是離上次磁盤同步的時間間隔達到門檻值,也将觸發磁盤同步。 |
log.delete.delay.ms | 60000 | 檔案在索引中清除後的保留時間,一般不需要修改 |
auto.create.topics.enable | true | 是否允許自動建立topic。如果是真的,則produce或者fetch 不存在的topic時,會自動建立這個topic。否則需要使用指令行建立topic |
controller.socket.timeout.ms | 30000 | partition管理控制器進行備份時,socket的逾時時間。 |
controller.message.queue.size | Int.MaxValue | controller-to-broker-channles的buffer 尺寸 |
default.replication.factor | 1 | 預設備份份數,僅指自動建立的topics |
replica.lag.time.max.ms | 10000 | 如果一個follower在這個時間内沒有發送fetch請求,leader将從ISR重移除這個follower,并認為這個follower已經挂了 |
replica.lag.max.messages | 4000 | 如果一個replica沒有備份的條數超過這個數值,則leader将移除這個follower,并認為這個follower已經挂了 |
replica.socket.timeout.ms | 30*1000 | leader 備份資料時的socket網絡請求的逾時時間 |
replica.socket.receive.buffer.bytes | 64*1024 | 備份時向leader發送網絡請求時的socket receive buffer |
replica.fetch.max.bytes | 1024*1024 | 備份時每次fetch的最大值 |
replica.fetch.min.bytes | 500 | leader發出備份請求時,資料到達leader的最長等待時間 |
replica.fetch.min.bytes | 1 | 備份時每次fetch之後回應的最小尺寸 |
num.replica.fetchers | 1 | 從leader備份資料的線程數 |
replica.high.watermark.checkpoint.interval.ms | 5000 | 每個replica檢查是否将最高水位進行固化的頻率 |
fetch.purgatory.purge.interval.requests | 1000 | fetch 請求清除時的清除間隔 |
producer.purgatory.purge.interval.requests | 1000 | producer請求清除時的清除間隔 |
zookeeper.session.timeout.ms | 6000 | zookeeper會話逾時時間。 |
zookeeper.connection.timeout.ms | 6000 | 用戶端等待和zookeeper建立連接配接的最大時間 |
zookeeper.sync.time.ms | 2000 | zk follower落後于zk leader的最長時間 |
controlled.shutdown.enable | true | 是否能夠控制broker的關閉。如果能夠,broker将可以移動所有leaders到其他的broker上,在關閉之前。這減少了不可用性在關機過程中。 |
controlled.shutdown.max.retries | 3 | 在執行不徹底的關機之前,可以成功執行關機的指令數。 |
controlled.shutdown.retry.backoff.ms | 5000 | 在關機之間的backoff時間 |
auto.leader.rebalance.enable | true | 如果這是true,控制者将會自動平衡brokers對于partitions的leadership |
leader.imbalance.per.broker.percentage | 10 | 每個broker所允許的leader最大不平衡比率 |
leader.imbalance.check.interval.seconds | 300 | 檢查leader不平衡的頻率 |
offset.metadata.max.bytes | 4096 | 允許用戶端儲存他們offsets的最大個數 |
max.connections.per.ip | Int.MaxValue | 每個ip位址上每個broker可以被連接配接的最大數目 |
max.connections.per.ip.overrides | 每個ip或者hostname預設的連接配接的最大覆寫 | |
connections.max.idle.ms | 600000 | 空連接配接的逾時限制 |
log.roll.jitter.{ms,hours} | 0 | 從logRollTimeMillis抽離的jitter最大數目 |
num.recovery.threads.per.data.dir | 1 | 每個資料目錄用來日志恢複的線程數目 |
unclean.leader.election.enable | true | 指明了是否能夠使不在ISR中replicas設定用來作為leader |
delete.topic.enable | false | 能夠删除topic |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | 存在時間超過這個時間限制的offsets都将被标記為待删除 |
offsets.retention.check.interval.ms | 600000 | offset管理器檢查陳舊offsets的頻率 |
offsets.topic.replication.factor | 3 | topic的offset的備份份數。建議設定更高的數字保證更高的可用性 |
offset.topic.segment.bytes | 104857600 | offsets topic的segment尺寸。 |
offsets.load.buffer.size | 5242880 | 這項設定與批量尺寸相關,當從offsets segment中讀取時使用。 |
offsets.commit.required.acks | -1 | 在offset commit可以接受之前,需要設定确認的數目,一般不需要更改 |
consumer.proerties配置檔案
Property | Default | Description |
group.id | 用來唯一辨別consumer程序所在組的字元串,如果設定同樣的group id,表示這些processes都是屬于同一個consumer group | |
zookeeper.connect | 指定zookeeper的連接配接的字元串,格式是hostname:port,此處host和port都是zookeeper server的host和port,為避免某個zookeeper 機器當機之後失聯,你可以指定多個hostname:port,使用逗号作為分隔: hostname1:port1,hostname2:port2,hostname3:port3 可以在zookeeper連接配接字元串中加入zookeeper的chroot路徑,此路徑用于存放他自己的資料,方式: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path | |
consumer.id | null | 不需要設定,一般自動産生 |
socket.timeout.ms | 30*100 | 網絡請求的逾時限制。真實的逾時限制是 max.fetch.wait+socket.timeout.ms |
socket.receive.buffer.bytes | 64*1024 | socket用于接收網絡請求的緩存大小 |
fetch.message.max.bytes | 1024*1024 | 每次fetch請求中,針對每次fetch消息的最大位元組數。這些位元組将會督導用于每個partition的記憶體中,是以,此設定将會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等,否則,producer可能發送的消息尺寸大于consumer所能消耗的尺寸。 |
num.consumer.fetchers | 1 | 用于fetch資料的fetcher線程數 |
auto.commit.enable | true | 如果為真,consumer所fetch的消息的offset将會自動的同步到zookeeper。這項送出的offset将在程序挂掉時,由新的consumer使用 |
auto.commit.interval.ms | 60*1000 | consumer向zookeeper送出offset的頻率,機關是秒 |
queued.max.message.chunks | 2 | 用于緩存消息的最大數目,以供consumption。每個chunk必須和fetch.message.max.bytes相同 |
rebalance.max.retries | 4 | 當新的consumer加入到consumer group時,consumers集合試圖重新平衡配置設定到每個consumer的partitions數目。如果consumers集合改變了,當配置設定正在執行時,這個重新平衡會失敗并重入 |
fetch.min.bytes | 1 | 每次fetch請求時,server應該傳回的最小位元組數。如果沒有足夠的資料傳回,請求會等待,直到足夠的資料才會傳回。 |
fetch.wait.max.ms | 100 | 如果沒有足夠的資料能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。 |
rebalance.backoff.ms | 2000 | 在重試reblance之前backoff時間 |
refresh.leader.backoff.ms | 200 | 在試圖确定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間 |
auto.offset.reset | largest | zookeeper中沒有初始化的offset時,如果offset是以下值的回應: smallest:自動複位offset為smallest的offset largest:自動複位offset為largest的offset anything else:向consumer抛出異常 |
consumer.timeout.ms | -1 | 如果沒有消息可用,即使等待特定的時間之後也沒有,則抛出逾時異常 |
exclude.internal.topics | true | 是否将内部topics的消息暴露給consumer |
paritition.assignment.strategy | range | 選擇向consumer 流配置設定partitions的政策,可選值:range,roundrobin |
client.id | group id value | 是使用者特定的字元串,用來在每次請求中幫助跟蹤調用。它應該可以邏輯上确認産生這個請求的應用 |
zookeeper.session.timeout.ms | 6000 | zookeeper 會話的逾時限制。如果consumer在這段時間内沒有向zookeeper發送心跳資訊,則它會被認為挂掉了,并且reblance将會産生 |
zookeeper.connection.timeout.ms | 6000 | 用戶端在建立通zookeeper連接配接中的最大等待時間 |
zookeeper.sync.time.ms | 2000 | ZK follower可以落後ZK leader的最大時間 |
offsets.storage | zookeeper | 用于存放offsets的地點: zookeeper或者kafka |
offset.channel.backoff.ms | 1000 | 重新連接配接offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間 |
offsets.channel.socket.timeout.ms | 10000 | 當讀取offset的fetch/commit請求回應的socket 逾時限制。此逾時限制是被consumerMetadata請求用來請求offset管理 |
offsets.commit.max.retries | 5 | 重試offset commit的次數。這個重試隻應用于offset commits在shut-down之間。他 |
dual.commit.enabled | true | 如果使用“kafka”作為offsets.storage,你可以二次送出offset到zookeeper(還有一次是送出到kafka)。在zookeeper-based的offset storage到kafka-based的offset storage遷移時,這是必須的。對任意給定的consumer group來說,比較安全的建議是當完成遷移之後就關閉這個選項 |
partition.assignment.strategy | range | 在“range”和“roundrobin”政策之間選擇一種作為配置設定partitions給consumer 資料流的政策; 循環的partition配置設定器配置設定所有可用的partitions以及所有可用consumer 線程。它會将partition循環的配置設定到consumer線程上。如果所有consumer執行個體的訂閱都是确定的,則partitions的劃分是确定的分布。循環配置設定政策隻有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的資料流。(2)訂閱的topic的集合對于consumer group中每個consumer執行個體來說都是确定的。 |
producer.properties配置檔案
Name | Type | Default | Importance | Description |
boostrap.servers | list | high | 用于建立與kafka叢集連接配接的host/port組。資料将會在所有servers上均衡加載,不管哪些server是指定用于bootstrapping。這個清單僅僅影響初始化的hosts(用于發現全部的servers)。這個清單格式: host1:port1,host2:port2,... 因為這些server僅僅是用于初始化的連接配接,以發現叢集所有成員關系(可能會動态的變化),這個清單不需要包含所有的servers(你可能想要不止一個server,盡管這樣,可能某個server當機了)。如果沒有server在這個清單出現,則發送資料會一直失敗,直到清單可用。 | |
acks | string | 1 | high | producer需要server接收到資料之後發出的确認接收的信号,此項配置就是指procuder需要多少個這樣的确認信号。此配置實際上代表了資料備份的可用性。以下設定為常用選項: (1)acks=0: 設定為0表示producer不需要等待任何确認收到的資訊。副本将立即加到socket buffer并認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收資料,同時重試配置不會發生作用(因為用戶端不知道是否失敗)回饋的offset會總是設定為-1; (2)acks=1: 這意味着至少要等待leader已經成功将資料寫入本地log,但是并沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份資料,而此時leader又挂掉,則消息會丢失。 (3)acks=all: 這意味着leader需要等待所有備份都成功寫入日志,這種政策會保證隻要有一個備份存活就不會丢失資料。這是最強的保證。 (4)其他的設定,例如acks=2也是可以的,這将需要給定的acks數量,但是這種政策一般很少用。 |
buffer.memory | long | 33554432 | high | producer可以用來緩存資料的記憶體大小。如果資料産生速度大于向broker發送的速度,producer會阻塞或者抛出異常,以“block.on.buffer.full”來表明。 這項設定将和producer能夠使用的總記憶體相關,但并不是一個硬性的限制,因為不是producer使用的所有記憶體都是用于緩存。一些額外的記憶體會用于壓縮(如果引入壓縮機制),同樣還有一些用于維護請求。 |
compression.type | string | none | high | producer用于壓縮資料的壓縮類型。預設是無壓縮。正确的選項值是none、gzip、snappy。 壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好。 |
retries | int | 0 | high | 設定大于0的值将使用戶端重新發送任何資料,一旦這些資料發送失敗。注意,這些重試與用戶端接收到發送錯誤時的重試沒有什麼不同。允許重試将潛在的改變資料的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 |
batch.size | int | 16384 | medium | producer将試圖批處理消息記錄,以減少請求次數。這将改善client與server之間的性能。這項配置控制預設的批量處理消息位元組數。 不會試圖處理大于這個位元組數的消息位元組數。 發送到brokers的請求将包含多個批量處理,其中會包含對每個partition的一個請求。 較小的批量處理數值比較少用,并且可能降低吞吐量(0則會僅用批量處理)。較大的批量處理數值将會浪費更多記憶體空間,這樣就需要配置設定特定批量處理數值的記憶體大小。 |
client.id | string | medium | 當向server送出請求時,這個字元串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可清單之外的一些應用可以發送資訊。這項應用可以設定任意字元串,因為沒有任何功能性的目的,除了記錄和跟蹤 | |
linger.ms | long | 0 | medium | producer組将會彙總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這隻有在記錄産生速度大于發送速度的時候才能發生。然而,在某些條件下,用戶端将希望降低請求的數量,甚至降低到中等負載一下。這項設定将通過增加小的延遲來完成--即,不是立即發送一條記錄,producer将會等待給定的延遲時間以允許其他消息記錄發送,這些消息記錄可以批量處理。這可以認為是TCP種Nagle的算法類似。這項設定設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他将會立即發送而不顧這項設定,然而如果我們獲得消息位元組數比這項設定要小的多,我們需要“linger”特定的時間以擷取更多的消息。 這個設定預設為0,即沒有延遲。設定linger.ms=5,例如,将會減少請求數目,但是同時會增加5ms的延遲。 |
max.request.size | int | 1028576 | medium | 請求的最大位元組數。這也是對最大記錄尺寸的有效覆寫。注意:server具有自己對消息記錄尺寸的覆寫,這些尺寸和這個設定不同。此項設定将會限制producer每次批量發送請求的數目,以防發出巨量的請求。 |
receive.buffer.bytes | int | 32768 | medium | TCP receive緩存大小,當閱讀資料時使用 |
send.buffer.bytes | int | 131072 | medium | TCP send緩存大小,當發送資料時使用 |
timeout.ms | int | 30000 | medium | 此配置選項控制server等待來自followers的确認的最大時間。如果确認的請求數目在此時間内沒有實作,則會傳回一個錯誤。這個逾時限制是以server端度量的,沒有包含請求的網絡延遲 |
block.on.buffer.full | boolean | true | low | 當我們記憶體緩存用盡時,必須停止接收新消息記錄或者抛出錯誤。預設情況下,這個設定為真,然而某些阻塞可能不值得期待,是以立即抛出錯誤更好。設定為false則會這樣:producer會抛出一個異常錯誤:BufferExhaustedException, 如果記錄已經發送同時緩存已滿 |
metadata.fetch.timeout.ms | long | 60000 | low | 是指我們所擷取的一些元素據的第一個時間資料。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所需要的時間,否則會跑出異常給用戶端。 |
metadata.max.age.ms | long | 300000 | low | 以微秒為機關的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變。 |
metric.reporters | list | [] | low | 類的清單,用于衡量名額。實作MetricReporter接口,将允許增加一些類,這些類在新的衡量名額産生時就會改變。JmxReporter總會包含用于注冊JMX統計 |
metrics.num.samples | int | 2 | low | 用于維護metrics的樣本數 |
metrics.sample.window.ms | long | 30000 | low | metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了視窗大小,例如。我們可能在30s的期間維護兩個樣本。當一個視窗推出後,我們會擦除并重寫最老的視窗 |
recoonect.backoff.ms | long | 10 | low | 連接配接失敗時,當我們重新連接配接時的等待時間。這避免了用戶端反複重連 |
retry.backoff.ms | long | 100 | low | 在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死循環中。 |
zookeeper配置檔案詳解
參數名 | 說明 |
clientPort | 用戶端連接配接server的端口,預設是2181 |
dataDir | 存儲快照檔案snapshot的目錄。預設情況下,事務日志也會存儲在這裡。建議同時配置參數dataLogDir, 事務日志的寫性能直接影響zk性能。 |
tickTime | ZK中伺服器之間的心跳時間間隔。預設是2000毫秒 |
dataLogDir | 事務日志輸出目錄。盡量給事務日志的輸出配置單獨的磁盤或是挂載點,這将極大的提升ZK性能。 |
globalOutstandingLimit | 最大請求堆積數。預設是1000。ZK運作的時候, 盡管server已經沒有空閑來處理更多的用戶端請求了,但是還是允許用戶端将請求送出到伺服器上來,以提高吞吐性能。當然,為了防止Server記憶體溢出,這個請求堆積數還是需要限制下的。 |
preAllocSize | 預先開辟磁盤空間,用于後續寫入事務日志。預設是64M,每個事務日志大小就是64M。如果ZK的快照頻率較大的話,建議适當減小這個參數。 |
snapCount | 每進行snapCount次事務日志輸出後,觸發一次快照(snapshot), 此時,ZK會生成一個snapshot.*檔案,同時建立一個新的事務日志檔案log.*。預設是100000.(真正的代碼實作中,會進行一定的随機數處理,以避免所有伺服器在同一時間進行快照而影響性能) |
traceFile | 用于記錄所有請求的log,一般調試過程中可以使用,但是生産環境不建議使用,會嚴重影響性能。 |
maxClientCnxns | 單個用戶端與單台伺服器之間的連接配接數的限制,是ip級别的,預設是60,如果設定為0,那麼表明不作任何限制。請注意這個限制的使用範圍,僅僅是單台用戶端機器與單台ZK伺服器之間的連接配接數限制,不是針對指定用戶端IP,也不是ZK叢集的連接配接數限制,也不是單台ZK對所有用戶端的連接配接數限制。 |
clientPortAddress | 對于多網卡的機器,可以為每個IP指定不同的監聽端口。預設情況是所有IP都監聽 clientPort指定的端口。 New in 3.3.0 |
minSessionTimeoutmaxSessionTimeout | Session逾時時間限制,如果用戶端設定的逾時時間不在這個範圍,那麼會被強制設定為最大或最小時間。預設的Session逾時時間是在2 * tickTime ~ 20 * tickTime 這個範圍 New in 3.3.0 |
fsync.warningthresholdms | 事務日志輸出時,如果調用fsync方法超過指定的逾時時間,那麼會在日志中輸出警告資訊。預設是1000ms |
autopurge.purgeInterval | 在上文中已經提到,3.4.0及之後版本,ZK提供了自動清理事務日志和快照檔案的功能,這個參數指定了清理頻率,機關是小時,需要配置一個1或更大的整數,預設是0,表示不開啟自動清理功能。 |
autopurge.snapRetainCount | 這個參數和上面的參數搭配使用,這個參數指定了需要保留的檔案數目。預設是保留3個。 |
electionAlg | 在之前的版本中, 這個參數配置是允許我們選擇leader選舉算法,但是由于在以後的版本中,隻會留下一種“TCP-based version of fast leader election”算法,是以這個參數目前看來沒有用了,這裡也不詳細展開說了。 |
initLimit | Follower在啟動過程中,會從Leader同步所有最新資料,然後确定自己能夠對外服務的起始狀态。Leader允許F在initLimit時間内完成這個工作。通常情況下,我們不用太在意這個參數的設定。如果ZK叢集的資料量确實很大了,F在啟動的時候,從Leader上同步資料的時間也會相應變長,是以在這種情況下,有必要适當調大這個參數了。 |
syncLimit | 在運作過程中,Leader負責與ZK叢集中所有機器進行通信,例如通過一些心跳檢測機制,來檢測機器的存活狀态。如果L發出心跳包在syncLimit之後,還沒有從F那裡收到響應,那麼就認為這個F已經不線上了。注意:不要把這個參數設定得過大,否則可能會掩蓋一些問題。 |
leaderServes | 預設情況下,Leader是會接受用戶端連接配接,并提供正常的讀寫服務。但是,如果你想讓Leader專注于叢集中機器的協調,那麼可以将這個參數設定為no,這樣一來,會大大提高寫操作的性能。 |
server.x=[hostname]:nnnnn[:nnnnn] | 這裡的x是一個數字,與myid檔案中的id是一緻的。右邊可以配置兩個端口,第一個端口用于F和L之間的資料同步和其它通信,第二個端口用于Leader選舉過程中投票通信。 |
group.x=nnnnn[:nnnnn]weight.x=nnnnn | 對機器分組和權重設定 |
cnxTimeout | Leader選舉過程中,打開一次連接配接的逾時時間,預設是5s。 |
zookeeper.DigestAuthenticationProvider .superDigest | ZK權限設定相關,具體參見 《 使用super 和 《 ZooKeeper 權限控制》 |
skipACL | 對所有用戶端請求都不作ACL檢查。如果之前節點上設定有權限限制,一旦伺服器上打開這個開頭,那麼也将失效。 |
forceSync | 這個參數确定了是否需要在事務日志送出的時候調用FileChannel.force來保證資料完全同步到磁盤。 |
jute.maxbuffer | 每個節點最大資料量,是預設是1M。這個限制必須在server和client端都進行設定才會生效。 |
server.1=192.168.163.11:2888:3888 server.2=192.168.163.30:2888:3888 server.3=192.168.163.50:2888:3888 | zookeeper叢集中必須有的節點都加入到配置檔案中 |