天天看點

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

Kafka

  • 一、概述
    • 1.1 定義
    • 1.2 消息隊列(Message Queue)
      • 1.2.1 應用場景
      • 1.2.2 消息的模式
    • 1.3 基礎架構
  • 二、Kafka快速入門
    • 2.1Kafka安裝
      • 2.1.1 叢集規劃
      • 2.1.1 叢集安裝部署
    • 2.2 Kafka指令行操作
  • 三、Kafka架構深入
    • 3.2 Kafka生産者
      • 3.2.1 分區政策
      • 3.2.2 資料可靠性保證
        • 1)副本資料同步政策
        • 2)ISR
        • 3)ack應答機制
        • 4)故障處理細節
      • 3.2.3 Exactly Once語義
    • 3.3 Kafka消費者
      • 3.3.1 消費方式
      • 3.3.2 分區配置設定政策
      • 3.3.3 offset的維護
    • 3.4 Kafka 高效讀寫資料
    • 3.5 Zookeeper在Kafka中的作用
  • 四、Kafka API java
    • 4.1 Producer API
      • 4.1.1 消息發送流程
      • 4.1.2 異步發送API
      • 4.1.3 同步發送API
    • 4.2 Consumer API
      • 4.2.1 手動送出offset
      • 4.2.2 自動送出offset
    • 4.3 自定義Interceptor
      • 4.3.1 攔截器原理
      • 4.3.2 攔截器案例
  • 五、Kafka監控
    • 5.1 Kafka Monitor
    • 5.2 Kafka Manager
  • 六、面試題

一、概述

1.1 定義

Kafka是一個分布式的基于釋出/訂閱模式的消息隊列,Scala和Java編寫,主要應用于大資料實時處理領域。

1.2 消息隊列(Message Queue)

1.2.1 應用場景

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

消息隊列的作用:

1、流量消峰

2、系統解耦

3、異步

4、資料分發

1.2.2 消息的模式

1、點對點模式(一對一,消費者主動拉取資料,消息收到後消息清除)

消息生産者生産消息發送到Queue中,然後消息消費者從Queue中取出并且消費消息。

消息被消費以後,queue中不再有存儲,是以消息消費者不可能消費到已經被消費的消息。Queue支援存在多個消費者,但是對一個消息而言,隻會有一個消費者可以消費。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題
并不是一個生産者隻能有一個消費者,而是一個生産者的消息隻能被一個消費者消費。

2、釋出/訂閱模式(一對多,消費者消費資料之後不會清除消息)

消息生産者(釋出)将消息釋出到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,釋出到topic的消息會被所有訂閱者消費。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題
Kafka用的是釋出訂閱。

1.3 基礎架構

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

1)Producer :消息生産者,就是向kafka broker發消息的用戶端;

2)Consumer :消息消費者,向kafka broker取消息的用戶端;

3)Consumer Group (CG):消費者組,由多個consumer組成。消費者組内每個消費者負責消費不同分區的資料,一個分區隻能由一個消費者消費;消費者組之間互不影響。

所有的消費者都屬于某個消費者組,消費者組是邏輯上的一個訂閱者。

4)Broker :一台kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。

5)Topic :可以了解為一個隊列,生産者和消費者面向的都是一個topic;

6)Partition:為了實作擴充性,一個非常大的topic可以分布到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列;

7)Replica:副本,為保證叢集中的某個節點發生故障時,該節點上的partition資料不丢失,且kafka仍然能夠繼續工作,kafka提供了副本機制,一個topic的每個分區都有若幹個副本,一個leader和若幹個follower。(副本數不能超過Broker數目)

8)leader:每個分區多個副本的“主”,生産者發送資料的對象,以及消費者消費資料的對象都是leader。

9)follower:每個分區多個副本中的“從”,實時從leader中同步資料,保持和leader資料的同步。leader發生故障時,某個follower會成為新的follower。

二、Kafka快速入門

2.1Kafka安裝

2.1.1 叢集規劃

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

2.1.1 叢集安裝部署

安裝包下載下傳:http://kafka.apache.org/downloads.html

1)解壓安裝包

Scala版本号 – kafka版本号

2)修改解壓後的檔案名稱

mv kafka_2.11-0.11.0.0/kafka
           

3)在/opt/module/kafka目錄下建立logs檔案夾

mkdir logs
           

4)修改配置檔案

cd config/
vi server.properties
           

配置檔案内容:

