一、Kafka介紹
Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多生産者、多訂閱者,基于zookeeper協調的分布式日志系統(也可以當做MQ系統),常見可以用于web/nginx日志、通路日志,消息服務等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
主要應用場景是:日志收集系統和消息系統。
Kafka主要設計目标如下:
- 以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間的通路性能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條消息的傳輸。
- 支援Kafka Server間的消息分區,及分布式消費,同時保證每個partition内的消息順序傳輸。
- 同時支援離線資料處理和實時資料處理。
- 支援線上水準擴充
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLwsGVPhHNXpVesd1YwBnMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL2UTMzAjN1MjM5AzMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
消息模式:
- 有兩種主要的消息傳遞模式:點對點傳遞模式、釋出-訂閱模式。大部分的消息系統選用釋出-訂閱模式。Kafka就是一種釋出-訂閱模式。
- 對于消息中間件,消息分推拉兩種模式。Kafka隻有消息的拉取,沒有推送,可以通過輪詢實作消息的推送。
特點:
- Kafka在一個或多個可以跨越多個資料中心的伺服器上作為叢集運作。
- Kafka叢集中按照主題分類管理,一個主題可以有多個分區,一個分區可以有多個副本分區。
- 每個記錄由一個鍵,一個值和一個時間戳組成。
Kafka具有四個核心API:
- Producer API:允許應用程式将記錄流釋出到一個或多個Kafka主題。
- Consumer API:允許應用程式訂閱一個或多個主題并處理為其生成的記錄流。
- Streams API:允許應用程式充當流處理器,使用一個或多個主題的輸入流,并生成一個或多個輸出主題的輸出流,進而有效地将輸入流轉換為輸出流。
- Connector API:允許建構和運作将Kafka主題連接配接到現有應用程式或資料系統的可重用生産者或使用者。例如,關系資料庫的連接配接器可能會捕獲對表的所有更改。
二、Kafka優勢
- 高吞吐量:單機每秒處理幾十上百萬的消息量。即使存儲了許多TB的消息,它也保持穩定的性能。
- 高性能:單節點支援上千個用戶端,并保證零停機和零資料丢失。
-
持久化資料存儲:将消息持久化到磁盤。通過将資料持久化到硬碟以及replication防止資料丢失。
1、零拷貝
2、順序讀,順序寫
3、利用Linux的頁緩存
- 分布式系統,易于向外擴充。所有的Producer、Broker和Consumer都會有多個,均為分布式的。無需停機即可擴充機器。多個Producer、Consumer可能是不同的應用。
- 可靠性 - Kafka是分布式,分區,複制和容錯的。
- 用戶端狀态維護:消息被處理的狀态是在Consumer端維護,而不是由server端維護。當失敗時能自動平衡。
- 支援online和offline的場景。
- 支援多種用戶端語言。Kafka支援Java、.NET、PHP、Python等多種語言。
三、Kafka應用場景
- 日志收集:一個公司可以用Kafka可以收集各種服務的Log,通過Kafka以統一接口服務的方式開放給各種Consumer;
- 消息系統:解耦生産者和消費者、緩存消息等;
- 使用者活動跟蹤:Kafka經常被用來記錄Web使用者或者App使用者的各種活動,如浏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到Kafka的Topic中,然後消費者通過訂閱這些Topic來做實時的監控分析,亦可儲存到資料庫;
- 營運名額:Kafka也經常用來記錄營運監控資料。包括收集各種分布式應用的資料,生産各種操作的集中回報,比如報警和報告;
- 流式處理:比如Spark Streaming和Storm。
四、基本架構
1. 消息和批次
Kafka的資料單元稱為消息。可以把消息看成是資料庫裡的一個“資料行”或一條“記錄”。消息由位元組數組組成。
消息有鍵,鍵也是一個位元組數組。當消息以一種可控的方式寫入不同的分區時,會用到鍵。
為了提高效率,消息被分批寫入Kafka。批次就是一組消息,這些消息屬于同一個主題和分區。
把消息分成批次可以減少網絡開銷。批次越大,機關時間内處理的消息就越多,單個消息的傳輸時間就越長。批次資料會被壓縮,這樣可以提升資料的傳輸和存儲能力,但是需要更多的計算處理。
2. 模式
消息模式(schema)有許多可用的選項,以便于了解。如JSON和XML,但是它們缺乏強類型處理能力。Kafka的許多開發者喜歡使用Apache Avro。Avro提供了一種緊湊的序列化格式,模式和消息體分開。當模式發生變化時,不需要重新生成代碼,它還支援強類型和模式進化,其版本既向前相容,也向後相容。
資料格式的一緻性對Kafka很重要,因為它消除了消息讀寫操作之間的耦合性。
3. 主題和分區
Kafka的消息通過主題進行分類。主題可比是資料庫的表或者檔案系統裡的檔案夾。主題可以被分為若幹分區,一個主題通過分區分布于Kafka叢集中,提供了橫向擴充的能力。類似于mysql得分庫分表。
4. 生産者和消費者
生産者建立消息。消費者消費消息。
一個消息被釋出到一個特定的主題上。
生産者在預設情況下把消息均衡地分布到主題的所有分區上:
- 直接指定消息的分區
- 根據消息的key散列取模得出分區
- 輪詢指定分區。
消費者通過偏移量來區分已經讀過的消息,進而消費消息。會在zookeeper記錄偏移量。
消費者是消費組的一部分。消費組保證每個分區隻能被一個消費者使用,避免重複消費。
5. broker和叢集
一個獨立的Kafka伺服器稱為broker。broker接收來自生産者的消息,為消息設定偏移量,并送出消息到磁盤儲存。broker為消費者提供服務,對讀取分區的請求做出響應,傳回已經送出到磁盤上的消息。單個broker可以輕松處理數千個分區以及每秒百萬級的消息量。
每個叢集都有一個broker是叢集控制器(自動從叢集的活躍成員中選舉出來)。
控制器負責管理工作:
- 将分區配置設定給broker
- 監控broker
叢集中一個分區屬于一個broker,該broker稱為分區首領。
一個分區可以配置設定給多個broker,此時會發生分區複制。
分區的複制提供了消息備援,高可用。副本分區不負責處理消息的讀寫。
五、kafka腳本
1、kafka-topics.sh 用于管理主題。
# 列出現有的主題
[[email protected] ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka
# 建立主題,該主題包含一個分區,該分區為Leader分區,它沒有Follower分區副本。
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --
partitions 1 --replication-factor 1
# 檢視分區資訊
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 檢視指定主題的詳細資訊
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_1
# 删除指定主題
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1
2、kafka-console-producer.sh用于生産消息:
# 開啟生産者
[[email protected] ~]# kafka-console-producer.sh --topic topic_1 --broker-list localhost:9020
3、kafka-console-consumer.sh用于消費消息:
# 開啟消費者
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
# 開啟消費者方式二,從頭消費,不按照偏移量消費
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --from-beginning
六、Kafka單機安裝與配置
1、Java環境為前提
- 上傳jdk-8u261-linux-x64.rpm到伺服器并安裝
- 配置環境變量
2、Zookeeper的安裝配置
- 上傳zookeeper-3.4.14.tar.gz到伺服器
- 解壓到/opt
-
修改Zookeeper儲存資料的目錄,dataDir:
dataDir=/var/lagou/zookeeper/data
- 編輯/etc/profile
-
啟動Zookeeper:
zkServer.sh start
3、Kafka的安裝與配置
- 上傳kafka_2.12-1.0.2.tgz到伺服器并解壓
- 配置環境變量并生效
-
配置/opt/kafka_2.12-1.0.2/config中的server.properties檔案
Kafka連接配接Zookeeper的位址,此處使用本地啟動的Zookeeper執行個體,連接配接位址是localhost:2181
kafka 學習 -
啟動Kafka
kafka-server-start.sh [-daemon] /opt/kafka_2.12-1.0.2/config/server.properties
kafka 學習 - 在zookeeper中檢視相關資訊
kafka 學習
七、叢集搭建
1、 搭建設計
2、配置設定三台Linux,用于安裝擁有三個節點的Kafka叢集。
- slave1(192.168.192.131)
- slave2(192.168.192.132)
- slave3(192.168.192.134)
以上三台主機的/etc/hosts配置:
192.168.192.131 node2
192.168.192.132 node3
192.168.192.134 node4
1、Zookeeper叢集搭建
上傳JDK到linux,安裝并配置JDK
# 使用rpm安裝JDK
rpm -ivh jdk-8u261-linux-x64.rpm
# 預設的安裝路徑是/usr/java/jdk1.8.0_261-amd64
# 配置JAVA_HOME
vim /etc/profile
# 檔案最後添加兩行
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin
# 退出vim,使配置生效
source /etc/profile
檢視JDK是否正确安裝
java -version
上傳zookeeper-3.4.14.tar.gz到Linux,解壓并配置zookeeper
# node2操作
# 解壓到/opt目錄
tar -zxf zookeeper-3.4.14.tar.gz -C /opt
# 配置
cd /opt/zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 設定
dataDir=/var/lagou/zookeeper/data
# 添加
server.1=node2:2881:3881
server.2=node3:2881:3881
server.3=node4:2881:3881
# 退出vim
mkdir -p /var/lagou/zookeeper/data
echo 1 > /var/lagou/zookeeper/data/myid
# 配置環境變量
vim /etc/profile
# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,讓配置生效
source /etc/profile
# 将/opt/zookeeper-3.4.14拷貝到node3,node4
scp -r /opt/zookeeper-3.4.14/ node3:/opt
scp -r /opt/zookeeper-3.4.14/ node4:/opt
node3配置
# 配置環境變量
vim /etc/profile
# 在配置JDK環境變量基礎上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,讓配置生效
source /etc/profile
mkdir -p /var/lagou/zookeeper/data
echo 2 > /var/lagou/zookeeper/data/myid
node4配置
# 配置環境變量
vim /etc/profile
# 在配置JDK環境變量基礎上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,讓配置生效
source /etc/profile
mkdir -p /var/lagou/zookeeper/data
echo 3 > /var/lagou/zookeeper/data/myid
啟動zookeeper
# 在三台Linux上啟動Zookeeper
[[email protected] ~]# zkServer.sh start
[[email protected] ~]# zkServer.sh start
[[email protected] ~]# zkServer.sh start
# 在三台Linux上檢視Zookeeper的狀态
[[email protected] ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
[[email protected] ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader
[[email protected] ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
2、Kafka叢集搭建
上傳并解壓Kafka到/opt
# 解壓到/opt
tar -zxf kafka_2.12-1.0.2.tgz -C /opt
# 拷貝到node3和node4
scp -r /opt/kafka_2.12-1.0.2/ node3:/opt
scp -r /opt/kafka_2.12-1.0.2/ node4:/opt
配置Kafka
# 配置環境變量,三台Linux都要配置
vim /etc/profile
# 添加以下内容:
export KAFKA_HOME=/opt/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin
# 讓配置生效
source /etc/profile
# node2配置
vim /opt/kafka_2.12-1.0.2/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node2:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用預設配置
# node3配置
vim /opt/kafka_2.12-1.0.2/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node3:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用預設配置
# node4配置
vim /opt/kafka_2.12-1.0.2/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node4:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用預設配置
啟動Kafka
kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
驗證Kafka
三個節點都會有相同得Cluster ID
zkCli.sh
# 檢視每個Broker的資訊
get /brokers/ids/0
get /brokers/ids/1
get /brokers/ids/2
八、擴充
kafka代碼實戰
kafka日志收集實戰