天天看點

kafka 學習

一、Kafka介紹

Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多生産者、多訂閱者,基于zookeeper協調的分布式日志系統(也可以當做MQ系統),常見可以用于web/nginx日志、通路日志,消息服務等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。

主要應用場景是:日志收集系統和消息系統。

Kafka主要設計目标如下:

  • 以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間的通路性能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條消息的傳輸。
  • 支援Kafka Server間的消息分區,及分布式消費,同時保證每個partition内的消息順序傳輸。
  • 同時支援離線資料處理和實時資料處理。
  • 支援線上水準擴充
kafka 學習

消息模式:

  • 有兩種主要的消息傳遞模式:點對點傳遞模式、釋出-訂閱模式。大部分的消息系統選用釋出-訂閱模式。Kafka就是一種釋出-訂閱模式。
  • 對于消息中間件,消息分推拉兩種模式。Kafka隻有消息的拉取,沒有推送,可以通過輪詢實作消息的推送。

特點:

  1. Kafka在一個或多個可以跨越多個資料中心的伺服器上作為叢集運作。
  2. Kafka叢集中按照主題分類管理,一個主題可以有多個分區,一個分區可以有多個副本分區。
  3. 每個記錄由一個鍵,一個值和一個時間戳組成。

Kafka具有四個核心API:

  1. Producer API:允許應用程式将記錄流釋出到一個或多個Kafka主題。
  2. Consumer API:允許應用程式訂閱一個或多個主題并處理為其生成的記錄流。
  3. Streams API:允許應用程式充當流處理器,使用一個或多個主題的輸入流,并生成一個或多個輸出主題的輸出流,進而有效地将輸入流轉換為輸出流。
  4. Connector API:允許建構和運作将Kafka主題連接配接到現有應用程式或資料系統的可重用生産者或使用者。例如,關系資料庫的連接配接器可能會捕獲對表的所有更改。

二、Kafka優勢

  1. 高吞吐量:單機每秒處理幾十上百萬的消息量。即使存儲了許多TB的消息,它也保持穩定的性能。
  2. 高性能:單節點支援上千個用戶端,并保證零停機和零資料丢失。
  3. 持久化資料存儲:将消息持久化到磁盤。通過将資料持久化到硬碟以及replication防止資料丢失。

    1、零拷貝

    2、順序讀,順序寫

    3、利用Linux的頁緩存

  4. 分布式系統,易于向外擴充。所有的Producer、Broker和Consumer都會有多個,均為分布式的。無需停機即可擴充機器。多個Producer、Consumer可能是不同的應用。
  5. 可靠性 - Kafka是分布式,分區,複制和容錯的。
  6. 用戶端狀态維護:消息被處理的狀态是在Consumer端維護,而不是由server端維護。當失敗時能自動平衡。
  7. 支援online和offline的場景。
  8. 支援多種用戶端語言。Kafka支援Java、.NET、PHP、Python等多種語言。

三、Kafka應用場景

  • 日志收集:一個公司可以用Kafka可以收集各種服務的Log,通過Kafka以統一接口服務的方式開放給各種Consumer;
  • 消息系統:解耦生産者和消費者、緩存消息等;
  • 使用者活動跟蹤:Kafka經常被用來記錄Web使用者或者App使用者的各種活動,如浏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到Kafka的Topic中,然後消費者通過訂閱這些Topic來做實時的監控分析,亦可儲存到資料庫;
  • 營運名額:Kafka也經常用來記錄營運監控資料。包括收集各種分布式應用的資料,生産各種操作的集中回報,比如報警和報告;
  • 流式處理:比如Spark Streaming和Storm。

四、基本架構

1. 消息和批次

Kafka的資料單元稱為消息。可以把消息看成是資料庫裡的一個“資料行”或一條“記錄”。消息由位元組數組組成。

消息有鍵,鍵也是一個位元組數組。當消息以一種可控的方式寫入不同的分區時,會用到鍵。

為了提高效率,消息被分批寫入Kafka。批次就是一組消息,這些消息屬于同一個主題和分區。