#broker的全局唯一編号,不能重複
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的現成數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka運作日志存放的路徑	
log.dirs=/opt/module/kafka/logs
#topic在目前broker上的分區個數
num.partitions=1
#用來恢複和清理data下資料的線程數量
num.recovery.threads.per.data.dir=1
#segment檔案保留的最長時間,逾時将被删除
log.retention.hours=168
#配置連接配接Zookeeper叢集位址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
           

5)配置環境變量

sudo vi /etc/profile
           

配置内容

export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
           

生效配置

source /etc/profile
           

6)分發安裝包

xsync kafka/
           

注意:分發之後記得配置其他機器的環境變量

7)分别在hadoop103和hadoop104上修改配置檔案/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2

注:broker.id不得重複

8)啟動叢集

依次在hadoop102、hadoop103、hadoop104節點上啟動kafka

-daemon 背景

config/server.properties啟動配置

bin/kafka-server-start.sh -daemon config/server.properties
           

9)關閉叢集

bin/kafka-server-stop.sh stop
           

10)kafka群起腳本

for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
echo "========== $i ==========" 
ssh $i 'source /etc/profile&&/opt/module/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-0.11.0.2/config/server.properties'
echo $?
done
           

2.2 Kafka指令行操作

  1. 檢視目前伺服器中的所有topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
           
  1. 建立topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 
  --create --replication-factor 3 --partitions 1 --topic first
           
  1. 删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181
--delete --topic first
           
Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題
  1. 發送消息
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
           
  1. 消費消息
bin/kafka-console-consumer.sh  --bootstrap-server hadoop102:9092 --from-beginning --topic first
           
  1. 檢視某個Topic的詳情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
           
  1. 修改分區數

    分區數隻能增加不能減少,kafka分區的資料不能重新配置設定,内部沒實作

bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
           

三、Kafka架構深入

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

注:

0.9版本前offset是存在ZK上面的,0.9版本後存在一個内置的Topic()中。

kafka隻能保證一個分區内的有序性,不能保證全局有序性。

Kafka中消息是以topic進行分類的,生産者生産消息,消費者消費消息,都是面向topic的。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

topic是邏輯上的概念,而partition是實體上的概念。

每個partition對應于一個log檔案,該log檔案中存儲的就是producer生産的資料。

Producer生産的資料會被不斷追加到該log檔案末端,且每條資料都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢複時,從上次的位置繼續消費。

由于生産者生産的消息會不斷追加到log檔案末尾,為防止log檔案過大導緻資料定位效率低下。

Kafka采取了分片和索引機制,将每個partition分為多個segment。每個segment對應兩個檔案——“.index”檔案和“.log”檔案。這些檔案位于一個檔案夾下,該檔案夾的命名規則為:topic名稱+分區序号。例如,first這個topic有三個分區,則其對應的檔案夾為first-0,first-1,first-2。

00000000000000000000.index

00000000000000000000.log

00000000000000170410.index

00000000000000170410.log

00000000000000239430.index

00000000000000239430.log

index和log檔案以目前segment的第一條消息的offset命名

“.index”檔案存儲大量的索引資訊,索引檔案中的中繼資料指向對應資料檔案中message的實體偏移位址,“.log”檔案存儲大量的資料。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

注:

資料預設存儲7天,配置檔案可以修改。

過期直接删除過期的segment檔案即可。

3.2 Kafka生産者

3.2.1 分區政策

  1. 分區的原因

    (1)友善在叢集中擴充,每個Partition可以通過調整以适應它所在的機器,而一個topic又可以有多個Partition組成,是以整個叢集就可以适應任意大小的資料了;

    (2)可以提高并發,因為可以以Partition為機關讀寫了。

  2. 分區的原則

    看一下Java的API中的ProducerRecord對象就清楚了

    Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

(1)指明 partition 的情況下,直接将指明的值直接作為 partiton 值;

(2)沒有指明 partition 值但有 key 的情況下,将 key 的 hash 值與 topic 的 partition 數進行取餘得到 partition 值;

(3)既沒有 partition 值又沒有 key 值的情況下,第一次調用時随機生成一個整數(後面每次調用在這個整數上自增),将這個值與 topic 可用的 partition 總數取餘得到 partition 值,也就是常說的 round-robin 算法。

3.2.2 資料可靠性保證

為保證producer發送的資料,能可靠的發送到指定的topic,topic的每個partition收到producer發送的資料後,都需要向producer發送ack(acknowledgement确認收到),如果producer收到ack,就會進行下一輪的發送,否則重新發送資料。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

