天天看點

kafka的入門學習今日大綱1. Kafka簡介2. kafka的分布式安裝3. Kafka的基本操作4. flume和kafka的整合5. Kafka架構之道6. Kafka Leader Election7. Kafka高性能之道

今日大綱

  • Kafka簡介
  • Kafka分布式叢集安裝
  • Kafka-topic操作
  • Kafka-api
  • Kafka和Flume整合案例
  • Kafka架構之道
  • Kafka Leader Election
  • Kafka高性能之道

1. Kafka簡介

1.1. 消息隊列

1.1.1. 為甚要有消息隊列

1.1.2. 消息隊列

  • 消息 Message

    網絡中的兩台計算機或者兩個通訊裝置之間傳遞的資料。例如說:文本、音樂、視訊等内容。

  • 隊列 Queue

    一種特殊的線性表(資料元素首尾相接),特殊之處在于隻允許在首部删除元素和在尾部追加元素。入隊、出隊。

  • 消息隊列 MQ

    消息+隊列,儲存消息的隊列。消息的傳輸過程中的容器;主要提供生産、消費接口供外部調用做資料的存儲和擷取。

1.1.3. 消息隊列的分類

​ MQ主要分為兩類:點對點(p2p)、釋出訂閱(Pub/Sub)

  • Peer-to-Peer

    一般基于Pull或者Polling接收資料

    發送到隊列中的消息被一個而且僅僅一個接收者所接受,即使有多個接收者在同一個隊列中偵聽同一消息

    即支援異步“即發即收”的消息傳遞方式,也支援同步請求/應答傳送方式

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-AC5OkIcq-1584667570107)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1568684012083.png)]

  • 釋出訂閱

    釋出到同一個主題的消息,可被多個訂閱者所接收

    釋出/訂閱即可基于Push消費資料,也可基于Pull或者Polling消費資料

    解耦能力比P2P模型更強

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-rba5lfGV-1584667570117)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1568684181173.png)]

1.1.4. p2p和釋出訂閱MQ的比較

  • 共同點:

    消息生産者生産消息發送到queue中,然後消息消費者從queue中讀取并且消費消息。

  • 不同點:

    p2p模型包括:消息隊列(Queue)、發送者(Sender)、接收者(Receiver)

    一個生産者生産的消息隻有一個消費者(Consumer)(即一旦被消費,消息就不在消息隊列中)。比如說打電話。

    pub/Sub包含:消息隊列(Queue)、主題(Topic)、釋出者(Publisher)、訂閱者(Subscriber)

    每個消息可以有多個消費者,彼此互不影響。比如我釋出一個微網誌:關注我的人都能夠看到。

1.1.5. 消息系統的使用場景

  • 解耦 各系統之間通過消息系統這個統一的接口交換資料,無須了解彼此的存在
  • 備援 部分消息系統具有消息持久化能力,可規避消息處理前丢失的風險
  • 擴充 消息系統是統一的資料接口,各系統可獨立擴充
  • 峰值處理能力 消息系統可頂住峰值流量,業務系統可根據處理能力從消息系統中擷取并處理對應量的請求
  • 可恢複性 系統中部分鍵失效并不會影響整個系統,它恢複會仍然可從消息系統中擷取并處理資料
  • 異步通信 在不需要立即處理請求的場景下,可以将請求放入消息系統,合适的時候再處理

1.1.6. 常見的消息系統

  • RabbitMQ Erlang編寫,支援多協定AMQP,XMPP,SMTP,STOMP。支援負載均衡、資料持久化。同時支援Peer-to-Peer和釋出/訂閱模式。
  • Redis 基于Key-Value對的NoSQL資料庫,同時支援MQ功能,可做輕量級隊列服務使用。就入隊操作而言,Redis對短消息(小于10kb)的性能比RabbitMQ好,長消息性能比RabbitMQ差。
  • ZeroMQ 輕量級,不需要單獨的消息伺服器或中間件,應用程式本身扮演該角色,Peer-to-Peer。它實質上是一個庫,需要開發人員自己組合多種技術,使用複雜度高。
  • ActiveMQ JMS實作,Peer-to-Peer,支援持久化、XA(分布式)事務
  • Kafka/Jafka 高性能跨語言的分布式釋出/訂閱消息系統,資料持久化,全分布式,同時支援線上和離線處理
  • MetaQ/RocketMQ 純Java實作,釋出/訂閱消息系統,支援本地事務和XA分布式事務

1.2. Kafka簡介

1.2.1. 簡介

Kafka是分布式的釋出—訂閱消息系統。它最初由LinkedIn(領英)公司釋出,使用Scala語言編寫,與2010年12月份開源,成為Apache的頂級項目。Kafka是一個高吞吐量的、持久性的、分布式釋出訂閱消息系統。它主要用于處理活躍live的資料(登入、浏覽、點選、分享、喜歡等使用者行為産生的資料)。
           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-0GmDrs18-1584667570120)(assets/kafka-apis.png)]

三大特點:

  • 高吞吐量

    可以滿足每秒百萬級别消息的生産和消費——生産消費。

  • 持久性

    有一套完善的消息存儲機制,確定資料的高效安全的持久化——中間存儲。

  • 分布式

    基于分布式的擴充和容錯機制;Kafka的資料都會複制到幾台伺服器上。當某一台故障失效時,生産者和消費者轉而使用其它的機器——整體

  • 健壯性。

1.2.2. 設計目标

  • 高吞吐率 在廉價的商用機器上單機可支援每秒100萬條消息的讀寫
  • 消息持久化 所有消息均被持久化到磁盤,無消息丢失,支援消息重放
  • 完全分布式 Producer,Broker,Consumer均支援水準擴充
  • 同時适應線上流處理和離線批處理

1.2.3. kafka核心的概念

一個MQ需要哪些部分?生産、消費、消息類别、存儲等等。

對于kafka而言,kafka服務就像是一個大的水池。不斷的生産、存儲、消費着各種類别的消息。那麼kafka由何組成呢?

Kafka服務:
  • Topic:主題,Kafka處理的消息的不同分類。
  • Broker:消息伺服器代理,Kafka叢集中的一個kafka服務節點稱為一個broker,主要存儲消息資料。存在硬碟中。每個topic都是有分區的。
  • Partition:Topic實體上的分組,一個topic在broker中被分為1個或者多個partition,分區在建立topic的時候指定。
  • Message:消息,是通信的基本機關,每個消息都屬于一個partition

