I. Understanding Kafka
1. Kafka is a distributed message queue
1) Message delivery models
(1) Point-to-point -- pull (one-to-one: the consumer actively pulls messages, and a message is deleted once received)
In the point-to-point model, the client typically polls (or pulls on a schedule) messages from the queue; after processing a message it sends an ack back to the queue, and the message is then deleted. Each message is consumed by exactly one consumer.
(2) Publish/subscribe -- push (one-to-many: once data is produced, it is pushed to all subscribers)
The publish/subscribe model is a push-based delivery model. It supports different kinds of subscribers: a transient subscriber receives messages only while it is actively listening to the topic, whereas a durable subscriber receives every message on the topic, including those published while it was offline.
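These two models can be illustrated with toy in-memory queues (an illustration only, not Kafka's API): in point-to-point, a message taken by one consumer is gone for everyone else; in publish/subscribe, every subscriber receives its own copy.

```python
from collections import deque

# Point-to-point: one shared queue; each message is delivered exactly once.
class P2PQueue:
    def __init__(self):
        self._q = deque()
    def send(self, msg):
        self._q.append(msg)
    def receive(self):
        # The message is removed on delivery (the "ack and delete" step).
        return self._q.popleft() if self._q else None

# Publish/subscribe: the broker pushes a copy to every subscriber's queue.
class PubSubTopic:
    def __init__(self):
        self._subscribers = {}
    def subscribe(self, name):
        self._subscribers[name] = deque()
    def publish(self, msg):
        for q in self._subscribers.values():
            q.append(msg)          # one-to-many delivery
    def receive(self, name):
        q = self._subscribers[name]
        return q.popleft() if q else None

p2p = P2PQueue()
p2p.send("order-1")
print(p2p.receive())   # consumed once
print(p2p.receive())   # already gone: None

topic = PubSubTopic()
topic.subscribe("a")
topic.subscribe("b")
topic.publish("event-1")
print(topic.receive("a"), topic.receive("b"))  # both subscribers get a copy
```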
2) Kafka components
1. Broker:
A broker is a single Kafka process (service); typically one broker runs per machine.
2. Topic:
(1) A topic is a named message category; the brokers of a Kafka cluster jointly maintain each topic (e.g., TopicA).
(2) A single topic can be spread across multiple machines, multiple brokers, and multiple partitions.
3. Partition:
(1) A topic can be divided into any number of partitions.
(2) A partition can have a leader and followers (Kafka's data-redundancy mechanism). Within one topic (e.g., TopicA), the same partition (e.g., Partition0) cannot exist as independent copies on different brokers; if it appears on several brokers, those copies must be in a leader/follower relationship.
Only the leader exchanges data with producers and consumers; followers are merely backups.
(3) The partition is the smallest unit of consumption: a consumer establishes a socket connection to each partition it reads.
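Which partition a record lands in is decided on the producer side: when a record has a key, the key is hashed modulo the partition count, so records with the same key stay in the same partition (and therefore stay ordered). A simplified sketch: the real Kafka client uses murmur2, and md5 here is just a stable stand-in.

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Simplified keyed partitioner: same key -> same partition.
    (The real Kafka client hashes with murmur2; md5 is a stand-in.)"""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Records with the same key land in the same partition, preserving per-key order.
p1 = partition_for(b"user-42", 3)
p2 = partition_for(b"user-42", 3)
print(p1 == p2)  # True
```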

4. Consumer group
(1) Within a group, at any moment each partition is consumed by only one consumer.
(2) Consumers in the same group never consume the same message twice.
(3) Each consumer establishes a socket connection to the partition(s) assigned to it.
(4) The group is what makes consumption distributed; for the same message to be consumed more than once, the consumers must belong to different groups.
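The rules above amount to dividing a topic's partitions among the group's members so that each partition has exactly one owner. A toy sketch of range-style assignment (a simplification of what Kafka's group coordinator does):

```python
def assign_partitions(consumers, num_partitions):
    """Range-style assignment: split partitions into contiguous chunks,
    one chunk per consumer; leftover partitions go to the first consumers."""
    assignment = {c: [] for c in consumers}
    per, extra = divmod(num_partitions, len(consumers))
    start = 0
    for i, c in enumerate(sorted(consumers)):
        count = per + (1 if i < extra else 0)
        assignment[c] = list(range(start, start + count))
        start += count
    return assignment

# 5 partitions shared by 2 consumers: every partition has exactly one owner.
print(assign_partitions(["c1", "c2"], 5))
# → {'c1': [0, 1, 2], 'c2': [3, 4]}
```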
5. The role of ZooKeeper
1. Stores the state of each Kafka broker:
ZooKeeper watches broker heartbeats; if a broker dies, a follower of each affected partition is promoted to leader.
2. Stores consumption offsets:
Records the current read position (offset) in each partition's queue. (Older Kafka versions keep these offsets in ZooKeeper; newer versions store them in the internal __consumer_offsets topic.) Data within a single partition is ordered.
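Offset bookkeeping can be pictured as a per-partition counter that is periodically committed; a toy sketch (illustrative names, not Kafka's consumer API):

```python
class OffsetTracker:
    """Toy per-partition offset bookkeeping (illustrative, not Kafka's API)."""
    def __init__(self):
        self.positions = {}   # partition -> next offset to read
        self.committed = {}   # partition -> last committed position

    def record_consumed(self, partition, offset):
        # Within one partition, offsets advance strictly in order.
        self.positions[partition] = offset + 1

    def commit(self):
        # Persist current positions (to ZooKeeper / __consumer_offsets in Kafka).
        self.committed = dict(self.positions)

    def resume_from(self, partition):
        # After a restart, consumption resumes at the committed position.
        return self.committed.get(partition, 0)

t = OffsetTracker()
for off in range(3):          # consume offsets 0, 1, 2 of partition 0
    t.record_consumed(0, off)
t.commit()
print(t.resume_from(0))       # 3
```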
6. Semi-persistence
(1) Message data is stored on disk and retained for 7 days by default.
(2) A consumer can only see messages that the producer has sent to the leader partition, that the follower partitions have finished replicating, and for which the producer has received its ack.
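This visibility rule is Kafka's high watermark: consumers may read only up to the smallest log-end offset among the in-sync replicas, i.e. what every replica has already copied. A minimal sketch with hypothetical names:

```python
def high_watermark(leader_leo, follower_leos):
    """Consumers may read offsets strictly below the minimum log-end
    offset (LEO) across the leader and its in-sync followers."""
    return min([leader_leo] + list(follower_leos))

# Leader has written 10 messages; followers have replicated 8 and 9 of them.
hw = high_watermark(10, [8, 9])
print(hw)  # 8 -> offsets 0..7 are visible to consumers
```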
II. Cluster deployment
1. Download the release
http://kafka.apache.org/downloads.html
2. Configuration
1) Extract the archive
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2) Rename the extracted directory
mv kafka_2.11-0.11.0.0/ kafka
3) Create a logs directory under /opt/module/kafka
$ mkdir logs
4) Edit the configuration file
$ cd config/
$ vi server.properties
Enter the following:
#Globally unique broker id; must not be duplicated
broker.id=0
#Whether deleting topics is allowed
delete.topic.enable=true
#Number of threads that handle network requests
num.network.threads=3
#Number of threads that handle disk I/O
num.io.threads=8
#Socket send buffer size
socket.send.buffer.bytes=102400
#Socket receive buffer size
socket.receive.buffer.bytes=102400
#Maximum size of a socket request
socket.request.max.bytes=104857600
#Directory where Kafka's log data is stored
log.dirs=/opt/module/kafka/logs
#Default number of partitions per topic on this broker
num.partitions=1
#Number of threads per data directory used to recover and clean up data
num.recovery.threads.per.data.dir=1
#Maximum time a segment file is retained; older segments are deleted
log.retention.hours=168
#ZooKeeper cluster connection string
zookeeper.connect=(ip)102:2181,(ip)103:2181,(ip)104:2181
5) Configure environment variables
# vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# source /etc/profile
6) Distribute the installation
# xsync profile
$ xsync kafka/
7) On 103 and 104, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively.
Note: broker.id must not be duplicated.
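Editing broker.id on each host by hand is error-prone; a small sketch that patches it in the properties text before distribution (the function and names here are illustrative, not part of Kafka):

```python
import re

def set_broker_id(properties_text: str, broker_id: int) -> str:
    """Return the properties text with its broker.id line replaced."""
    return re.sub(r"(?m)^broker\.id=.*$",
                  f"broker.id={broker_id}", properties_text)

template = "broker.id=0\ndelete.topic.enable=true\n"
print(set_broker_id(template, 2))  # first line becomes broker.id=2
```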
8) Start the cluster
Start Kafka on nodes 102, 103, and 104 in turn, running the same command on each:
$ bin/kafka-server-start.sh config/server.properties &
9) Stop the cluster
Run on each node:
$ bin/kafka-server-stop.sh
3. Kafka command-line operations
1) List all topics on the cluster
$ bin/kafka-topics.sh --zookeeper 102(ip):2181 --list
With multiple ZooKeeper nodes, list them comma-separated, e.g.: bin/kafka-topics.sh --zookeeper 102(ip):2181,103(ip):2181 --list
2) Create a topic
$ bin/kafka-topics.sh --zookeeper 102(ip):2181 --create --replication-factor 3 --partitions 1 --topic first
Option notes:
--topic  the topic name
--replication-factor  the number of replicas
--partitions  the number of partitions
3) Delete a topic
$ bin/kafka-topics.sh --zookeeper (ip)102:2181 --delete --topic first
Requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion, or the brokers must be restarted.
4) Produce messages
$ bin/kafka-console-producer.sh --broker-list (ip)102:9092 --topic first
> hello world
> li li
5) Consume messages
$ bin/kafka-console-consumer.sh --zookeeper (ip)102:2181 --from-beginning --topic first
--from-beginning: reads all existing data in the first topic from the start; add this option only when the use case calls for it.
6) Describe a topic
$ bin/kafka-topics.sh --zookeeper (ip)102:2181 --describe --topic first
4 Kafka configuration reference
4.1 Broker configuration
Property | Default | Description |
---|---|---|
broker.id | - | Required. Unique identifier of the broker. |
log.dirs | /tmp/kafka-logs | Directory where Kafka data is stored. Multiple comma-separated directories may be given; a newly created partition is placed in the directory currently holding the fewest partitions. |
port | 9092 | Port on which the broker accepts client connections. |
zookeeper.connect | null | ZooKeeper connection string, in the form hostname1:port1,hostname2:port2,hostname3:port3. One or more hosts may be listed; listing them all improves reliability. A ZooKeeper chroot path may be appended to keep this cluster's data separate from other applications, e.g. hostname1:port1,hostname2:port2,hostname3:port3/chroot/path. Note that consumers must use the same value for this setting. |
queued.max.requests | 500 | Size of the request queue for the I/O threads; when the number of queued requests exceeds this value, the network threads stop accepting new requests. |
socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUFF buffer the server prefers for socket connections. |
socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUFF buffer the server prefers for socket connections. |
socket.request.max.bytes | 100 * 1024 * 1024 | Maximum request size the server accepts, to guard against out-of-memory errors; should be smaller than the Java heap size. |
num.partitions | 1 | Default partition count used when a topic is created without an explicit one; raising it (e.g. to 5) is recommended. |
log.segment.bytes | 1024 * 1024 * 1024 | Maximum segment file size; when exceeded, a new segment is created automatically. Can be overridden per topic. |
log.roll.{ms,hours} | 24 * 7 hours | Time after which a new segment file is created. Can be overridden per topic. |
log.retention.{ms,minutes,hours} | 7 days | Retention period for Kafka segment logs; logs older than this are deleted. Can be overridden per topic. With large data volumes, a smaller value is advisable. |
log.retention.bytes | -1 | Maximum size of each partition; when exceeded, old partition data is deleted. Note this limit applies per partition, not per topic. Can be overridden per topic. |
log.retention.check.interval.ms | 5 minutes | Interval at which the deletion policy is checked. |
auto.create.topics.enable | true | Whether topics are created automatically. Setting it to false is recommended, so that topics stay under strict management and producers cannot write to a mistyped topic. |
default.replication.factor | 1 | Default number of replicas; raising it to 2 is recommended. |
replica.lag.time.max.ms | 10000 | If the leader receives no fetch request from a follower within this window, it removes the follower from the ISR (in-sync replicas). |
replica.lag.max.messages | 4000 | If a replica falls behind the leader by more than this many messages, the leader removes it from the ISR. |
replica.socket.timeout.ms | 30 * 1000 | Timeout for requests a replica sends to the leader. |
replica.socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests to the leader for replicating data. |
replica.fetch.max.bytes | 1024 * 1024 | The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. |
replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait for data to arrive on the leader in the fetch requests sent by the replicas to the leader. |
num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker. |
fetch.purgatory.purge.interval.requests | 1000 | The purge interval (in number of requests) of the fetch request purgatory. |
zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout. If the broker sends no heartbeat to ZooKeeper within this period, ZooKeeper considers the node dead. Too low a value makes nodes easy to mark dead by mistake; too high a value delays the detection of real failures. |
zookeeper.connection.timeout.ms | 6000 | Timeout for a client connecting to ZooKeeper. |
zookeeper.sync.time.ms | 2000 | How far a ZooKeeper follower may lag behind the ZooKeeper leader. |
controlled.shutdown.enable | true | Enables controlled shutdown. When enabled, a broker migrates all partition leaderships it holds to other brokers before shutting itself down. Keeping this enabled is recommended, as it improves cluster stability. |
auto.leader.rebalance.enable | true | If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available. |
leader.imbalance.per.broker.percentage | 10 | The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker. |
leader.imbalance.check.interval.seconds | 300 | The frequency with which to check for leader imbalance. |
offset.metadata.max.bytes | 4096 | The maximum amount of metadata to allow clients to save with their offsets. |
connections.max.idle.ms | 600000 | Idle connections timeout: the server socket processor threads close the connections that idle more than this. |
num.recovery.threads.per.data.dir | 1 | The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. |
unclean.leader.election.enable | true | Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss. |
delete.topic.enable | false | Whether topic deletion is enabled; setting it to true is recommended. |
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200). |
offsets.topic.retention.minutes | 1440 | Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic. |
offsets.retention.check.interval.ms | 600000 | The frequency at which the offset manager checks for stale offsets. |
offsets.topic.replication.factor | 3 | The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas. |
offsets.topic.segment.bytes | 104857600 | Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads. |
offsets.load.buffer.size | 5242880 | An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache. |
offsets.commit.required.acks | -1 | The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden. |
offsets.commit.timeout.ms | 5000 | The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout. |
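The time-based and size-based retention settings above combine as "delete when either limit is exceeded", applied at segment granularity. A simplified sketch of that decision (illustrative, not Kafka's implementation):

```python
def segments_to_delete(segments, retention_ms, retention_bytes, now_ms):
    """segments: list of (last_modified_ms, size_bytes), oldest first.
    First drop segments older than retention_ms; then, if retention_bytes
    is set (>= 0), keep dropping the oldest remaining segments while the
    partition's total size still exceeds the limit."""
    doomed = [s for s in segments if now_ms - s[0] > retention_ms]
    kept = [s for s in segments if s not in doomed]
    if retention_bytes >= 0:
        total = sum(size for _, size in kept)
        for seg in kept[:]:
            if total <= retention_bytes:
                break
            doomed.append(seg)
            kept.remove(seg)
            total -= seg[1]
    return doomed

week_ms = 7 * 24 * 3600 * 1000  # log.retention.hours=168
segs = [(0, 100), (week_ms, 100), (2 * week_ms, 100)]
# At now = 2.5 weeks, the two oldest segments exceed the 1-week retention.
print(len(segments_to_delete(segs, week_ms, -1, int(2.5 * week_ms))))  # 2
```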