1)副本資料同步政策

方案 優點 缺點
半數以上完成同步,就發送ack 延遲低 選舉新的leader時,容忍n台節點的故障,需要2n+1個副本
全部完成同步,才發送ack 選舉新的leader時,容忍n台節點的故障,需要n+1個副本 延遲高

Kafka選擇了第二種方案,原因如下:

1.同樣為了容忍n台節點的故障,第一種方案需要2n+1個副本,而第二種方案隻需要n+1個副本,而Kafka的每個分區都有大量的資料,第一種方案會造成大量資料的備援。

2.雖然第二種方案的網絡延遲會比較高,但網絡延遲對Kafka的影響較小。(為了解決部分brocker出現網絡比較慢的情況,出現了一個ISR的概念,下面介紹)

2)ISR

采用第二種方案之後,設想以下情景:leader收到資料,所有follower都開始同步資料,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。這個問題怎麼解決呢?

Leader維護了一個動态的in-sync replica set (ISR),意為和leader保持同步的follower集合。當ISR中的follower完成資料的同步之後,leader就會給follower發送ack。如果follower長時間未向leader同步資料,則該follower将被踢出ISR,該時間門檻值由replica.lag.time.max.ms參數設定。Leader發生故障之後,就會從ISR中選舉新的leader。如果恢複以後又會加進來。

之前有兩種ISR踢出方式 1、時間 2、落後條數

後來把落後條數給去除了,原因:如果某一時間消息量比較大就會踢出,踢出又進來然後造成維護的資源消耗。

3)ack應答機制

對于某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丢失,是以沒必要等ISR中的follower全部接收成功。

是以Kafka為使用者提供了三種可靠性級别,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks參數配置:

acks: 0 1 -1

  • 0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經傳回,當broker故障時有可能丢失資料;
  • 1:producer等待broker的ack,partition的leader落盤成功後傳回ack,如果在follower同步成功之前leader故障,那麼将會丢失資料;
    Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盤成功後才傳回ack。但是如果在follower同步完成後,broker發送ack之前,leader發生故障,那麼會造成資料重複。
    Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

4)故障處理細節

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

(1)follower故障

follower發生故障後會被臨時踢出ISR,待該follower恢複後,follower會讀取本地磁盤記錄的上次的HW,并将log檔案高于HW的部分截取掉,從HW開始向leader進行同步。等該follower的LEO大于等于該Partition的HW,即follower追上leader之後,就可以重新加入ISR了。

(2)leader故障

leader發生故障之後,會從ISR中選出一個新的leader,之後,為保證多個副本之間的資料一緻性,其餘的follower會先将各自的log檔案高于HW的部分截掉,然後從新的leader同步資料。

注意:這隻能保證副本之間的資料一緻性,并不能保證資料不丢失或者不重複。

注:HW

3.2.3 Exactly Once語義

對于某些比較重要的消息,我們需要保證exactly once語義,即保證每條消息被發送且僅被發送一次。

0.11版本之後,Kafka引入了幂等性機制(idempotent),配合acks = -1時的at least once語義,實作了producer到broker的exactly once語義。

idempotent + at least once = exactly once

使用時,隻需将enable.idempotence屬性設定為true,kafka自動将acks屬性設為-1。

3.3 Kafka消費者

3.3.1 消費方式

consumer采用pull(拉)模式從broker中讀取資料。

push(推)模式很難适應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以适當的速率消費消息。

pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入循環中,一直傳回空資料。針對這一點,Kafka的消費者在消費資料時會傳入一個時長參數timeout,如果目前沒有資料可供消費,consumer會等待一段時間之後再傳回,這段時長即為timeout。

3.3.2 分區配置設定政策

一個consumer group中有多個consumer,一個 topic有多個partition,是以必然會涉及到partition的配置設定問題,即确定那個partition由哪個consumer來消費。

Kafka有兩種配置設定政策,一是roundrobin,一是range。

1)roundrobin

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

2)range

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

如果再來一個按這種range分區,則可能出現某些消費者越堆越多。預設是range分區。

3.3.3 offset的維護

由于consumer在消費過程中可能會出現斷電當機等故障,consumer恢複後,需要從故障前的位置的繼續消費,是以consumer需要實時記錄自己消費到了哪個offset,以便故障恢複後繼續消費。