Kafka服務相關

  • Producer:消息和資料的生産者,向Kafka的一個topic釋出消息。
  • Consumer:消息和資料的消費者,定于topic并處理其釋出的消息。
  • Zookeeper:協調kafka的正常運作。

2. kafka的分布式安裝

2.1. 版本下載下傳

安裝包:http://archive.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz

源碼包:http://archive.apache.org/dist/kafka/1.1.1/kafka-1.1.1-src.tgz

2.2. 安裝過程

  1. 解壓

​ [bigda[email protected] app]$ tar -zxvf ~/soft/kafka_2.11-1.1.1.tgz -C app/

  1. 重命名

​ [[email protected] app]$ mv kafka_2.11-1.1.1/ kafka

  1. 添加環境變量

​ [[email protected] kafka]$ vim ~/.bash_profile

export KAFKA_HOME=/home/bigdata/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin
           

​ [[email protected] kafka]$ source ~/.bash_profile

  1. 配置

​ 修改$KAFKA_HOME/config/server.properties

broker.id=11 ## 目前kafka執行個體的id,必須為整數,一個叢集中不可重複
log.dirs=/home/bigdata/data/kafka ## 生産到kafka中的資料存儲的目錄,目錄需要手動建立
zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka ## kafka資料在zk中的存儲目錄
           
  1. 同步到其它機器
scp -r kafka/ [email protected]:/home/hadoop/app/
scp -r kafka/ [email protected]:/home/hadoop/app/
           
  1. 修改配置檔案中的broker.id
broker.id=12 ##bigdata02
broker.id=13 ##bigdata03
           
  1. 建立資料目錄
mkdir -p /home/bigdata/data/kafka
           
  1. 啟動kafka服務
~/app/kafka/bin/kafka-server-start.sh -daemon ~/app/kafka/config/server.properties 
           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-3rxE49Jf-1584667570122)(assets/1564974407794.png)]

  1. kafka服務測試

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-enHaYrvT-1584667570123)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1564974489574.png)]

    ​ 隻要我們配置的kafka的服務都在zookeeper中進行了注冊,便說明kafka已經安裝啟動成功

2.3. kafka在zookeeper中的目錄說明

/kafka
    /cluster		
    	/id  {"version":"1","id":"Pks8sWZUT6GBJHqyVGQ5OA"}  ---》代表的是一個kafka叢集包含叢集的版本,和叢集的id
    /controller  {"version":1,"brokerid":11,"timestamp":"1564976668049"} -->controller是kafka中非常重要的一個角色,意為控制器,控制partition的leader選舉,topic的crud操作。brokerid意為由其id對應的broker承擔controller的角色。
    /controller_epoch 2 代表的是controller的紀元,換句話說是代表controller的更疊,每當controller的brokerid更換一次,controller_epoch就+1.
    /brokers
       /ids	 [11, 12, 13] --->存放目前kafka的broker執行個體清單
       /topics	[hadoop, __consumer_offsets] ---->目前kafka中的topic清單
       /seqid	系統的序列id
    /consumers --->老版本用于存儲kafka消費者的資訊,主要儲存對應的offset,新版本中基本不用,此時使用者的消費資訊,儲存在一個系統的topic中:__consumer_offsets
    /config	--->存放配置資訊
           

3. Kafka的基本操作

3.1. kafka的topic的操作

​ topic是kafka非常重要的核心概念,是用來存儲各種類型的資料的,是以最基本的就需要學會如何在kafka中建立、修改、删除的topic,以及如何向topic生産消費資料。

​ 關于topic的操作腳本:kafka-topics.sh

  1. 建立topic
[[email protected] kafka]$ bin/kafka-topics.sh --create \
--topic hadoop \	## 指定要建立的topic的名稱
--zookeeper 
hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \ ##指定kafka關聯的zk位址
--partitions 3 \		##指定該topic的分區個數
--replication-factor 3	##指定副本因子

 bin/kafka-topics.sh --create --topic hadoop --zookeeper hadoop01:2181/kafka --partitions 3 --replication-factor 3
           

注意:指定副本因子的時候,不能大于broker執行個體個數,否則報錯:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-SWRQALVN-1584667570125)(assets/1564976012893.png)]

zookeeper目錄變化

​ [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-Pgu7sQFi-1584667570127)(assets/1564976107745.png)]

kafka資料目錄的變化

​ [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-BCUkzhgu-1584667570130)(assets/1564976163903.png)]

  1. 檢視topic的清單
[[email protected] kafka]$ bin/kafka-topics.sh --list  \
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
hadoop
           
  1. 檢視每一個topic的資訊
[[email protected] kafka]$ bin/kafka-topics.sh --describe --topic hadoop \
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka

Topic:hadoop	PartitionCount:3	ReplicationFactor:3		Configs:
Topic: hadoop	Partition: 0	Leader: 12	Replicas: 12,13,11	Isr: 12,13,11
Topic: hadoop	Partition: 1	Leader: 13	Replicas: 13,11,12	Isr: 13,11,12
Topic: hadoop	Partition: 2	Leader: 11	Replicas: 11,12,13	Isr: 11,12,13

Partition:	目前topic對應的分區編号
Replicas :  副本因子,目前kafka對應的partition所在的broker執行個體的broker.id的清單
Leader	 :  該partition的所有副本中的leader上司者,處理所有kafka該partition讀寫請求
ISR		 :  該partition的存活的副本對應的broker執行個體的broker.id的清單
           
  1. 修改一個topic
[[email protected] kafka]$ bin/kafka-topics.sh --alter --topic hadoop --partitions 4 --zookeeper bigdata01:2181/kafka
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
           

但是注意:partition個數,隻能增加,不能減少:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ptthEQf6-1584667570132)(assets/1564976954334.png)]

  1. 删除一個topic

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ltfsUVww-1584667570133)(assets/1564977032114.png)]

  1. 生産資料
[[email protected] kafka]$ bin/kafka-console-producer.sh \
--topic hadoop \	-->指定資料被生産的topic
--broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 --->指定kafka的broker清單
           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-sWE5hAU8-1584667570137)(assets/1564977283073.png)]

  1. 消費資料
