天天看點

KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

文章目錄

  • Kafka介紹
    • 消息隊列的作用
    • 主題和日志
      • 釋出消息
      • 讀取消息
    • 配置設定
    • 生産者
    • 消費者
  • Kafka架構
  • Kafka安裝部署
  • Kafke指令行操作
  • Kafka Api操作
  • Kafka優化
  • 總結
  • 參考資料

Kafka介紹

官網:http://kafka.apache.org/

ApacheKafka®是一個分布式流媒體平台,用于建構資料管道和流應用程式。它具有水準可擴充性,容錯性,快速性,并在數千家公司中的生産中運作。

流媒體平台有三個關鍵功能:

釋出和訂閱:讀取和寫入資料流,類似于消息隊列或企業消息傳遞系統。

處理:編寫可擴充的處理應用程式、實時響應事件。

存儲:将資料流安全地存儲在分布式、副本的容錯叢集中。

Kafka通常用于兩大類應用:

建構可在系統或應用程式之間可靠擷取資料的實時流資料管道

建構轉換或響應資料流的實時流應用程式

Kafka有四個核心API:

  1. 生産者(Producer)API:允許應用程式釋出的記錄流至一個或多個Kafka的話題。
  2. 消費者(Consumer)API:允許應用程式訂閱一個或多個主題,并處理所産生的對他們記錄的資料流。
  3. 流(Streams)API:允許應用程式充當流處理器,從一個或多個主題消耗的輸入流,并産生一個輸出流至一個或多個輸出的主題,有效地變換所述輸入流,以輸出流。
  4. 連接配接器(Connector)API:允許建構和運作Kafka主題連接配接到現有的應用程式或資料系統中重用生産者或消費者。例如,關系資料庫的連接配接器可能捕獲對表的每個更改。
KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

  在流計算中,kafka主要功能是用來緩存資料,storm可以通過消費kafka中的資料進行流計算。

是一套開源的消息系統,由scala寫成。支援javaAPI的。

  kafka最初由LinkedIn公司開發,2011年開源。

  2012年從Apache畢業。

  是一個分布式消息隊列,kafka讀消息儲存采用Topic進行歸類。分布式消息隊列有兩個重要的角色:發送消息的Producer(生産者)和接收消息的Consumer(消費者)。

消息隊列的作用

  1. 解耦

    使用消息隊列作為傳輸存儲消息的通道,可以避免因一方出問題後資料丢失的安全問題。如圖所示:

    KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

      上圖中顯示了無消息隊列和有消息隊列的消息流程圖。其中,如果無消息隊列,當發送消息方發送消息時,如果接收消息方出現了問題,此時這一段消息将會丢失或發送消息方重試多次發送。

      而加入了消息隊列後,發送消息方和接收消息方不再直接相連,接收消息方當機後,消息會存儲到消息隊列的記憶體或磁盤中,直到接收消息方重新啟動後會從消息隊列擷取消息,而發送消息方在發送完消息之後不需要關心接收消息方是否接收到消息。

      消息隊列就是傳輸和存儲消息的通道。

  2. 拓展性

      上圖中隻是展示了一個發送消息方和接收消息方,在分布式消息隊列中,就是生産者和消費者。在實際應用中,一個生産者發送的消息會由多個消費者消費,而一個消費者可以消費多個生産者的消息,加入消息隊列,對消息的傳輸進入統一的管理,可以便于新增生産者和消費者,同時不影響其他系統的工作。

    KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料
      上圖中也清楚顯示了消息隊列的作用。在Kafka中,topic是用于消息隊列的實作,生産者和消費者通過注冊和訂閱topic,實作發送和接收消息的功能。當需要新增一個生産者或消費者時,隻需要注冊或訂閱對應的topic就可以實作發送和接收消息的功能。
  3. 靈活

      面對通路量劇增的情況下,分布式消息隊列可以把通路量均衡配置設定到多個消息隊列中,當所有消息隊列都滿了後,會拒絕通路,避免系統完全癱瘓。

  4. 可恢複性

      當分布式消息隊列部分元件失效,不會影響到整個系統,可以随時恢複過來。

  5. 緩沖性

       消息統一由消息隊列中間件管理,可以控制資料流經過系統的速度,避免系統負載過重,出現問題,實作服務治理的效果。

  6. 順序保證性

      消息隊列遵循先入先出原則,保證了資料的順序。

  7. 異步通信

      kafka消息隊列提供了異步處理的機制,允許使用者把消息放到隊列中,而不立即處理。

