天天看點

Kafka 菜鳥學習筆記kafka:五大API:前三個常用Consumer用戶端操作offset:Stream API:Connect APIKafka叢集Kafka叢集監控

kafka:

  • 叢集模式,即便隻有一個節點,也是叢集
  • 基于zookeeper的分布式消息系統,分布式流平台,并不單純是個消息隊列
  • 具有高吞吐率、高性能、實時及高可靠等特點

基本概念:

  •     broker:    一個獨立的kafka伺服器,接受來自生産者的消息
  •     brkoer叢集:若幹個broker組合起來的叢集
  •     Topic:        主題。一個虛拟的概念,代表消息的類型,一個主題可以有多個分區,分區存放在不同的broken上
  •     Partition:    分區。實際消息存儲機關
  •     Producer:    消息生産者
  •     Consumer:    消息消費者

五大API:前三個常用

  1.   Producers
  2.   Consumers
  3.   Stream Processors
  4.   Connectors
  5.   Admin

AdminClient API:

  •     AdmminClient:建立AdminClient用戶端對象
  •     NewTopic:建立Topic
  •     CreateTopicsResult:建立Topic的傳回結果
  •     ListTopicsResult:查詢Topic清單
  •     ListTopicsOptions:查詢Topic清單及選項
  •     DescribeTopicsResult:查詢Topics
  •     DescribeConfigsResult:查詢Topics配置項

Prodecers API:

發送模式:

同步發送、異步發送、異步回調發送

KafkaProducer:

  1. MetricConfig
  2. 加載負載均衡器
  3. 初始化Serializer
  4. 初始化RecordAccumulator,類似于計數器
  5. 啟動newSender,守護線程

源碼通知:

  1. Producer是線程安全的
  2. Producer并不是接到一條發一條
  3. Producer是批量發送,減少IO操作(一次寫入大量資料),日志檔案追加

producer.send(record):

  1. 計算分區:消息具體進入哪一個partition
  2. 計算批次:accumulator.append(),往待發送批次裡增加記錄
  3. 主要内容:
    1. 建立批次
    2. 向批次中追加記錄

Producer發送原了解析:

  1. 直接發送:kafka的producer會直接把消息發送到分區leader的主機上,一般不會涉及到其他幹預
  2. 負載均衡:沒有自定義則預設,資料是可以被控制在哪個分區上的,由用戶端決定。預設時為僞随機發送
  3. 異步發送:本身是Future對象(可以不擷取),可以批量發送減少單次IO,增大吞吐量

消息傳遞保障:

依賴于Producer和Consumer共同實作,主要依賴于Producer

        producer需要server接收到資料之後發出的确認接收的信号,此項配置就是指producer需要多少個這樣的确認信号。代表了資料備份的可用性

        1.最多一次:收到0到1次保障(速度最快)

        2.至少一次:收到1到多次(其次)

        3.正好一次:有且僅有一次

Consumer用戶端操作

配置配置檔案,然後設定訂閱哪一個Topic或者幾個,用循環批量拉取

在拉取時,可以設定offset自增,這樣是最簡單的使用方式,但處理資料失敗後無法復原

        通過consumer.commitAsync()可以手動更新一個批次

注意事項:

  1. 單個分區的消息隻能由ConsumerGroup中某個Consumer消費    即:一個分區的消息隻能給一個Consumer,但一個Consumer可以從多個分區中拉取消息
  2. Consumer從分區中消費消息是順序的,預設從頭開始消費
  3. 單個ConsumerGroup會消費分區中的消息

最優:一個分區由一個Consumer去消費,充分利用資源性能

        consumer.assign()制定訂閱的分區

多線程情況:

        并不是線程安全的,需要自行解決

        經典模式:(新手建議)

            簡單來說,線程類自帶consumer屬性,也就是每一個線程對象都有consumer對象,這樣便是線程安全的

            但每個線程都需要一個consumer對象,建立和銷毀都比較占資源

        分發模式:(适合流式資料)

            由一個consumer拉取消息,然後将資料分發給不同的線程

            快速處理資料,但是因為無法監聽線程回饋導緻業務無法復原

offset:

手動控制offset,當出現程式錯誤時,可重複消費一次

        consumer.seek()

  1. 第一次從0開始消費(一般情況)
  2. 比如一次消費了100條嗎,則offet設定為101并且存入Redis
  3. 每次拉取之前從redis中擷取最新的offset位置
  4. 每次從這個位置開始消費

Stream API:

基本概念:

  1. 處理分析存儲在Kafka資料的用戶端程式庫
  2. Stream通過state store可以實作高效狀态操作
  3. 支援原語Processor和高層抽象DSL
  •         流及流處理器: 資料流,對資料進行處理的節點
  •         流處理拓撲:流的走向,流程圖
  •         源處理器和Sink處理器:資料的源頭,來源      資料的終點,出口

通過input主題和output主題來實作資料源和出

//建立流
Properties pros = .....
StreamsBuilder sB =....
KafkaStreams streams =  new KafkaStreams(sB.build(),props)
streams.start();
           

Connect API

Connect是Kafka流式計算的一部分

    主要用來與其他中間件建立流式通道

    支援流式和批量處理內建

Kafka叢集

Kafka天然支援叢集

依賴于Zookeeper進行協調

通過brokerId區分不同節點

Kafka副本集:

将日志複制多份

可以為每個Topic設定副本集

可以通過配置設定預設副本集數量

Kafka核心概念

  • Broker:Kafka的部署節點
  • Leader:用于處理消息的接受和消費等請求
  • Follower:主要用于備份消息資料

節點故障

  • Kafka與Zookeeper心跳未保持視為節點故障
  • follower消息落後leader太多也視為節點故障
  • Kafka會對故障節點進行移除

故障處理方式

  • 基本不會因為節點故障而丢失資料
  • 語義擔保很大程度避免資料丢失
  • 會對消息進行叢集内平衡,減少消息在某些節點熱度過高,即不要放一個籃子上

Leader選舉

  • 并沒有采用多數投票來選舉leader
  • 會動态維護一組Leader資料的副本(ISR)
  • 在ISR中選擇一個速度比較快的設為Leader

Kafka有一種無奈的情況,ISR中副本全部當機,對于這種情況,會進行unclean leader(髒選舉)

1.死等,等到有一個恢複正常

2.在ISR以外的節點使用,確定快速恢複

Leader選舉配置建議:.

  • 禁用“unclean leader”髒選舉
  • 手動指定最小ISR

Kafka叢集監控