[[email protected] kafka]$ bin/kafka-console-consumer.sh \
--topic hadoop \
--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-L8weFDuH-1584667570137)(assets/1564977398804.png)]

沒資料,原因在于消費者後于生産者啟動,在消費者啟動之前生産者消費的資料變不能直接擷取。

如果想要擷取消費者啟動之前生産者生産的資料,可以添加一個參數–from-beginning。

如圖所示:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-mrG4eMdn-1584667570138)(assets/1564977543576.png)]

3.2. Kafka的資料消費的總結

​ kafka消費者在消費資料的時候,都是分組别的。不同組的消費不受影響,相同組内的消費,需要注意,如果partition有3個,消費者有3個,那麼便是每一個消費者消費其中一個partition對應的資料;如果有2個消費者,此時一個消費者消費其中一個partition資料,另一個消費者消費2個partition的資料。如果有超過3個的消費者,同一時間隻能最多有3個消費者能消費得到資料。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-XO49m37e-1584667570139)(assets/consumer-groups.png)]

bin/kafka-console-consumer.sh --topic spark \
 --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 \
 --group haha \		---》消費者對應的消費者組
 --offset earliest \		--->從什麼位置(消息的偏移量)開始消費
 --partition 2		---->消費哪一個分區中的資料	
           
offset:是kafka的topic中的partition中的每一條消息的辨別,如何區分該條消息在kafka對應的partition的位置,就是用該偏移量。offset的資料類型是Long,8個位元組長度。offset在分區内是有序的,分區間是不一定有序。如果想要kafka中的資料全局有序,就隻能讓partition個數為1。

​ [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-1Hi3lmql-1584667570140)(assets/log_anatomy.png)]

​ 在組内,kafka的topic的partition個數,代表了kafka的topic的并行度,同一時間最多可以有多個線程來消費topic的資料,是以如果要想提高kafka的topic的消費能力,應該增大partition的個數。

3.3. kafka的程式設計的api

3.3.1. 建立kafka的項目

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-MoRjIpnD-1584667570142)(assets/1564987565272.png)]

指定maven坐标

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-MvmWIFw3-1584667570148)(assets/1564987594037.png)]

指定存放目錄

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-eRnbq0yq-1584667570149)(assets/1564987640236.png)]

導入maven依賴

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>
<!-- 下面的依賴,包含了上面的kafka-clients,是以隻需要引入下面即可 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
           

3.3.2. kafka生産者的api操作

​ 入口類:Producer

  • 入門案例
public class MyKafkaProducer {
    public static void main(String[] args) throws IOException {
        /*
            K: --->代表的是向topic中發送的每一條消息的key的類型,key可以為null
            V: --->代表的是向topic中發送的每一條消息的value的類型
         */
        Properties properties = new Properties();
//        properties.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
        properties.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);

        //發送資料
        ProducerRecord<Integer, String> record = new ProducerRecord("spark", "11111");
        producer.send(record);
        producer.close();
    }
}
           

配置:

bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
           
  • 建立producer時需要指定的配置資訊
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092 ## kafka的伺服器
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer ##Key的序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer ##value的序列化器
acks=[0|-1|1|all] ##消息确認機制
	0:	不做确認,直管發送消息即可
	-1|all: 不僅leader需要将資料寫入本地磁盤,并确認,還需要同步的等待其它followers進行确認
	1:隻需要leader進行消息确認即可,後期follower可以從leader進行同步
batch.size=1024 #每個分區内的使用者緩存未發送record記錄的空間大小
## 如果緩存區中的資料,沒有沾滿,也就是任然有未用的空間,那麼也會将請求發送出去,為了較少請求次數,我們可以配置linger.ms大于0,
linger.ms=10 ## 不管緩沖區是否被占滿,延遲10ms發送request
buffer.memory=10240 #控制的是一個producer中的所有的緩存空間
retries=0 #發送消息失敗之後的重試次數
           
  • 修改配置檢視生産資料情況

配置檔案

bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
# 輸入進入分區的方式
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
# 請求逾時時間
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
# 使用send方法最大消息阻塞時間
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
linger.ms=5000

# the maximum size of a request in bytes
## 最大的請求大小
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
batch.size=1024

buffer.memory=10240
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
           

代碼

public class MyKafkaProducer {
    public static void main(String[] args) throws Exception {
        /*
            K: --->代表的是向topic中發送的每一條消息的key的類型,key可以為null
            V: --->代表的是向topic中發送的每一條消息的value的類型
         */
        Properties properties = new Properties();
//        properties.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
        properties.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);

        int start = 10;
        int end = start + 10;
        for (int i = start; i < end; i++) {
            //發送資料
            ProducerRecord<Integer, String> record = new ProducerRecord("spark", i,"11111");
            producer.send(record);
        }
        Thread.sleep(10000);
        producer.close();
    }
}
           

看到的現象

​ 延遲等待了5秒之後一次性将資料發送到cluster。

3.3.3. kafka消費者的api操作

​ 入口類:Consumer

配置檔案

bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092

# consumer group id
group.id=test-consumer-group

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
           

執行代碼

public class MyKafkaConsumer {
    public static void main(String[] args) throws Exception {
        //消費者
        Properties properties = new Properties();
        properties.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        Consumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(properties);

        //訂閱topic
        consumer.subscribe(Arrays.asList("spark"));

        //從kafka對應的topic中拉取資料

        while (true) {
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<Integer, String> record : consumerRecords) {
                Integer key = record.key();
                String value = record.value();

                int partition = record.partition();
                long offset = record.offset();
                String topic = record.topic();
                System.out.println(String.format("topic:%s\tpartition:%d\toffset:%d\tkey:%d\tvalue:%s",
                        topic,
                        partition,
                        offset,
                        key,
                        value)
                );
            }
        }
    }
}
           

3.3.4. record進入分區的政策

​ 每一條producerRecord有,topic名稱、可選的partition分區編号,以及一對可選的key和value組成。

​ 三種政策進入分區

<p>
 If a valid partition number is specified that partition will be used when sending the record. If no partition is* specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is* present a partition will be assigned in a round-robin fashion.
</p>
           

1、如果指定的partition,那麼直接進入該partition

2、如果沒有指定partition,但是指定了key,使用key的hash選擇partition