把消息分成批次可以減少網絡開銷。批次越大,機關時間内處理的消息就越多,單個消息的傳輸時間就越長。批次資料會被壓縮,這樣可以提升資料的傳輸和存儲能力,但是需要更多的計算處理。

2. 模式

消息模式(schema)有許多可用的選項,以便于了解。如JSON和XML,但是它們缺乏強類型處理能力。Kafka的許多開發者喜歡使用Apache Avro。Avro提供了一種緊湊的序列化格式,模式和消息體分開。當模式發生變化時,不需要重新生成代碼,它還支援強類型和模式進化,其版本既向前相容,也向後相容。

資料格式的一緻性對Kafka很重要,因為它消除了消息讀寫操作之間的耦合性。

3. 主題和分區

Kafka的消息通過主題進行分類。主題可比是資料庫的表或者檔案系統裡的檔案夾。主題可以被分為若幹分區,一個主題通過分區分布于Kafka叢集中,提供了橫向擴充的能力。類似于mysql得分庫分表。

kafka 學習

4. 生産者和消費者

生産者建立消息。消費者消費消息。

一個消息被釋出到一個特定的主題上。

生産者在預設情況下把消息均衡地分布到主題的所有分區上:

  1. 直接指定消息的分區
  2. 根據消息的key散列取模得出分區
  3. 輪詢指定分區。

消費者通過偏移量來區分已經讀過的消息,進而消費消息。會在zookeeper記錄偏移量。

消費者是消費組的一部分。消費組保證每個分區隻能被一個消費者使用,避免重複消費。

kafka 學習

5. broker和叢集

一個獨立的Kafka伺服器稱為broker。broker接收來自生産者的消息,為消息設定偏移量,并送出消息到磁盤儲存。broker為消費者提供服務,對讀取分區的請求做出響應,傳回已經送出到磁盤上的消息。單個broker可以輕松處理數千個分區以及每秒百萬級的消息量。

kafka 學習

每個叢集都有一個broker是叢集控制器(自動從叢集的活躍成員中選舉出來)。

控制器負責管理工作:

  • 将分區配置設定給broker
  • 監控broker

叢集中一個分區屬于一個broker,該broker稱為分區首領。

一個分區可以配置設定給多個broker,此時會發生分區複制。

分區的複制提供了消息備援,高可用。副本分區不負責處理消息的讀寫。

五、kafka腳本

1、kafka-topics.sh 用于管理主題。