Kafka 0.9版本之前,consumer預設将offset儲存在Zookeeper中,從0.9版本開始,consumer預設将offset儲存在Kafka一個内置的topic中,該topic為__consumer_offsets。

3.4 Kafka 高效讀寫資料

1)順序寫磁盤

Kafka的producer生産資料,要寫入到log檔案中,寫的過程是一直追加到檔案末端,為順序寫。官網有資料表明,同樣的磁盤,順序寫能到到600M/s,而随機寫隻有100k/s。這與磁盤的機械機構有關,順序寫之是以快,是因為其省去了大量磁頭尋址的時間。

2)零複制技術

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

3.5 Zookeeper在Kafka中的作用

Kafka叢集中有一個broker會被選舉為Controller,負責管理叢集broker的上下線,所有topic的分區副本配置設定和leader選舉等工作。

Controller的管理工作都是依賴于Zookeeper的。

以下為partition的leader選舉過程:

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

四、Kafka API java

4.1 Producer API

4.1.1 消息發送流程

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

Kafka的Producer發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulator。main線程将消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker。

相關參數:

batch.size:隻有資料積累到batch.size之後,sender才會發送資料。

linger.ms:如果資料遲遲未達到batch.size,sender等待linger.time之後就會發送資料。

4.1.2 異步發送API

1)導入依賴

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>0.11.0.0</version>
</dependency>
           

2)編寫代碼

需要用到的類:

KafkaProducer:需要建立一個生産者對象,用來發送資料

ProducerConfig:擷取所需的一系列配置參數

ProducerRecord:每條資料都要封裝成一個ProducerRecord對象

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka叢集,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
           

2.帶回調函數的API

回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分别是RecordMetadata和Exception,如果Exception為null,說明消息發送成功,如果Exception不為null,說明消息發送失敗。

注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

隻會傳回成功,或者重試失敗的結果。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka叢集,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                //回調函數,該方法會在Producer收到ack時調用,為異步調用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}
           

4.1.3 同步發送API

同步發送的意思就是,一條消息發送之後,會阻塞目前線程,直至傳回ack。

由于send方法傳回的是一個Future對象,根據Futrue對象的特點,我們也可以實作同步發送的效果,隻需在調用Future對象的get方發即可。(高版本的同步是基于異步的)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka叢集,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}
           

4.2 Consumer API

Consumer消費資料時的可靠性是很容易保證的,因為資料在Kafka中是持久化的,故不用擔心資料丢失問題。

由于consumer在消費過程中可能會出現斷電當機等故障,consumer恢複後,需要從故障前的位置的繼續消費,是以consumer需要實時記錄自己消費到了哪個offset,以便故障恢複後繼續消費。

是以offset的維護是Consumer消費資料是必須考慮的問題

4.2.1 手動送出offset

1)導入依賴

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>0.11.0.0</version>
</dependency>
           

2)編寫代碼

需要用到的類:

KafkaConsumer:需要建立一個消費者對象,用來消費資料

ConsumerConfig:擷取所需的一系列配置參數

ConsuemrRecord:每條資料都要封裝成一個ConsumerRecord對象

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");//消費者組,隻要group.id相同,就屬于同一個消費者組
        props.put("enable.auto.commit", "false");//自動送出offset
       
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
           

3)代碼分析:

手動送出offset的方法有兩種:分别是commitSync(同步送出)和commitAsync(異步送出)。兩者的相同點是,都會将本次poll的一批資料最高的偏移量送出;不同點是,commitSync會失敗重試,一直到送出成功(如果由于不可恢複原因導緻,也會送出失敗);

commitAsync則沒有失敗重試機制,故有可能送出失敗。(下次有送出不影響,相當于重試)

4)資料重複消費問題

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

如果先送出,再進行資料處理,則會出現消息丢失的問題,一般不按照這個順序。

4.2.2 自動送出offset

為了使我們能夠專注于自己的業務邏輯,Kafka提供了自動送出offset的功能。

自動送出offset的相關參數:

enable.auto.commit:是否開啟自動送出offset功能

auto.commit.interval.ms:自動送出offset的時間間隔

以下為自動送出offset的代碼:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;


/**
 * @author liubo
 */