3、如果既沒有指定partition,也沒有指定key,使用輪詢的方式進入partition

3.3.5. 自定義分區

  • 核心類
    public interface Partitioner extends Configurable, Closeable {
    
        /**
         * Compute the partition for the given record.
         *	計算給定記錄的分區
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes key序列之後的位元組數組的形式
         * @param value The value to partition on or null
         * @param valueBytes value序列之後的位元組數組的形式
         * @param cluster The current cluster metadata 目前cluster的中繼資料資訊
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
        /**
         * This is called when partitioner is closed.
         	分區結束之後被調用
         */
        public void close(); 
    }
               
    public interface Configurable {
    
        /**
         * Configure this class with the given key-value pairs
         	指定目前producer的配置資訊
         */
        void configure(Map<String, ?> configs);
    
    }
               
  • 随機分區方式
    public class RandomPartitioner implements Partitioner {
    
        public void close() {
        }
    
        public void configure(Map<String, ?> configs) {
        }
        private Random random = new Random();
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            Integer partitionCount = cluster.partitionCountForTopic(topic);//傳回目前topic的partition個數
            int partition = random.nextInt(partitionCount);
            System.out.println("partition: " + partition);
            return partition;
        }
    }
               
    注冊使用
    partitioner.class=com.desheng.bigdata.kafka.partitioner.RandomPartitioner
               
  • hash分區方式
    public class HashPartitioner implements Partitioner {
        public void close() {
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            Integer partCount = cluster.partitionCountForTopic(topic);
            int partition = Math.abs(key.hashCode()) % partCount;
            System.out.println("key: " + key + "partition: " + partition);
            return partition;
        }
    }
               
    注冊使用
    partitioner.class=com.desheng.bigdata.kafka.partitioner.HashPartitioner
               
  • 輪詢分區方式
    public class RoundRobinPartitioner implements Partitioner {
    
        public void close() {
        }
    
        public void configure(Map<String, ?> configs) {
        }
        //定義一個原子計數器
        private AtomicInteger count = new AtomicInteger();
        
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int parCount = cluster.partitionCountForTopic(topic);
            int partition = count.getAndIncrement() % parCount;
            System.out.println("key: " + key + "\tpartition: " + partition);
            return partition;
        }
    }
               
    注冊使用
    partitioner.class=com.desheng.bigdata.kafka.partitioner.RoundRobinPartitioner
               

4. flume和kafka的整合

​ flume主要是做日志資料(離線或實時)地采集。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-BXMJ6oAl-1584667570151)(assets/1564995281225.png)]

上圖,顯示的是flume采集完畢資料之後,進行的離線處理和實時處理兩條業務線,現在再來學習flume和kafka的整合處理。

  • 建立整合的topic
    [[email protected] kafka]$ bin/kafka-topics.sh --create \
    --topic flume-kafka \
    --zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka \
    --partitions 3 \
    --replication-factor 3
    Created topic "flume-kafka".
               
  • 調整flume-agent配置檔案

flume-kafka-sink-1903.conf

##a1就是flume agent的名稱
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444

# 修改sink為kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           
  • 啟動flume和kafka的整合測試
    1. 消費者監聽讀取的資料
    [[email protected] kafka]$ bin/kafka-console-consumer.sh --topic flume-kafka \
    --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 \
    --from-beginning
               
    1. 啟動flume-agent
    [[email protected] flume]$ nohup bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka-sink-1903.conf >/dev/null 2>&1 &
               
    ​ 3. 發送資料

​ [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-w7RACahz-1584667570152)(assets/1564996200020.png)]

​ 4. 接收資料

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-QM8XiZ1y-1584667570153)(assets/1564996233047.png)]

5. Kafka架構之道

5.1. Kafka基本架構

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-aQsAnu2J-1584667570154)(assets/1565054112297.png)]

​ 補充說明:

​ 其一:Kafka中的broker對于調用者而言都是透明的,也就是說各個broker的地位都是一樣的,但是在kafka内部有區分,主要就是controller和非controller之分,controller的角色我們可以在zookeeper的對應目錄/kafka/controller中擷取對應的brokerid。