主題和日志

Kafka記錄流的核心抽象是Topic——主題,主題是釋出消息的類别或訂閱源名稱。上圖中顯示,一個主題一般都是多使用者的,多個生産者或多個消費者寫入或讀取它的資料。

釋出消息

對于每個主題,Kafka叢集都維護一個分區日志,如下圖:

KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的送出日志中。分區中的記錄每個都被配置設定一個稱為偏移的順序ID号,它唯一地辨別分區中的每個記錄。即Kafka是用分區日志來實作消息隊列資料結構的。

Kafka叢集會持久化地保留所有已釋出的消息,即不管它們是否已被消耗,Kafka都會儲存它們在日志分區中。持久化消息有三個配置方式:

(1)消息達到預設10000條時将資料寫入到日志檔案。

(2)當達到某個時間時,強制執行一次flush,預設值為null。

(3)周期性檢查消息是否需要flush。

另外,配置資料儲存政策,可以将日志檔案删除。儲存政策有兩種方式:

(1)按時間粒度,可設定分鐘或小時,達到一定時間就處理,預設是7天。

(2)按檔案大小,設定最大檔案大小,達到上限就删除。同時可以設定檔案大小檢查的周期。

當達到以上儲存政策的其中一個條件時,會把日志檔案作一個标記"delete",當這些标記了"delete"的檔案達到一定大小或達到一定時間後才會真正删除。詳情請檢視官方文檔。

讀取消息

每個消費者保留的唯一進制資料是該消費者在日志中的偏移或位置,該偏移可以由消費者控制,預設在讀取消息時會線性提高其偏移量。消費者可以控制偏移,即消費者可以重複消費舊的消息或直接跳到最新的消息處,且對叢集或其他消費者沒有太大影響。

KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

配置設定

日志分區可以分布在多個Kafka叢集中的伺服器上,而一個主題可以有多個日志分區,是以,它可以處理任意數量的消息。

每個伺服器處理資料并請求分區的共享。每個分區都在可配置數量的伺服器上進行複制,以實作容錯。

每個分區都有一個伺服器充當“Leader”,零個或多個伺服器充當“Follower”,Leader負責處理分區的所有讀取和寫入請求,而分區的Follower則被動地複制Leader,如果Leader當機了,其中一個Follower會自動成為新的Leader。每個伺服器都充當某些分區的Leader伺服器和其他分區Leader伺服器的Follower。

生産者

生産者将消息釋出到它指定的主題,并負責選擇把記錄配置設定到主題中的哪個分區。可自定義設定分區資訊。

消費者

消費者使用消費者組來标記自己,并且釋出到主題的每個記錄會被傳遞到訂閱了該主題的消費者組中的一個消費者執行個體。消費者執行個體可以在單獨的程序中,也可以在不同的機器上。

如果消費者執行個體在相同的消費者組中,則會均衡負載到消費者執行個體上。

如果消費者執行個體在不同的消費者組中,則會廣播到所有消費者程序中。

Kafka架構

kafka依賴zookeeper,用zk儲存中繼資料資訊。搭建kafka叢集需要先搭建zookeeper叢集。下面首先以微信公衆号的消息發送接收為例,展示一下Kafka的整體流程。

KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

上圖中背景系統充當Producer,使用者用戶端充當Consumer,而Kafka叢集充當消息隊列的角色。

下面看看細節實作:

KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料
  1. 生産者把消息釋出到指定主題的某個分區中;
  2. kafka把主題中的消息傳遞給訂閱了該主題的消費者組;
  3. 預設情況下,消費者組會把所訂閱主題的分區均衡配置設定給消費者執行個體來處理,當傳遞過來的消息是分區Partition中的,則把它傳遞給負責此分區的消費者執行個體處理。

Kafka安裝部署

上面對Kafka進行了詳細的說明,下面開始使用Kafka。

  1. 官網下載下傳安裝包:http://kafka.apache.org/downloads,本文使用的是2.11版本。
  2. 上傳到伺服器并解壓檔案
tar -zxvf .tar
           
  1. 修改配置檔案config/server.properties
broker.id=0  #叢集中的每個伺服器的id不同
delete.topic.enable=true #是否允許删除主題
log.dirs=/kafka/logs #指定日志儲存的位置,需要事先建立指定的目錄
zookeeper.connect=hd-even-01:2181,hd-even-02:2181,hd-even-03:2181  #指定zookeeper叢集
           
  1. 啟動zk叢集
