玩轉Kafka—初步使用
官方文檔:http://kafka.apache.org/
中文文檔:https://kafka.apachecn.org/
1 簡單介紹
首先是一些概念:
- Kafka作為一個叢集,運作在一台或者多台伺服器上.
- Kafka 通過topic對存儲的流資料進行分類。
- 每條記錄中包含一個key,一個value和一個timestamp(時間戳)。
Kafka有四個核心的API:
- TheProducer API 允許一個應用程式釋出一串流式的資料到一個或者多個Kafka topic。
- TheConsumer API 允許一個應用程式訂閱一個或多個 topic ,并且對釋出給他們的流式資料進行處理。
- TheStreams API 允許一個應用程式作為一個流處理器,消費一個或者多個topic産生的輸入流,然後生産一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。
- TheConnector API 允許建構并運作可重用的生産者或者消費者,将Kafka topics連接配接到已存在的應用程式或者資料系統。比如,連接配接到一個關系型資料庫,捕捉表(table)的所有變更内容。
支援的語言(除了Java之外的):
常見概念:
1 Topics和日志
讓我們首先深入了解下Kafka的核心概念:
提供一串流式的記錄— topic
Topic 就是資料主題,是資料記錄釋出的地方,可以用來區分業務系統。Kafka中的Topics總是多訂閱者模式,一個topic可以擁有一個或者多個消費者來訂閱它的資料。
對于每一個topic, Kafka叢集都會維持一個分區日志,如下所示:
每個分區都是有序且順序不可變的記錄集,并且不斷地追加到結構化的commit log檔案。
分區中的每一個記錄都會配置設定一個id号來表示順序,我們稱之為offset,offset用來唯一的辨別分區中每一條記錄。
Kafka 叢集保留所有釋出的記錄—無論他們是否已被消費—并通過一個可配置的參數——保留期限來控制. 舉個例子, 如果保留政策設定為2天,一條記錄釋出後兩天内,可以随時被消費,兩天過後這條記錄會被抛棄并釋放磁盤空間。Kafka的
性能和資料大小無關
,是以長時間存儲資料沒有什麼問題.
日志中的 partition(分區)有以下幾個用途。第一,當日志大小超過了單台伺服器的限制,允許日志進行擴充。每個單獨的分區都必須受限于主機的檔案限制,不過一個主題可能有多個分區,是以可以處理無限量的資料。第二,可以作為并行的單元集—關于這一點,更多細節如下
2 分布式
日志的分區partition (分布)在Kafka叢集的伺服器上。每個伺服器在處理資料和請求時,共享這些分區。每一個分區都會在已配置的伺服器上進行備份,確定容錯性.
每個分區都有一台 server 作為 “leader”,零台或者多台server作為 follwers 。leader server 處理一切對 partition (分區)的讀寫請求,而follwers隻需被動的同步leader上的資料。當leader當機了,followers 中的一台伺服器會自動成為新的 leader。每台 server 都會成為某些分區的 leader 和某些分區的 follower,是以叢集的負載是平衡的。
3 生産者
生産者可以将資料釋出到所選擇的topic中。生産者負責将記錄配置設定到topic的哪一個 partition(分區)中。可以使用循環的方式來簡單地實作負載均衡,也可以根據某些語義分區函數(例如:記錄中的key)來完成。下面會介紹更多關于分區的使用。
4 消費者
消費者使用一個 消費組 名稱來進行辨別,釋出到topic中的每條記錄被配置設定給訂閱消費組中的一個消費者執行個體.消費者執行個體可以分布在多個程序中或者多個機器上。
如果所有的消費者執行個體在同一消費組中,消息記錄會負載平衡到每一個消費者執行個體.
如果所有的消費者執行個體在不同的消費組中,每條消息記錄會廣播到所有的消費者程序.
如圖,這個 Kafka 叢集有兩台 server 的,四個分區(p0-p3)和兩個消費者組。消費組A有兩個消費者,消費組B有四個消費者。
通常情況下,每個 topic 都會有一些消費組,一個消費組對應一個"邏輯訂閱者"。一個消費組由許多消費者執行個體組成,便于擴充和容錯。這就是釋出和訂閱的概念,隻不過訂閱者是一組消費者而不是單個的程序。
在Kafka中實作消費的方式是将日志中的分區劃分到每一個消費者執行個體上,以便在任何時間,每個執行個體都是分區唯一的消費者。維護消費組中的消費關系由Kafka協定動态處理。如果新的執行個體加入組,他們将從組中其他成員處接管一些 partition 分區;如果一個執行個體消失,擁有的分區将被分發到剩餘的執行個體。
Kafka 隻保證分區内的記錄是有序的,而不保證主題中不同分區的順序。每個 partition 分區按照key值排序足以滿足大多數應用程式的需求。但如果你需要總記錄在所有記錄的上面,可使用僅有一個分區的主題來實作,這意味着每個消費者組隻有一個消費者程序。
保證
high-level Kafka給予以下保證:
- 生産者發送到特定topic partition 的消息将按照發送的順序處理。 也就是說,如果記錄M1和記錄M2由相同的生産者發送,并先發送M1記錄,那麼M1的偏移比M2小,并在日志中較早出現
- 一個消費者執行個體按照日志中的順序檢視記錄.
- 對于具有N個副本的主題,我們最多容忍N-1個伺服器故障,進而保證不會丢失任何送出到日志中的記錄.
關于保證的更多細節可以看文檔的設計部分。
2 下載下傳安裝
Kafka依賴于Zookeeper,而Zookeeper又依賴于Java,是以在使用Kafka之前要安裝jdk1.8的環境和啟動zookeeper伺服器。
下載下傳或安裝位址:
- JDK1.8:
- Zookeeper:
- Kafka:https://kafka.apachecn.org/downloads.html
好,下面我們開始進行安裝
[root@iZ2ze4m2ri7irkf6h6n8zoZ local]# tar -zxf kafka_2.11-1.0.0.tgz
[root@iZ2ze4m2ri7irkf6h6n8zoZ local]# mv kafka_2.11-1.0.0 kafka-2.11
3 基本使用
3.1 啟動Kafka
首先檢查下自己的jdk 是否安裝:
[root@iZ2ze4m2ri7irkf6h6n8zoZ local]# java -version
java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
啟動Zookeeper:
[root@iZ2ze4m2ri7irkf6h6n8zoZ zookeeper-3.5.9]# ls
bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt
[root@iZ2ze4m2ri7irkf6h6n8zoZ zookeeper-3.5.9]# cd conf/
[root@iZ2ze4m2ri7irkf6h6n8zoZ conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@iZ2ze4m2ri7irkf6h6n8zoZ conf]# cp zoo_sample.cfg zoo.cfg
[root@iZ2ze4m2ri7irkf6h6n8zoZ conf]# cd ../bin/
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ls
README.txt zkCli.cmd zkEnv.cmd zkServer.cmd zkServer.sh zkTxnLogToolkit.sh
zkCleanup.sh zkCli.sh zkEnv.sh zkServer-initialize.sh zkTxnLogToolkit.cmd
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./zkServer.
zkServer.cmd zkServer.sh
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
啟動Kafka:
[root@iZ2ze4m2ri7irkf6h6n8zoZ kafka-2.11]# ls
bin config libs LICENSE NOTICE site-docs
[root@iZ2ze4m2ri7irkf6h6n8zoZ kafka-2.11]# cd config/
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# ls
connect-console-sink.properties connect-file-source.properties log4j.properties zookeeper.properties
connect-console-source.properties connect-log4j.properties producer.properties
connect-distributed.properties connect-standalone.properties server.properties
connect-file-sink.properties consumer.properties tools-log4j.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# cd ../bin/
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server.properties
[2021-11-20 10:21:10,326] INFO KafkaConfig values:
......
[2021-11-20 10:21:12,423] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-20 10:21:12,423] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-20 10:21:12,424] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
3.2 簡單測試使用
建立和檢視topic
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ymx
Created topic "ymx".
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
ymx
生産者發送消息:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic ymx
>Hello Kafka!
>Hello Ymx!
>Hello Kafka and Ymx!
>
消費者消費消息:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ymx --from-beginning
Hello Kafka!
Hello Ymx!
Hello Kafka and Ymx!
3.3 搭建多代理叢集
3.3.1 開始搭建
首先要copy下配置檔案
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# cp server.properties server-01.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# cp server.properties server-02.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# vim server-01.properties
#### 内容開始 ####
broker.id=1 # 21行左右,broker的唯一辨別(同一個叢集中)
listeners=PLAINTEXT://:9093 # 31行左右,放開,代表kafka的端口号
log.dirs=/tmp/kafka-logs-01 # 60行左右,用逗号分隔的目錄清單,在其中存儲日志檔案
#### 内容結束 ####
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# vim server-02.properties
#### 内容開始 ####
broker.id=2 # 21行左右,broker的唯一辨別(同一個叢集中)
listeners=PLAINTEXT://:9094 # 31行左右,放開,代表kafka的端口号
log.dirs=/tmp/kafka-logs-02 # 60行左右,用逗号分隔的目錄清單,在其中存儲日志檔案
#### 内容結束 ####
根據配置檔案啟動Kafka(同一主機下)
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
報錯資訊:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/kafka-2.11/bin/hs_err_pid4036.log
原因
:實體機或虛拟機記憶體不足,不足以保證Kafka啟動或運作時需要的内容容量
解決方式:
- 增加實體機或虛拟機的記憶體
- 減少Kafka啟動所需内容的配置,将要修改的檔案為
kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M" #29行左右
3.3.2 使用
解決好之後我們開始啟動:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
[2021-11-20 10:58:33,138] INFO KafkaConfig values:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-02.properties
[2021-11-20 10:59:04,187] INFO KafkaConfig values:
ps:看下我們的阿裡雲伺服器的狀況
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mr-yan
Created topic "mr-yan".
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan PartitionCount:1 ReplicationFactor:3 Configs:
Topic: mr-yan Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
- PartitionCount:主題分區數。
- ReplicationFactor:用來設定主題的副本數。
- leader:是負責給定分區所有讀寫操作的節點。每個節點都是随機選擇的部分分區的上司者。
- replicas:是複制分區日志的節點清單,不管這些節點是leader還是僅僅活着。
- isr:是一組“同步”replicas,是replicas清單的子集,它活着并被指到leader。
進行叢集環境下的使用:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan
>Hello Kafkas!
>Hello Mr.Yan
>
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan
Hello Kafkas!
Hello Mr.Yan
3.3.3 驗證容錯性
首先我們停掉一個Kafka的Broker:
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# ps -ef|grep server-01.properties
root 19859 28247 1 10:58 pts/3 ../config/server-01.properties
root 23934 16569 0 11:12 pts/11 00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# kill -9 28247
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# ps -ef|grep server-01.properties
root 32604 16569 0 11:13 pts/11 00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# cd /usr/local/kafka-2.11/bin/
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan PartitionCount:1 ReplicationFactor:3 Configs:
Topic: mr-yan Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2
檢視生産者和消費者的變化,并再次使用,發現仍可以進行使用
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan
>Hello Kafkas!
>Hello Mr.Yan
>[2021-11-20 11:12:28,881] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
>Hello Kafkas too!
>Hello Mr.Yan too!
>
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan
Hello Kafkas!
Hello Mr.Yan
[2021-11-20 11:12:28,812] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-11-20 11:12:29,165] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Hello Kafkas too!
Hello Mr.Yan too!
4 小總結
主題,分區,副本的概念
Kafka是根據主題(topic)進行消息的傳遞,但是又有分區和副本的概念,下面來分别解釋下:
- 分區:kafka對每一條消息的key做一個hashcode運算,然後将得到的數值對分區數量進行模運算就得到了這條消息所在分區的數字。
- 副本:同一分區的幾個副本之間儲存的是相同的資料,副本之間的關系是“一主多從”,其中的主(leader)則負責對外提供讀寫操作的服務,而從(follower)則負責與主節點同步資料,當主節點當機,從節點之間能重新選舉leader進行對外服務。
kafka會保證同一個分區内的消息有序,但是不保證主題内的消息有序。