public class CustomOffsetConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");//消費者組,隻要group.id相同,就屬于同一個消費者組
        props.put("enable.auto.commit", "false");//自動送出offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

            //送出目前負責的分區的offset
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {


            }

            //定位新配置設定的分區的offset
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    Long offset = getPartitionOffset(partition);
                    consumer.seek(partition,offset);
                }
            }
        });


        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                commitOffset(topicPartition,record.offset()+1);
            }
        }
    }

    private static void commitOffset(TopicPartition topicPartition, long l) {

    }

    private static Long getPartitionOffset(TopicPartition partition) {
        return null;
    }

}
           

4.3 自定義Interceptor

4.3.1 攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實作clients端的定制化控制邏輯。

對于producer而言,interceptor使得使用者在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許使用者指定多個interceptor按序作用于同一條消息進而形成一個攔截鍊(interceptor chain)。Intercetpor的實作接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1)configure(configs)

擷取配置資訊和初始化資料時調用。

(2)onSend(ProducerRecord):

該方法封裝進KafkaProducer.send方法中,即它運作在使用者主線程中。Producer確定在消息被序列化以及計算分區前調用該方法。使用者可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區,否則會影響目标分區的計算。

(3)onAcknowledgement(RecordMetadata, Exception):

該方法會在消息從RecordAccumulator成功發送到Kafka Broker之後,或者在發送過程中失敗時調用。并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運作在producer的IO線程中,是以不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率。

(4)close:

關閉interceptor,主要用于執行一些資源清理工作

如前所述,interceptor可能被運作在多個線程中,是以在具體實作時使用者需要自行確定線程安全。另外倘若指定了多個interceptor,則producer将按照指定順序調用它們,并僅僅是捕獲每個interceptor可能抛出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特别留意。

4.3.2 攔截器案例

1)需求:

實作一個簡單的雙interceptor組成的攔截鍊。第一個interceptor會在消息發送前将時間戳資訊加到消息value的最前部;第二個interceptor會在消息發送後更新成功發送消息數或失敗發送消息數。

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

2)案例實操

(1)增加時間戳攔截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

	@Override
	public void configure(Map<String, ?> configs) {

	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		// 建立一個新的record,把時間戳寫入消息體的最前部
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + "," + record.value().toString());
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

	}

	@Override
	public void close() {

	}
}
           

(2)統計發送消息成功和發送失敗消息數,并在producer關閉時列印這兩個計數器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
    private int errorCounter = 0;
    private int successCounter = 0;

	@Override
	public void configure(Map<String, ?> configs) {
		
	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		 return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		// 統計成功和失敗的次數
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
	}

	@Override
	public void close() {
        // 儲存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
	}
}
           

(3)producer主程式

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

	public static void main(String[] args) throws Exception {
		// 1 設定配置資訊
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop102:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 建構攔截鍊
		List<String> interceptors = new ArrayList<>();
		interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); 	interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); 
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		 
		String topic = "first";
		Producer<String, String> producer = new KafkaProducer<>(props);
		
		// 3 發送消息
		for (int i = 0; i < 10; i++) {
			
		    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
		    producer.send(record);
		}
		 
		// 4 一定要關閉producer,這樣才會調用interceptor的close方法
		producer.close();
	}
}
           

3)測試

(1)在kafka上啟動消費者,然後運作用戶端java程式。

bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic first

1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
           

五、Kafka監控

5.1 Kafka Monitor

1.上傳jar包KafkaOffsetMonitor-assembly-0.4.6.jar到叢集

2.在/opt/module/下建立kafka-offset-console檔案夾

3.将上傳的jar包放入剛建立的目錄下

4.在/opt/module/kafka-offset-console目錄下建立啟動腳本start.sh,内容如下:

!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &
           

5.2 Kafka Manager

  1. 上傳壓縮包kafka-manager-1.3.3.15.zip到叢集
  2. 解壓到/opt/module
  3. 修改配置檔案conf/application.conf
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
修改為
kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
           
  1. 啟動kafka-manager bin/kafka-manager
  2. 登入hadoop102:9000頁面檢視詳細資訊

六、面試題

連結: 參見我這篇文章,後續持續更新.

完結!撒花!

本文主要内容引用【尚矽谷】某老師的講義,其中有些内容是我自己新加編輯的,轉載時請注明出處。

Kafka學習位址:點選

碼字整理不易,感謝各位大佬支援↓↓↓↓↓↓↓↓↓↓↓

Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題
  • 【微信】二維碼:
  • Kafka_快速入門一、概述二、Kafka快速入門三、Kafka架構深入四、Kafka API java五、Kafka監控六、面試題

繼續閱讀