zkServer.sh start
           
  1. 啟動kafka叢集
# &代表是背景啟動
bin/kafka-server-start.sh config/server.properties &
           
  1. 關閉的指令
bin/kafka-server-stop.sh
           

Kafke指令行操作

1. 檢視目前叢集已存在的主題
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --list
2. 建立topic
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --create --replication-factor 3 --partitions 1 --topic test

--zookeeper 連接配接zk叢集
--create 建立指令
--replication-factor 副本數
--partition 分區數
--topic 主題名
3. 删除主題
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --delete --topic test
4. 發送消息
	生産者啟動(9092是Kafka對外開放的接口):
	bin/kafka-console-producer.sh --broker-list hd-even-01:9092 --topic test
	消費者啟動:
	bin/kafka-console-consumer.sh --bootstrap-server hd-even-01:9092 --topic test --from-beginning
	--bootstrap-server 指定kafka叢集
	--from-beginning 表示消費者偏移為第一個位置
5. 檢視主題詳細資訊
bin/kafka-topic.sh --zookeeper hd-even-01:2181 --describe --topic test
           

Kafka Api操作

除了通過指令行的方式操作Kafka,還可以使用Java Api來操作Kafka,以便與企業項目內建。

  1. pom.xml導入依賴:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
           
  1. 生産者Producer:
package com.even.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class Producer {
    public static void main(String[] args) {

        //1.配置生産者屬性(指定多個參數)
        Properties prop = new Properties();

        //參數配置
        //kafka節點的位址
        prop.put("bootstrap.servers", "192.168.11.136:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "10241");
        //配置批量處理資料延遲
        prop.put("linger.ms", "5");
        //配置記憶體緩沖大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2.執行個體化producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        //3.發送消息
        for (int i = 0; i < 10000; i++) {
            producer.send(new ProducerRecord<String, String>("test", "even" + i));
        }
        //4.釋放資源
        producer.close();
    }
}

           
  1. 消費者Consumer:
package com.even.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;


public class Consumer {
    public static void main(String[] args) {
        //1.配置消費者屬性
        Properties prop = new Properties();
        //配置屬性
        //伺服器位址指定
        prop.put("bootstrap.servers", "192.168.11.136:9092");
        //配置消費者組
        prop.put("group.id", "g1");
        //配置是否自動确認offset
        prop.put("enable.auto.commit", "true");
//        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        //序列化
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2.執行個體消費者
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

        //訂閱消息主題
        consumer.subscribe(Collections.singletonList("test"));
        //3.拉消息 推push 拉poll
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //周遊消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + "------" + record.value());
            }

        }
    }
}
           
  1. 先啟動Producer,再啟動Consumer,結果列印:
    KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料

Kafka優化

Kafka可以認為是一個記憶體資料庫,消息儲存在記憶體中,定期持久化到硬碟。是以,對Kafka記憶體的合理使用是Kafka優化的關鍵。下面列出幾點常用優化:

  1. 增加Kafka堆記憶體大小,修改kafka-server-start.sh檔案
    KafkaKafka介紹Kafka架構Kafka安裝部署Kafke指令行操作Kafka Api操作Kafka優化總結參考資料
    堆記憶體預設是1G,可以适當調整。
  2. 設定持久化資料的政策

    上面提到,Kafka通過一定的方式,會把記憶體的資料持久化到硬碟,這三種方式分别為:

    (1)消息達到預設10000條時将資料寫入到日志檔案。

    (2)當達到某個時間時,強制執行一次flush,預設值為null。

    (3)周期性檢查消息是否需要flush。

  3. 限流

    對于生産者釋出的消息進行限流。

  4. 叢集模式

    把Kafka配置成叢集模式,當一個伺服器的記憶體快要滿時,可以把資料均衡配置設定到另一台伺服器,然後再把資料持久化到本地硬碟,避免記憶體溢出。

總結

Kafka有幾個重點要了解的概念:生産者Producer,消費者Consumer,主題Topic,分區Partition,消費者組ConsumerGrouping。生産者負責釋出消息,Topic是一個抽象概念,包含多個分區Partition,分區Partition是具體存儲消息的地方,消費者組ConsumerGrouping從訂閱的主題擷取消息并傳遞給消費者Consumer。

參考資料

Kafka官方文檔:http://kafka.apache.org/