
Kafka Study Notes: 1. Understanding Kafka; 2. Cluster Deployment

1. Understanding Kafka

1. Kafka is a distributed message queue

1) Kafka message-delivery models

(1) Point-to-point model (pull; one-to-one: the consumer actively pulls data, and a message is removed once it has been received)
   In the point-to-point model, the client typically polls or pulls messages from the queue on a schedule, sends an ack back to the queue after processing, and the message is then deleted. Each message is consumed by exactly one consumer.
(2) Publish/subscribe model (push; one-to-many: once data is produced, it is pushed to all subscribers)
   The publish/subscribe model is a push-based delivery model. It can have different kinds of subscribers: a non-durable subscriber receives messages only while it is actively listening on the topic, whereas a durable subscriber receives every message on the topic, even those published while the subscriber was offline.
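To make the contrast concrete, here is a minimal in-memory sketch (hypothetical classes, not the Kafka API): in point-to-point, a pulled message is gone for everyone else; in publish/subscribe, every subscriber gets its own copy.

```python
from collections import deque

class PointToPointQueue:
    """Point-to-point: each message is delivered to exactly one consumer,
    then removed from the queue."""
    def __init__(self):
        self._q = deque()

    def send(self, msg):
        self._q.append(msg)

    def pull(self):
        # The consumer pulls; once delivered, the message is removed.
        return self._q.popleft() if self._q else None

class PubSubTopic:
    """Publish/subscribe: each published message is pushed to every
    current subscriber's inbox."""
    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        inbox = deque()
        self._subscribers.append(inbox)
        return inbox

    def publish(self, msg):
        for inbox in self._subscribers:
            inbox.append(msg)

q = PointToPointQueue()
q.send("m1")
first, second = q.pull(), q.pull()   # only one pull gets "m1"; the next gets nothing

topic = PubSubTopic()
a, b = topic.subscribe(), topic.subscribe()
topic.publish("m1")                  # both subscribers receive a copy
```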
           

2) Understanding Kafka's components

1. Broker:

A broker is a single Kafka server process (a service); typically one broker runs per machine.
           

2. Topic:

(1) A topic is a logical category of messages; the brokers of one Kafka cluster jointly maintain each topic (e.g. TopicA).
(2) Multiple machines, multiple brokers, and multiple partitions can together hold one topic.
           

3. Partition:

(1) A topic can be split into any number of partitions.
(2) A partition can have a leader and followers (Kafka's data-redundancy mechanism). Within one topic (e.g. TopicA), two replicas of the same partition (e.g. Partition0) must stand in a leader/follower relationship. Only the leader interacts with consumers; followers are backups.
(3) The partition is the smallest unit of consumption: a consumer opens a socket connection to a partition.
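How a message picks its partition is worth seeing once. Kafka's default partitioner hashes the message key (it uses murmur2); the sketch below substitutes CRC32 as a stand-in hash to show the idea, not the exact algorithm: same key, same partition, which is what gives per-key ordering within a partition.

```python
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    """Simplified stand-in for Kafka's default partitioner.
    The real one hashes the key with murmur2; here CRC32 is used
    purely for illustration. Same principle: hash the key, then
    take it modulo the partition count."""
    return zlib.crc32(key) % num_partitions

# Messages with the same key always land in the same partition,
# so their relative order is preserved for consumers of that partition.
p1 = choose_partition(b"user-42", 3)
p2 = choose_partition(b"user-42", 3)
```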
           

4. Consumer group

(1) Within one group, at any given moment each partition is consumed by only one consumer.
(2) Consumers in the same group never consume the same message twice.
(3) A consumer opens a socket connection to a partition.
(4) Groups are what make consumption distributed; for the same message to be consumed more than once, the consumers must belong to different groups.
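The "one partition, one consumer per group" rule falls out of how partitions are assigned to group members. A simplified sketch of range-style assignment (modeled loosely on Kafka's RangeAssignor; function name is ours):

```python
def range_assign(partitions, consumers):
    """Assign each partition to exactly one consumer in the group,
    splitting the partition list into contiguous ranges. Because no
    partition appears in two consumers' assignments, no message is
    consumed twice within the group."""
    consumers = sorted(consumers)
    n, k = len(partitions), len(consumers)
    per, extra = divmod(n, k)          # base share, plus remainder
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        assignment[c] = partitions[start:start + count]
        start += count
    return assignment

# Four partitions, two consumers in the same group:
assignment = range_assign([0, 1, 2, 3], ["c1", "c2"])
```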
           

5. The role of ZooKeeper

1. Stores the state of the Kafka brokers.
It watches broker heartbeats; if a broker dies, it drives the election of a new leader from among the followers.
2. Stores offsets.
It records the current read position (the offset) within each partition's queue; data within a single partition is ordered.
           

6. Semi-persistence

(1) Message data is kept on disk, for 7 days by default.
(2) A consumer can only see messages that the producer has sent to the leader partition, that the follower partitions have finished replicating, and whose ack has been received by the producer.
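Point (2) is Kafka's high-watermark rule. A minimal sketch of the idea (hypothetical function, not a Kafka API): the readable boundary is the smallest log-end offset among the leader and its in-sync followers, i.e. the point up to which every replica has the data.

```python
def high_watermark(leader_end_offset, follower_end_offsets):
    """Consumers may only read up to the high watermark: the minimum
    log-end offset across the leader and its in-sync followers.
    Everything below it is fully replicated; everything at or above
    it is not yet visible to consumers. Simplified illustration."""
    return min([leader_end_offset] + list(follower_end_offsets))

# The leader has written 10 messages, but the slowest in-sync follower
# has only replicated 7, so consumers can read offsets 0..6.
hw = high_watermark(10, [9, 7])
```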
           

2. Cluster Deployment

1. Download the release

http://kafka.apache.org/downloads.html
           

2. Configuration

1) Unpack the archive

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2) Rename the unpacked directory

mv kafka_2.11-0.11.0.0/ kafka

3) Create a logs directory under /opt/module/kafka

[[email protected] kafka]$ mkdir logs

4) Edit the configuration file

[[email protected] kafka]$ cd config/

[[email protected] config]$ vi server.properties

Enter the following:

#Globally unique broker id; must not be repeated

broker.id=0

#Whether topic deletion is allowed

delete.topic.enable=true

#Number of threads handling network requests

num.network.threads=3

#Number of threads handling disk I/O

num.io.threads=8

#Send buffer size of the socket

socket.send.buffer.bytes=102400

#Receive buffer size of the socket

socket.receive.buffer.bytes=102400

#Maximum request size the socket accepts

socket.request.max.bytes=104857600

#Path where Kafka stores its message data (log segments)

log.dirs=/opt/module/kafka/logs

#Default number of partitions per topic on this broker

num.partitions=1

#用來恢複和清理data下資料的線程數量

num.recovery.threads.per.data.dir=1

#Maximum time a segment file is retained; it is deleted once this expires

log.retention.hours=168

#ZooKeeper cluster connection string

zookeeper.connect=(ip)102:2181,(ip)103:2181,(ip)104:2181

5) Configure environment variables

[[email protected] module]# vi /etc/profile

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

[[email protected] module]# source /etc/profile

6) Distribute the installation

[[email protected] etc]# xsync profile

[[email protected] module]$ xsync kafka/

7) On hosts 103 and 104 respectively, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2

Note: broker.id must not be repeated
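Since broker.id is the only per-host difference in this walkthrough, the per-host edit can be sketched as a tiny config generator (hypothetical helper, shown with a trimmed-down subset of the settings above):

```python
# Shared settings (abbreviated subset of the server.properties above).
BASE_CONFIG = """\
delete.topic.enable=true
log.dirs=/opt/module/kafka/logs
zookeeper.connect=(ip)102:2181,(ip)103:2181,(ip)104:2181
"""

def render_config(broker_id: int) -> str:
    """Prepend the per-broker id to the shared settings. broker.id is
    the only value that must differ, and be unique, per host."""
    return f"broker.id={broker_id}\n" + BASE_CONFIG

# Hosts 102/103/104 get ids 0/1/2, matching steps 4) and 7).
configs = {host: render_config(i)
           for i, host in enumerate(["102", "103", "104"])}
```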

8) Start the cluster

Start Kafka on nodes 102, 103, and 104 in turn

[[email protected] kafka]$ bin/kafka-server-start.sh config/server.properties &

[[email protected] kafka]$ bin/kafka-server-start.sh config/server.properties &

[[email protected] kafka]$ bin/kafka-server-start.sh config/server.properties &

9) Stop the cluster

[[email protected] kafka]$ bin/kafka-server-stop.sh stop

[[email protected] kafka]$ bin/kafka-server-stop.sh stop

[[email protected] kafka]$ bin/kafka-server-stop.sh stop

3. Kafka command-line operations

1) List all topics on the current server

[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper 102(ip):2181 --list

If there are multiple ZooKeeper nodes, list them all, e.g.: bin/kafka-topics.sh --zookeeper 102(ip):2181,103(ip):2181 --list

2) Create a topic

[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper 102(ip):2181 --create --replication-factor 3 --partitions 1 --topic first

Option descriptions:

--topic               the topic name

--replication-factor  the number of replicas (redundancy)

--partitions          the number of partitions

3) Delete a topic

[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper (ip)102:2181 --delete --topic first

This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion, or a restart is needed to remove it.

4) Send messages

[[email protected] kafka]$ bin/kafka-console-producer.sh --broker-list (ip)102:9092 --topic first

> hello world

> li li

5) Consume messages

[[email protected] kafka]$ bin/kafka-console-consumer.sh --zookeeper (ip)102:2181 --from-beginning --topic first

--from-beginning: reads out all past data in the first topic from the start. Add this option only if your use case calls for it.

6) Describe a topic

[[email protected] kafka]$ bin/kafka-topics.sh --zookeeper (ip)102:2181 --describe --topic first

4. Kafka configuration reference

4.1 Broker configuration

Property | Default | Description
broker.id | (none) | Required; the broker's unique identifier.
log.dirs | /tmp/kafka-logs | Directory where Kafka data is stored. Multiple comma-separated directories may be given; a newly created partition is placed in the directory currently holding the fewest partitions.
port | 9092 | Port on which the broker accepts client connections.
zookeeper.connect | null | ZooKeeper connection string, in the form hostname1:port1,hostname2:port2,hostname3:port3. One or more hosts may be listed; listing all of them is recommended for reliability. This setting also accepts a ZooKeeper chroot path under which all of this cluster's data is stored; to keep clusters separate, specifying a per-cluster path is recommended, in the form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path. Note that consumers must use the same value for this parameter.
queued.max.requests | 500 | Size of the queue of requests waiting for the I/O threads; if the number of queued requests exceeds this value, the network threads stop accepting new requests.
socket.send.buffer.bytes | 100 * 1024 | The SO_SNDBUF buffer the server prefers for socket connections.
socket.receive.buffer.bytes | 100 * 1024 | The SO_RCVBUF buffer the server prefers for socket connections.
socket.request.max.bytes | 100 * 1024 * 1024 | Maximum request size the server accepts, to protect against out-of-memory errors; it should be smaller than the Java heap size.
num.partitions | 1 | Default partition count used when a topic is created without an explicit partition count; raising it (e.g. to 5) is recommended.
log.segment.bytes | 1024 * 1024 * 1024 | Size of a segment file; when exceeded, a new segment is created automatically. Can be overridden per topic.
log.roll.{ms,hours} | 24 * 7 hours | Time after which a new segment file is created. Can be overridden per topic.
log.retention.{ms,minutes,hours} | 7 days | Retention period for Kafka segment logs; logs older than this are deleted. Can be overridden per topic. With large data volumes, lowering this value is recommended.
log.retention.bytes | -1 | Maximum size of each partition; once exceeded, partition data is deleted. Note that this limits each partition, not the topic. Can be overridden per topic.
log.retention.check.interval.ms | 5 minutes | Interval at which the deletion policy is checked.
auto.create.topics.enable | true | Whether topics are created automatically; setting this to false is recommended so topic management stays strict and producers cannot write to a mistyped topic.
default.replication.factor | 1 | Default number of replicas; raising it to 2 is recommended.
replica.lag.time.max.ms | 10000 | If the leader receives no fetch request from a follower within this window, it removes the follower from the ISR (in-sync replicas).
replica.lag.max.messages | 4000 | If a replica falls behind the leader by more than this many messages, the leader removes it from the ISR.
replica.socket.timeout.ms | 30 * 1000 | Timeout for requests a replica sends to the leader.
replica.socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests to the leader for replicating data.
replica.fetch.max.bytes | 1024 * 1024 | The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
replica.fetch.wait.max.ms | 500 | The maximum amount of time to wait for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
num.replica.fetchers | 1 | Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
fetch.purgatory.purge.interval.requests | 1000 | The purge interval (in number of requests) of the fetch request purgatory.
zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout. If the server sends no heartbeat to ZooKeeper within this time, ZooKeeper considers the node dead. Too low, and healthy nodes are marked dead too easily; too high, and dead nodes are detected too late.
zookeeper.connection.timeout.ms | 6000 | Timeout for a client connecting to ZooKeeper.
zookeeper.sync.time.ms | 2000 | How far a ZooKeeper follower may lag behind the ZooKeeper leader.
controlled.shutdown.enable | true | Enables controlled shutdown: before shutting down, a broker migrates all leaders it hosts to other brokers. Enabling this is recommended for cluster stability.
auto.leader.rebalance.enable | true | If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the "preferred" replica for each partition if it is available.
leader.imbalance.per.broker.percentage | 10 | The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.
leader.imbalance.check.interval.seconds | 300 | The frequency with which to check for leader imbalance.
offset.metadata.max.bytes | 4096 | The maximum amount of metadata to allow clients to save with their offsets.
connections.max.idle.ms | 600000 | Idle connection timeout: the server socket processor threads close connections that have been idle longer than this.
num.recovery.threads.per.data.dir | 1 | The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
unclean.leader.election.enable | true | Indicates whether replicas not in the ISR set may be elected as leader as a last resort, even though doing so may result in data loss.
delete.topic.enable | false | Enables topic deletion; setting this to true is recommended.
offsets.topic.num.partitions | 50 | The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, a higher setting is recommended for production (e.g., 100-200).
offsets.topic.retention.minutes | 1440 | Offsets older than this age will be marked for deletion. The actual purge occurs when the log cleaner compacts the offsets topic.
offsets.retention.check.interval.ms | 600000 | The frequency at which the offset manager checks for stale offsets.
offsets.topic.replication.factor | 3 | The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended to ensure higher availability. If the offsets topic is created while fewer brokers than the replication factor are available, it will be created with fewer replicas.
offsets.topic.segment.bytes | 104857600 | Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low to facilitate faster log compaction and loads.
offsets.load.buffer.size | 5242880 | An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes leader for an offsets topic partition). This setting is the batch size (in bytes) used when reading from the offsets segments while loading offsets into the offset manager's cache.
offsets.commit.required.acks | -1 | The number of acknowledgements required before an offset commit is accepted. This is similar to the producer's acknowledgement setting. In general, the default should not be overridden.
offsets.commit.timeout.ms | 5000 | The offset commit is delayed until this timeout expires or the required number of replicas have received the offset commit. This is similar to the producer request timeout.