# 列出現有的主題
[[email protected] ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka
# 建立主題,該主題包含一個分區,該分區為Leader分區,它沒有Follower分區副本。
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --
partitions 1 --replication-factor 1
# 檢視分區資訊
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 檢視指定主題的詳細資訊
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_1
# 删除指定主題
[[email protected] ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1
           

2、kafka-console-producer.sh用于生産消息:

# 開啟生産者
[[email protected] ~]# kafka-console-producer.sh --topic topic_1 --broker-list localhost:9020
           

3、kafka-console-consumer.sh用于消費消息:

# 開啟消費者
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
# 開啟消費者方式二,從頭消費,不按照偏移量消費
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --from-beginning
           

六、Kafka單機安裝與配置

1、Java環境為前提

  1. 上傳jdk-8u261-linux-x64.rpm到伺服器并安裝
  2. 配置環境變量

2、Zookeeper的安裝配置

  1. 上傳zookeeper-3.4.14.tar.gz到伺服器
  2. 解壓到/opt
  3. 修改Zookeeper儲存資料的目錄,dataDir:

    dataDir=/var/lagou/zookeeper/data

  4. 編輯/etc/profile
  5. 啟動Zookeeper:

    zkServer.sh start

3、Kafka的安裝與配置

  1. 上傳kafka_2.12-1.0.2.tgz到伺服器并解壓
  2. 配置環境變量并生效
  3. 配置/opt/kafka_2.12-1.0.2/config中的server.properties檔案

    Kafka連接配接Zookeeper的位址,此處使用本地啟動的Zookeeper執行個體,連接配接位址是localhost:2181

    kafka 學習
  4. 啟動Kafka

    kafka-server-start.sh [-daemon] /opt/kafka_2.12-1.0.2/config/server.properties

    kafka 學習
  5. 在zookeeper中檢視相關資訊
    kafka 學習

七、叢集搭建

1、 搭建設計

kafka 學習

2、配置設定三台Linux,用于安裝擁有三個節點的Kafka叢集。

  • slave1(192.168.192.131)
  • slave2(192.168.192.132)
  • slave3(192.168.192.134)

以上三台主機的/etc/hosts配置:

192.168.192.131 node2 
192.168.192.132 node3  
192.168.192.134 node4  
           

1、Zookeeper叢集搭建

上傳JDK到linux,安裝并配置JDK

# 使用rpm安裝JDK
rpm -ivh jdk-8u261-linux-x64.rpm

# 預設的安裝路徑是/usr/java/jdk1.8.0_261-amd64
# 配置JAVA_HOME
vim /etc/profile

# 檔案最後添加兩行
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin

# 退出vim,使配置生效
source /etc/profile
           

檢視JDK是否正确安裝

java -version
           

上傳zookeeper-3.4.14.tar.gz到Linux,解壓并配置zookeeper

# node2操作
# 解壓到/opt目錄
tar -zxf zookeeper-3.4.14.tar.gz -C /opt

# 配置
cd /opt/zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg

# 設定
dataDir=/var/lagou/zookeeper/data

# 添加
server.1=node2:2881:3881
server.2=node3:2881:3881
server.3=node4:2881:3881

# 退出vim
mkdir -p /var/lagou/zookeeper/data
echo 1 > /var/lagou/zookeeper/data/myid

# 配置環境變量
vim /etc/profile

# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log

# 退出vim,讓配置生效
source /etc/profile

# 将/opt/zookeeper-3.4.14拷貝到node3,node4
scp -r /opt/zookeeper-3.4.14/ node3:/opt
scp -r /opt/zookeeper-3.4.14/ node4:/opt
           

node3配置

# 配置環境變量
vim /etc/profile

# 在配置JDK環境變量基礎上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log

# 退出vim,讓配置生效
source /etc/profile
mkdir -p /var/lagou/zookeeper/data
echo 2 > /var/lagou/zookeeper/data/myid
           

node4配置

# 配置環境變量
vim /etc/profile

# 在配置JDK環境變量基礎上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log

# 退出vim,讓配置生效
source /etc/profile
mkdir -p /var/lagou/zookeeper/data
echo 3 > /var/lagou/zookeeper/data/myid
           

啟動zookeeper

# 在三台Linux上啟動Zookeeper
[[email protected] ~]# zkServer.sh start
[[email protected] ~]# zkServer.sh start
[[email protected] ~]# zkServer.sh start

# 在三台Linux上檢視Zookeeper的狀态
[[email protected] ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower

[[email protected] ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader

[[email protected] ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg 
Mode: follower
           

2、Kafka叢集搭建

上傳并解壓Kafka到/opt

# 解壓到/opt
tar -zxf kafka_2.12-1.0.2.tgz -C /opt

# 拷貝到node3和node4
scp -r /opt/kafka_2.12-1.0.2/ node3:/opt
scp -r /opt/kafka_2.12-1.0.2/ node4:/opt
           

配置Kafka

# 配置環境變量,三台Linux都要配置
vim /etc/profile

# 添加以下内容:
export KAFKA_HOME=/opt/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin

# 讓配置生效
source /etc/profile

# node2配置
vim /opt/kafka_2.12-1.0.2/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node2:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka

# 其他使用預設配置
# node3配置
vim /opt/kafka_2.12-1.0.2/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node3:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka

# 其他使用預設配置
# node4配置
vim /opt/kafka_2.12-1.0.2/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node4:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用預設配置
           

啟動Kafka

kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
           

驗證Kafka

三個節點都會有相同得Cluster ID

kafka 學習
zkCli.sh
# 檢視每個Broker的資訊
get /brokers/ids/0
get /brokers/ids/1
get /brokers/ids/2
           

八、擴充

kafka代碼實戰

kafka日志收集實戰