​ 其二:在kafka1.0以下的版本中使用zk來儲存kafka消費者的offset(目錄為/kafka/consumers/**),但是在kafka1.0以上,不再使用zookeeper來儲存,主要原因在于,避免zookeeper負載過高,造成相關聯的架構無法使用,此時在kafka提供了一個系統級别的topic:__consumer_offsets來報錯偏移量資訊。

5.2. Topic & Partition

  • Topic

    邏輯概念,同一個Topic的消息可分布在一個或多個節點(Broker)上

    一個Topic包含一個或者多個Partition

    每條資訊都屬于且僅屬于一個Topic

    Producer釋出資料是,必須制定該消息釋出到哪一個Topic

    Consumer訂閱消息時,也必須制定訂閱哪個Topic的消息

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-rNNp5PGA-1584667570154)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565054853621.png)]

  • Partition

    實體概念,一個Partition隻分布在一個Broker上(不考慮備份)

    一個partition實體上對應一個檔案夾

    一個Partition包含多個Segment(Segment對使用者透明)

    一個Segment對應一個檔案,Segment由一個個不可變記錄組成

    記錄隻會被append到Segment中,不會被單獨删除或者修改

    清除過期日志時,直接删除一個或多個Segment

    segment檔案(log檔案)檔案名規範: 這個檔案裡面第一條消息的offset - 1

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-j0ROqiM1-1584667570155)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565054873724.png)]

5.3. Kafka消息flush和Retention政策

  • flush政策
    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    ## 每當每一個topic接收到10000條message的時候,就會将資料flush到磁盤
    log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #每個1s flush一次資料
    log.flush.interval.ms=1000
               
    ​ 為了提供kafka的讀寫資料能力,首先接收資料到kafka記憶體,不可能無限制的儲存在記憶體,是以必然會将資料flush到磁盤(partition的segement)檔案,在flush的時候做了Durability和Latency和Throughput的權衡與取舍。
  • retention政策
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    # 日志最小的保留時間:7天,超過這個時間,資料可能會被清理掉
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned(裁剪) from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    ## segement檔案如果超過log.retention.bytes的配置,将會可能被裁剪,直到小于log.retention.bytes配置
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    # 一個segment檔案最大的大小,超過log.segment.bytes一個G,将會建立一個新的segment檔案
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    ## 每隔5分鐘,檢測一次retention政策是否達到
    log.retention.check.interval.ms=300000
               
    ​ partition對應的檔案,就儲存在一個個的segment檔案中,每一個檔案預設大小是1G,但是log.retention.check.interval.ms監測頻率是5分鐘一次,是以segment檔案可能會超過1G,此時就會啟動retion政策,将檔案裁剪到log.retention.bytes配置,如果超過了log.segment.bytes=1G配置,将會建立一個新的segment檔案;預設情況,segment檔案會保留7天。

5.4. Kafka消息檢索原理

  • message的實體結構

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-7pLDj1rp-1584667570157)(assets/1565057999975.png)]

  • .index檔案和.log檔案說明

    partition分區目錄下的檔案清單,主要包含兩種類型的檔案 x.index索引檔案和x.log segment檔案,其中x.log儲存的是message資訊,x.index儲存的是索引資料。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-NDNoFRs7-1584667570161)(assets/1565058218765.png)]

​ 這二者檔案的大緻結果如下:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-Sgbl1A7f-1584667570163)(assets/1565058346263.png)]

為什麼會出現消息offset和檔案中的偏移量不一樣的問題?

因為一個partition下面有多個segment檔案,segment檔案當達到retention政策之後将會被裁剪或删除,同時partition中的offset是單調遞增的,從0開始增加,但是segment檔案中的消息在該檔案中的偏移量指的是檔案開頭到該檔案走過的位元組長度,顯然這兩個不一樣。

​ 是以,直接根據msg的offset是無法直接讀取到消息的,那怎麼辦?是以此時就需要俺們的x.index中儲存的相對偏移量來幫忙了。

​ x.index中儲存的内容:

​ a. index檔案的序号就是message在日志檔案中的相對偏移量

​ b. OffsetIndex是稀疏索引,也就是說不會存儲所有的消息的相對offset和position

​ 也就是說index檔案的序号對應的是log檔案中的消息偏移量;index檔案中的位址欄對應的是log檔案中檔案中的便宜位元組。

  • 通過指令檢視segment檔案内容

​ 這些資訊都是可以通過kafka的指令來檢視的。

kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \		--->列印讀取到的segment日志檔案内容
--files 00000000000000000000.log	--->指定讀取的segment日志檔案
           

讀取到的資料格式如下:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-olx6M3oZ-1584667570164)(assets/1565059427033.png)]

其中的offset是該條message在該partition中的偏移量,position為該條消息在該檔案中的位元組偏移量。

  • 消息檢索過程

    以這個partition目錄下面,00000000001560140916為例

    定位offset 為1560140921的message

  1. 定位到具體的segment日志檔案

    由于log日志檔案的檔案名是這個檔案中第一條消息的(offset-1). 是以可以根據offset定位到這個消息所在日志檔案:00000000001560140916.log

  2. 計算查找的offset在日志檔案的相對偏移量

    segment檔案中第一條消息的offset = 1560140917

    計算message相對偏移量:需要定位的offset - segment檔案中第一條消息的offset + 1 = 1560140921 - 1560140917 + 1 = 5

    查找index索引檔案, 可以定位到該消息在日志檔案中的偏移位元組為456. 綜上, 直接讀取檔案夾00000000001560140916.log中偏移456位元組的資料即可。

    1560140922 -1560140917 + 1 = 6

    如果查找的offset在日志檔案的相對偏移量在index索引檔案不存在, 可根據其在index索引檔案最接近的上限偏移量,往下順序查找

6. Kafka Leader Election

6.0. zookeeper基本複習

  • zookeeper簡介&分布式事務

​ ZooKeeper是一個高性能分布式應用協調服務,本質上就是一個分布式檔案系統。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-8sfEsega-1584667570164)(assets/1565060171357.png)]

​ 主要的作用:Name Service 配置管理、Leader Election 服務發現、Group Service 組服務、分布式隊列、兩階段送出。

​ 這裡提到了一個兩段式送出(two-phase-commit),又名分布式事務。分布式事務是相對于集中式事務而言,集中式事務非常簡單,事務的開啟,事務的送出都在一台機器上面完成;下面來看分布式事務:

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-q9ix7nJc-1584667570166)(assets/1565061147113.png)]

  • zookeeper工作的方式

    Zookeeper叢集包含1個Leader,多個Follower

    所有的Follower都可提供讀服務

    所有的寫操作都會被forward到Leader

    Client與Server通過NIO通信

    全局串行化所有的寫操作

    保證同一用戶端的指令被FIFO執行

    保證消息通知的FIFO

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-IkBgkHbV-1584667570167)(assets/1565062272147.png)]

6.1. zookeeper zab協定

​ zab(zookeeper atomic boradcast)協定被稱之為zookeeper原子廣播協定,zab協定保證所有的zookeeper的讀寫操作的正常執行。

​ zab協定有兩種模式:廣播模式(正常情況)和恢複模式(非正常情況)

  • 廣播模式

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-looPgeDy-1584667570168)(assets/1565062935428.png)]

  1. Leader将所有更新(稱之為proposal),順序發送給Follower
  2. 當Leader收到半數以上的Follower對此proposal的ACK是,即向所有的Follower發送commit消息,并在本地commit該消息
  3. Follower收到Proposal後即将該Proposal寫入磁盤,寫入成功即傳回ACK給Leader
  4. 每個Proposal都有一個唯一的單調遞增的proposal ID,即zxid

說明:

1、查詢和維護管理者指令之包含1和6,即不會和Leader打交道,響應是及時響應。

2、Create、Delete、SetACL、SetData、CreateSession、CloseSession等指令要經過上圖的六個過程。

3、Client與Follower之間采用NIO通信,Leader與Follower之間采用的是TCP/IP模式。

  • 恢複模式
  • 進入恢複模式 當Leader當機或者丢失大多數Follower後,即進入恢複模式
  • 結束恢複模式 新上司被選舉出來,且大多數Follower完成了與Leader的狀态同步後,恢複模式即結束,進而進入廣播模式
  • 恢複模式的意義
  1. 發現叢集中被commit的proposal的最大zxid
  2. 建立新的epoch,進而保證之前的Leader不能再commit新的Proposal
  3. 叢集中大部分節點都commit過前一個Leader commit過的消息,而新的Leader是被大部分節點所支援的,是以被之前Leader commit過的Proposal不會丢失,至少被一個節點所儲存
  4. 新Leader會與所有Follower通信,進而保證大部分節點都擁有最新的資料

zxid的構成

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-GDST3GLx-1584667570168)(assets/1565063369452.png)]

​ 由兩部分組成,高32位為epoch紀元,每當leader發生一次變換,epoch+1;低32位為counter計數器,每當資料被更新一次,counter+1。

​ - 恢複模式中的保證階段

  1. 若一條消息在一台機器上被deliver,那麼該消息必須将在每台機器上deliver,及時那台機器故障了
  2. 一個被skip的消息,必須仍然需要被skip

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-60q19X3i-1584667570169)(assets/1565063715998.png)]

  • zookeeper的一緻性

    學習cap定義:http://www.ruanyifeng.com/blog/2018/07/cap.html

    順序一緻性 從一個用戶端發出的更新操作會按發送順序被順序執行

    原子性 更新操作要麼成功要麼失敗,無中間狀态

    單一系統鏡像 一個用戶端隻會看到同一個view,無論它連到哪台伺服器

    可靠性

    一旦一個更新被應用,該更新将被持久化,直到用戶端更新該結果

    如果一個用戶端得到更新成功的狀态碼,則該更新一定已經生效

    任何一個被用戶端通過讀或者更新“看到”的結果,将不會被復原,即使是從失敗中恢複

    實時性 保證用戶端可在一定時間(通常是幾十秒)内看到最新的視圖

  • zookeeper在使用過程需要注意的問題

    隻保證同一用戶端的單一系統鏡像,并不保證多個不同用戶端在同一時刻一定看到同一系統鏡像,如果要實作這種效果,需要在讀取資料之前調用sync操作。

    zookeeper讀性能好于寫性能,因為任何Server均可提供讀服務,而隻有Leader可提供寫服務

    為了保證ZooKeeper本身的Leader Election順利進行,通常将Server設定為奇數

    若需容忍f個Server的失敗,必須保證有2f+1個以上的Server

6.2. zookeeper的基本操作

6.2.1. zookeeper原生操作

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-v9swkcPZ-1584667570170)(assets/1565074029195.png)]

public class ZooKeeperTest1 {
    static ZooKeeper zooKeeper = null;
    public static void main(String[] args) throws Exception {
        String connectString = "bigdata01:2181,bigdata02:2181,bigdata03:2181";
        int sessionTimeout = 6000;

        Watcher watcher = new Watcher() {
            //一旦監聽到對應目錄的變化,該方法将會被觸發
            public void process(WatchedEvent event) {
                String path = event.getPath();
                switch (event.getType().getIntValue()) {
                    case 1 ://NodeCreated
                        System.out.println("節點被建立");
                    break;
                    case 2 ://NodeDeleted
                        System.out.println("節點被删除");
                    break;
                    case 3 ://NodeDataChanged
                        System.out.println("節點資料被更新");
                    break;
                    case 4 ://NodeChildrenChanged
                        System.out.println("子節點被更新");
                    break;
                    default:
                        break;
                }
                System.out.println("path: " + path);
            }
        };
        zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
        zooKeeper.register(watcher);
        String path = "/test/seq";
        zooKeeper.create(path, "seq".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        Thread.sleep(10000);
        zooKeeper.close();
    }
}
           

6.2.2. zookeeper的用戶端架構——curator

  • 簡介

    網址:http://curator.apache.org

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-8HPOBueB-1584667570171)(assets/1565075585322.png)]

  • maven依賴
<!-- curator -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.6.0</version>
</dependency>
           
  • 基本操作——CRUD
public class CuratorFrameworkTest {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                                          .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181")
                                          .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                          .build();
        //使用之前必須要start
        client.start();

        //c
        String path = client.create()
                              .creatingParentsIfNeeded()//如是多級目錄需要建立,會進行多級目錄遞歸建立
                              .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                              .forPath("/zk/one", "one".getBytes());
        System.out.println("path: " + path);
        // 查詢
        List<String> children = client.getChildren().forPath("/zk");
        for(String child : children) {
            System.out.println(child);
        }
        // 擷取資料
        byte[] bytes = client.getData().forPath("/zk/one");
        System.out.println("/zk/one對應的資料:" + new String(bytes));
        //删除
        client.delete()
                .deletingChildrenIfNeeded()//遞歸删除
                .forPath("/zk/one");
        client.close();
    }
}
           
  • curator完成服務的監聽
/**
 * 使用curatorFramework來完成服務的發現
 *  既然要進行服務的發現,是以就需要一個監聽器watcher
 *  監聽/zk目錄
 */
public class CuratorServiceDiscover implements Watcher {
    private String path = "/zk";
    private CuratorFramework client;
    private List<String> children;
    public CuratorServiceDiscover() {
        try {
            client = CuratorFrameworkFactory.builder()
                                  .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181")
                                  .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                  .build();
            //使用之前必須要start
            client.start();
            //注冊監聽的服務
            children = client.getChildren().usingWatcher(this).forPath(path);
            System.out.println("初始監聽目錄節點資訊:" + children);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        try {
            System.out.println("目前方法會被調用-----");
            //要想以後反複的監聽對應的目錄,就需要重新注冊監聽服務
            List<String> newChildren = client.getChildren().usingWatcher(this).forPath(path);
            if(newChildren.size() > children.size()) {//新增 a b | a b c
                for(String child : newChildren) {
                    if(!children.contains(child)) {
                        System.out.println("新增節點:" + child);
                    }
                }
            } else {//減少
                for(String child : children) {
                    if(!newChildren.contains(child)) {
                        System.out.println("被删除的節點:" + child);
                        MailUtil.sendMail("大寫的警告!!!", "相關伺服器當機啦!!!!" + child);
                    }
                }
            }
            children = newChildren;//更新資料
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        CuratorServiceDiscover csd = new CuratorServiceDiscover();
        csd.start();//執行服務的發現
    }

    private void start() {
        for(;;){}
    }
}
           
  • 發送郵件相關

    依賴

    <dependency>
        <groupId>javax.mail</groupId>
        <artifactId>mail</artifactId>
        <version>1.4</version>
    </dependency>
               
    工具類
public class MailUtil {
    public static void sendMail(String subject, String content) {
        Transport t = null;
        Session session = null;
        try {
            // 會話需要的相關資訊
            Properties prop = new Properties();
            prop.setProperty("mail.transport.protocol", "smtp");// 發送郵件的協定
            prop.setProperty("mail.smtp.host", "smtp.126.com");// 使用的郵箱伺服器
            prop.setProperty("mail.smtp.auth", "true");
            session = Session.getInstance(prop);

            // 建立郵件
            MimeMessage message = new MimeMessage(session);
            InternetAddress fromAddr = new InternetAddress("xxx", "老李");// 發件人的資訊
            InternetAddress toAddr = new InternetAddress("xxx", "小美");// 收件人的資訊
            message.setFrom(fromAddr);// 在信封上寫上
            message.setRecipient(Message.RecipientType.TO, toAddr);

            message.setSubject(subject);
            message.setContent(content, "text/html;charset=UTF-8");

            // 發送郵件
            t = session.getTransport();
            t.connect("xxx", "abc123");//這裡登陸的時候最好使用126郵箱經過認證之後的密碼
            t.sendMessage(message, message.getAllRecipients());
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(t != null) {
                    t.close();
                }
            } catch (MessagingException e) {
                e.printStackTrace();
            }
        }
    }
}
           

6.3. zookeeper的leader選舉政策

6.3.1. 原生zookeeper的leader的選舉政策

  • 非公平模式

是一個搶注Leader節點的操作

  1. 建立Leader父節點,如/chroot,并将期設定為persist節點
  2. 各用戶端通過在/chroot下建立Leader節點,如/chroot/leader,來競争Leader。該節點應被設定為ephemeral
  3. 若某建立Leader節點成功,則該用戶端成功競選為Leader
  4. 若建立Leader節點失敗,則競選Leader失敗,在/chroot/leader節點上注冊exist的watch,一旦該節點被删除則獲得通知
  5. Leader可通過删除Leader節點來放棄Leader
  6. 如果Leader當機,由于Leader節點被設定為ephemeral,Leader節點會自行删除。而其它節點由于在Leader節點上注冊了watch,故可得到通知,參與下一輪競選,進而保證總有用戶端以Leader角色工作
  • 公平模式

按照先來後到的資料進行leader選舉

  1. 建立Leader父節點,如/chroot,并将其設定為persist節點
  2. 各用戶端通過在/chroot下建立Leader節點,如/chroot/leader,來競争Leader。該節點應被設定為ephemeral_sequential
  3. 用戶端通過getChildren方法擷取/chroot/下所有子節點,如果其注冊的節點的id在所有子節點中最小,則目前用戶端競選Leader成功
  4. 否則,在前面一個節點上注冊watch,一旦前者被删除,則它得到通知,傳回step 3(并不能直接認為自己成為新Leader,因為可能前面的節點隻是當機了)
  5. Leader節點可通過自行删除自己建立的節點以放棄Leader

6.3.2. curator的leader的選舉政策

  • maven依賴
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.6.0</version>
    </dependency>
               
  • latch

    spark中zookeeper的操作使用的curator,完成leader選舉是基于leaderLatch的方式進行選舉。

    1. 競選為Leader後,不可自行放棄上司權(leadership)
    2. 隻能通過close方法放棄上司權
    3. 強烈建議增加ConnectionStateListener,當連接配接SUSPENDED或者LOST時視為丢失上司權
    4. 可通過await方法等待成功擷取上司權,并可加入timeout
    5. 可通過hasLeadership方法判斷是否為Leader
    6. 可通過getLeader方法擷取目前Leader
    7. 可通過getParticipants方法擷取目前競選Leader的參與方
    代碼實作過程:
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    public class LeaderLatchDemo {
        public static void main(String[] args) throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
            CuratorFramework curator = CuratorFrameworkFactory.newClient("bigdata01:2181", retryPolicy);
    
    
            LeaderLatch leaderLatch = new LeaderLatch(curator, "/leaderlatch", "participant1");
            leaderLatch.addListener(new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    System.out.println("I'm the leader now");
                }
    
                @Override
                public void notLeader() {
                    System.out.println("I relinquish the leadership");
                }
            });
            curator.start();
            leaderLatch.start();
            leaderLatch.await();
            System.out.println("Is leader " + leaderLatch.hasLeadership());
            System.in.read();
            System.out.println("After delete node, Is leader " + leaderLatch.hasLeadership());
            System.in.read();
            System.out.println("After delete node, Is leader " + leaderLatch.hasLeadership());
            System.in.read();
            System.out.println("After reelect node, Is leader " + leaderLatch.hasLeadership());
            leaderLatch.close();
            System.in.read();
            System.out.println("After close, Is leader " + leaderLatch.hasLeadership());
            curator.close();
            Thread.sleep(100000);
        }
    }
               
    通過代碼,我們可以看到,leaderLatch的方式在zk的相關目錄下面建立的順序的臨時節點ephemeral_sequential,最先建立則為leader,當leader挂掉之後,後序節點通過zk的公平模式選舉。
  • selector
    1. 競選Leader成功後回調takeLeadership方法
    2. 可在takeLeadership方法中實作業務邏輯
    3. 一旦takeLeadership方法傳回,即視為放棄上司權
    4. 可通過autoRequeue方法循環擷取上司權
    5. 可通過hasLeadership方法判斷是否為Leader
    6. 可通過getLeader方法擷取目前Leader
    7. 可通過getParticipants方法擷取目前競選Leader的參與方

代碼實作過程:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderSelectorDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        CuratorFramework curator = CuratorFrameworkFactory.newClient("bigdata01:2181", retryPolicy);

        LeaderSelector leaderSelector = new LeaderSelector(curator, "/leaderselector",
                new CustomizedAdapter());
        leaderSelector.autoRequeue();//循環擷取leader的上司權
        curator.start();
        leaderSelector.start();//通過start方法發起leader競争,競争成功之後調用takeLeadership

        Thread.sleep(1000000);
        leaderSelector.close();
        curator.close();
    }
    public static class CustomizedAdapter extends LeaderSelectorListenerAdapter {
        /**
         * 競争成功之後調用takeLeadership,當該方法執行完畢之後釋放leadership
         * @param client
         * @throws Exception
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println("Take the leadership --> ");
            Thread.sleep(3000);
        }
    }
}
           

​ 通過代碼示範,我們發現,一直邢灣takeLeadership方法,則會釋放leadership,是以在生産中如果業務邏輯相對較少,那麼會造成leadership的頻繁切換。一般情況下不用這種方式進行leader的選舉。

6.4. Kafka controller的選舉過程

​ 說明:kafka的controller的選舉是在所有kafka節點啟動的時候發生的,或者當controller挂掉,再從其餘的broker中選舉出一台作為controller。

​ 是以檢視controller的選舉入口,最簡單就是kafka的啟動,通過kafka-server-start.sh腳本,發現該類為kafka.Kafka。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-LuHaIKyD-1584667570171)(assets/01kafka啟動并選舉controller.png)]

7. Kafka高性能之道

7.1. 高性能原因

  • 高效使用磁盤
    1. 順序寫磁盤 順序寫磁盤性能高于随機寫記憶體
    2. Append Only 資料不更新,無記錄級的資料删除(隻會整個segment删除)
    3. 充分利用Page Cache

      I/O Scheduler将連續的小塊寫組裝成大塊的實體寫進而提高性能

      I/O Scheduler會嘗試将一些寫操作重新按順序排好,進而減少磁盤頭的移動時間

    4. 充分利用所有空閑記憶體(非JVM記憶體)

      應用層cache也會有對應的page cache與之對應,直接使用page cache可增大可用cache,如使用heap内的cache,會增加GC負擔

    5. 讀操作可直接在page cache内進行。如果程序重新開機,JVM内的cache會失效,但page cache仍然可用
    6. 可通過如下參數強制flush,但并不建議這麼做

      log.flush.interval.messages=10000

      log.flush.interval.ms=1000

      8 . 支援多Directory

  • 零拷貝

    傳統模式下資料從檔案傳輸到網絡需要4次資料拷貝,4次上下文切換和2次系統調用

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-Q91hfsAA-1584667570172)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565082317862.png)]

    通過NIO的transferTo/transferFrom調用作業系統的sendfile實作零拷貝。總共2次核心資料拷貝,2次上下文切換和1次系統調用,消除了CPU資料拷貝

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-DCVJ0I5Y-1584667570172)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565082333709.png)]

  • 批處理和壓縮
    1. Producer和Consumer均支援批量處理資料,進而減少了網絡傳輸的開銷
    2. Producer可将資料壓縮後發送給broker,進而減少網絡傳輸代價。目前支援Snappy,Gzip和LZ4壓縮
  • Partition
    1. 通過Partition實作了并行處理和水準擴充
    2. Partition是Kafka(包括kafka Stream)并行處理的最小機關
    3. 不同Partition可處于不同的Broker,充分利用多機資源
    4. 同一Broker上的不同Partition可置于不同的Directory,如果節點上有多個Disk Drive,可将不同的Drive對應的Directory,進而是Kafka充分利用Disk Drive的磁盤優勢
  • ISR
    1. ISR實作了可用性和一緻性的動态平衡

      replica.log.time.max.ms=10000

      replica.log.max.messages=4000

    2. ISR可容忍更多的節點失敗

      Majority Quorum如果要容忍f個節點失敗,至少需要2f+1個節點

      ISR如果要容忍f個節點失敗,至少需要f+1個節點

    3. 如何處理Replica Crach

      Leader crash後,ISR中的任何replica皆可競選稱為Leader

      如果所有replica都crash,可選擇讓第一個recover的replica或者第一個在ISR中的replica稱為leader

      unclean.leader.election.enable=true

7.2. kafka性能影響因子

  • producer

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-VUtFM5AF-1584667570172)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565082879488.png)]

    producer和吞吐量成正比

  • consumer

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-xmpy3YNF-1584667570173)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565082958412.png)]

    ​ consumer資料量在沒有達到partition個數之前,和消費的吞吐量成正比。

  • partition

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-1AN3iO84-1584667570176)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565083078640.png)]

    分區格式和生成的吞吐量,在一定範圍内,先增長,當達到某一個值之後區域穩定,在上下浮動。

  • message-size

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-2MmfWLa7-1584667570176)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565083235267.png)]

    随着message size的增大,生産者對應的每秒生成的記錄數在成下降趨勢,區裡的資料體積成上升趨勢。

  • replication

    [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-E3VTM41l-1584667570177)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565083352898.png)]

    ​ 副本越大,自然需要同步資料的量就越多,自然kafka的生成的吞吐量就越低。

  • 借助kafka腳本來檢視kafka叢集性能
    1. kafka-producer-perf-test.sh

      通過該腳本檢視kafka生産者的性能。

      [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-JVDav9tq-1584667570177)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565083730492.png)]

      bin/kafka-producer-perf-test.sh --topic spark \
      --num-records 100000 \		-->測試生成多少條記錄
      --throughput 10000 \		--->生産這的吞吐量,約等于messages/sec
      --record-size 10 \			-->每條消息的大小
      --producer.config config/producer.properties
                 
    2. kafka-consumer-perf-test.sh

      通過該腳本檢視kafka消費者的性能。

      [外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-i8F0htSw-1584667570178)(E:/%E5%A4%A7%E6%95%B0%E6%8D%AE/spark/kafka1/%E6%96%87%E6%A1%A3/assets/1565083980326.png)]

      bin/kafka-consumer-perf-test.sh --topic spark \
      --broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 \
      --messages 100000 ---->總共要讀取多少條記錄
                 
      讀取到的結果
      start.time=2019-08-06 02:31:23:738	--->開始時間
      end.time=2019-08-06 02:31:24:735	--->結束時間
      data.consumed.in.MB=0.9534			--->總共消費的資料體積
      MB.sec=0.9562						--->每秒鐘消費資料體積
      data.consumed.in.nMsg=100000		--->總共消費的資料記錄數
      nMsg.sec=100300.9027				--->每秒鐘消費記錄數
      rebalance.time.ms=47				--->進行rebalance的時間
      fetch.time.ms=950					--->抓取這10w條資料總共花費多長時間
      fetch.MB.sec=1.0035					--->每秒鐘抓取資料體積
      fetch.nMsg.sec=105263.1579			--->每秒鐘抓取資料記錄數