天天看點

kafka原理剖析及實戰演練

一、消息系統概述

  一)消息系統按消息發送模型分類

  1、peer-to-peer(單點傳播)

  

kafka原理剖析及實戰演練
    特點:
  • 一般基于pull或polling接收消息
  • 發送對隊列中的消息被一個而且僅僅一個接收者所接收,即使有多個接收者在同一隊列中偵聽同一消息
  • 即支援異步“即發即棄”的消息傳送方式,也支援同步請求/應答傳送方式

  2、釋出/訂閱(支援單點傳播和多點傳播)

  

kafka原理剖析及實戰演練
    特點:
  • 釋出到一個主題的消息,可被多個訂閱者所接收
  • 釋出/訂閱可基于push消費資料,也可基于pull或者polling消費資料
  • 解耦能力比P2P模型更強

  二)消息系統使用場景

  • 解耦各位系統之間通過消息系統這個統一的接口交換資料,無須了解彼此的存在
  • 備援部分消息系統具有消息持久化能力,可規避消息處理前丢失的風險
  • 擴充消息系統是統一的資料接口,各系統可獨立擴充
  • 峰值處理能力消息系統可頂住峰值流量,業務系統可根據處理能力從消息系統中擷取并處理對應量的請求
  • 可恢複性系統中部分元件失效并不會影響整個系統,它恢複後仍然可從消息系統中擷取并處理資料
  • 異步通信在不需要立即處理請求的場景下,可以将請求放入消息系統,合适的時候再處理

  三)常用的消息系統對比

  1、RabbitMQ

同時支援Peer-to-Peer和釋出/訂閱模式

  使用場景:比較重量級,企業開發中。

  2、Redis

  基于Key-Value對的NoSQL資料庫,同時支援MQ功能,可做輕量級隊列服務使用。就入隊操作而言,Redis對短消息(小于10KB)的性能比RabbitMQ好,長消息的性能比RabbitMQ差。

  3、ZeroMQ

它實質上是一個庫,需要開發人員自己組合多種技術,使用複雜度高。

  特點:不支援資料的持久化,很難做到異步發送,做到的是點對點異步緩存。

  4、ActiveMQ

  JMS實作,Peer-to-Peer,支援持久化、 XA事務

  5、Kafka/Jafka

  高性能跨語言的分布式釋出/訂閱消息系統,資料持久化,全分布式,同時支援線上和離線處理

  7、MetaQ/RocketMQ

  純Java實作,釋出/訂閱消息系統,支援本地事務和XA分布式事務

二、kafka

​​  http://www.jasongj.com/tags/Kafka/​​

  一)kafka概述

  1、kafka簡介  

  Apache Kafka是分布式釋出-訂閱消息系統,在 kafka官網上對 kafka 的定義:一個分布式釋出-訂閱消息傳遞系統。 它最初由LinkedIn公司開發,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。Kafka是一種快速、可擴充的、設計内在就是分布式的,分區的和可複制的送出日志服務。

  2、kafka的設計目标

  • 高吞吐量、低延遲:在廉價的商用機器上單機可支援每秒100萬條消息的讀寫
  • 消息持久化、可靠性:所有消息均被持久化到磁盤,無消息丢失,支援消息重放
  • 可擴充性,完全分布式:producer、broker、consumer均支援水準擴充
  • 高并發:支援數千個用戶端同時讀寫
  • 容錯性:允許叢集中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 同時滿足适應線上流處理和離線批處理

  3、kafka2.0.0版本新增改變了哪些功能詳細

  kafka2.0.0版本新增改變了哪些功能詳細:​​http://www.aboutyun.com/forum.php?mod=viewthread&tid=24981​​

  4、kafka的适應場景

  • 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 消息系統:解耦和生産者和消費者、緩存消息等。
  • 使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如浏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、資料倉庫中做離線分析和挖掘。
  • 營運名額:Kafka也經常用來記錄營運監控資料。包括收集各種分布式應用的資料,生産各種操作的集中回報,比如報警和報告。
  • 流式處理:比如spark streaming和storm
  • 事件源

  5、kafka最詳細原理總結

  ​​http://www.itkeyword.com/doc/3033455819328241799/kafka-apache-scala​​

  二)kafka架構

  1、kafka架構圖

  

kafka原理剖析及實戰演練

  2、kafka架構元件

  • 話題(Topic):是特定類型的消息流。消息是位元組的有效負載(Payload),話題是消息的分類名或種子(Feed)名;
  • 生産者(Producer):是能夠釋出消息到話題的任何對象;
  • 服務代理(Broker):已釋出的消息儲存在一組伺服器中,它們被稱為代理(Broker)或Kafka叢集;
  • 消費者(Consumer):可以訂閱一個或多個話題,并從Broker拉資料,進而消費這些已釋出的消息;
    1、topic

  

kafka原理剖析及實戰演練
  • 邏輯概念:同一個topic的消息可分布在一個或多個節點(broker)上
  • 一個topic包含一個或者多個partition(partition均勻分布在叢集中)
  • 每條消息都屬于且僅屬于一個topic
  • producer釋出資料時,必須指定将改消息釋出到哪一個topic
  • consumer訂閱消息時,也必須指定訂閱那個topic的消息
    2、partition

  

kafka原理剖析及實戰演練
  • 實體概念:一個partition隻分布在一個broker上(不考慮備份的情況)
  • 一個partition實體上對應一個檔案夾
  • 一個partition包含多個segment(線段、部分)
  • 一個segment對應一個檔案
  • segment由一個個不可變記錄組成
  • 記錄隻會被append到segment中,不會被單獨删除或者修改
  • 清除過期日志時,直接删除一個或多個segment

  kafka的最小實體機關是partition,是以offset是記錄在partition中的(segment index中),那麼partition是跨機器的,offset的是partiton内管理的。

  kafka 提供兩種配置設定政策 range和roundrobin,由參數partition.assignment.strategy指定,預設是range政策。本文隻讨論range政策。所謂的range其實就是按照階段平均配置設定。

  3、sync(同步) producer和async(異步) producer

    1、sync producer特點
  • 低延遲
  • 低吞吐率
  • 無資料丢失
    2、async producer特點
  • 高延遲
  • 高吞吐率
  • 可能會有資料丢失

  4、consumer和partition

  kafka的配置要點:https://yq.aliyun.com/ziliao/417900

  更多内容見:http://www.open-open.com/lib/view/open1434551761926.html

  1. 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許并發的,是以consumer數不要大于partition數
  2. 如果consumer比partition少,一個consumer會對應于多個partitions,這裡主要合理配置設定consumer數和partition數,否則會導緻partition裡面的資料被取的不均勻
  3. 如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka隻保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同
  4. 增減consumer,broker,partition會導緻rebalance,是以rebalance後consumer對應的partition會發生變化
  5. High-level接口中擷取不到資料的時候是會block的

  三)單點版kafka的安裝使用

   1、kafka部署方式

  • 在虛拟機上部署kafka
  • 使用kafka帶的zookeeper起kafka:适用單獨部署kafka
  • 使用單獨的zookeeper起kafka:公司架構已有zookeeper
  • 使用kafka帶的zookeeper起kafka:适用單獨部署kafka
  • 使用單獨的zookeeper起kafka:公司架構已有zookeeper

   2、下載下傳kafka

  kafka下載下傳最新版位址:​​http://kafka.apache.org/downloads​​

  kafka下載下傳指定版本:​​http://archive.apache.org/dist/kafka/​​

  zookeeper下載下傳位址:​​https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/​​

  下載下傳二進制包

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz      

  3、kafka的目錄結構

kafka_2.11-2.0.0
├── bin
├── config
├── libs
├── LICENSE
├── NOTICE
└── site-docs

4 directories, 2      

  kafka配置檔案

############################# Server Basics
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

############################# Socket Server Settings
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka01.test.com:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://kafka01.test.com:9092

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics

# A comma seperated list of directories under which to store log files
log.dirs=/opt/ytd_data01/kafka

num.partitions=5
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy 
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=24
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

############################# Zookeeper
# root directory for all kafka znodes.
zookeeper.connect=zk01.test.com:2181,zk02.test.com:2181,zk03.test.com:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings 
group.initial.rebalance.delay.ms=0      

  4、四種部署方式

    1、在虛拟上部署

  使用kafka帶的zookeeper起kafka

cd /opt/ytd_soft
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar xvf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
#zookeeper預設是前台啟動,讓其背景啟動使用nohup command &&
#前台啟動
#./bin/zookeeper-server-start.sh config/zookeeper.properties
#背景啟動
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &&
ss -lutnp |grep 2181

#啟動kafka,預設非daemon啟動,-daemon啟動
./bin/kafka-server-start.sh -daemon config/server.properties
ss -lutnp|grep 9092      

  測試kafka是否可用

#建立topic
# cd /opt/ytd_soft/kafka_2.11-2.0.0
# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
#檢視topic詳情
# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test1    Partition: 2    Leader: 0    Replicas: 0    Isr: 0

#模拟消費者consumer(kafka之前的版本參數是:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1  --from-beginning

#另外一個視窗,模拟生産者producer
# cd /opt/ytd_soft/kafka_2.11-2.0.0
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>11
>111
>222      

  使用單獨的zookeeper起kafka

  安裝啟動zookeeper

cd /opt/ytd_soft
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
tar xvf zookeeper-3.4.12.tar.gz 
cd zookeeper-3.4.12/
bin/zkServer.sh start conf/zoo_sample.cfg 
bin/zkServer.sh status conf/zoo_sample.cfg 
ss -lutnp|grep 2181      

  部署kafka,更改kafka配置檔案server.properties(若zookeeper是叢集或zookeeper不在本機上必須更改),其他步驟和上面的一樣

zookeeper.connect=localhost:2181      
    2、使用docker部署(自己建立鏡像)

  使用kafka帶的zookeeper起kafka

  使用kafka帶的zookeeper起kafka

   部署zookeeper

FROM centos:6.9

RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo

RUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pip

RUN mkdir /opt/java &&\
    wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/jdk-8u191-linux-x64.tar.gz -P /opt/java

RUN tar zxvf /opt/java/jdk-8u191-linux-x64.tar.gz -C /opt/java &&\
    JAVA_HOME=/opt/java/jdk1.8.0_191 &&\
    sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\
    sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\
    source /root/.bash_profile

ENV ZOOKEEPER_VERSION "3.4.12"

RUN mkdir /opt/zookeeper &&\
    wget http://mirror.olnevhost.net/pub/apache/zookeeper/zookeeper-$ZOOKEEPER_VERSION/zookeeper-$ZOOKEEPER_VERSION.tar.gz -P /opt/zookeeper

RUN tar zxvf /opt/zookeeper/zookeeper*.tar.gz -C /opt/zookeeper

RUN echo "source /root/.bash_profile" > /opt/zookeeper/start.sh &&\
    echo "cp /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo_sample.cfg /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo.cfg" >> /opt/zookeeper/start.sh &&\
    echo "/opt/zookeeper/zookeeper-$"ZOOKEEPER_VERSION"/bin/zkServer.sh start-foreground" >> /opt/zookeeper/start.sh

EXPOSE 2181

ENTRYPOINT ["sh", "/opt/zookeeper/start.sh"]      

jdk和zookeeper建立鏡像時下載下傳

   jdk和zookeeper安裝包提前下載下傳

FROM centos:6.9

RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo

RUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pip

RUN mkdir /opt/java
    
ADD jdk-8u191-linux-x64.tar.gz /opt/java

RUN JAVA_HOME=/opt/java/jdk1.8.0_191 &&\
    sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\
    sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\
    source /root/.bash_profile

ENV ZOOKEEPER_VERSION "3.4.12"

RUN mkdir /opt/zookeeper

ADD zookeeper-"$ZOOKEEPER_VERSION".tar.gz /opt/zookeeper

RUN echo "source /root/.bash_profile" > /opt/zookeeper/start.sh &&\
    echo "cp /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo_sample.cfg /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo.cfg" >> /opt/zookeeper/start.sh &&\
    echo "/opt/zookeeper/zookeeper-$"ZOOKEEPER_VERSION"/bin/zkServer.sh start-foreground" >> /opt/zookeeper/start.sh

EXPOSE 2181

ENTRYPOINT ["sh", "/opt/zookeeper/start.sh"]      

  部署kafka(jdk和kafka安裝包提前下載下傳)

FROM centos:6.9

ENV KAFKA_VERSION "2.11-2.0.0"

RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo

RUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pip

RUN mkdir /opt/java

ADD jdk-8u191-linux-x64.tar.gz /opt/java

RUN JAVA_HOME=/opt/java/jdk1.8.0_191 &&\
    sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\
    sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\
    source /root/.bash_profile

RUN mkdir /opt/kafka

ADD kafka_$KAFKA_VERSION.tgz /opt/kafka 

RUN sed -i 's/num.partitions.*$/num.partitions=3/g' /opt/kafka/kafka_$KAFKA_VERSION/config/server.properties

RUN echo "source /root/.bash_profile" > /opt/kafka/start.sh &&\
    echo "cd /opt/kafka/kafka_"$KAFKA_VERSION >> /opt/kafka/start.sh &&\
    echo "sed -i 's%zookeeper.connect=.*$%zookeeper.connect=zookeeper:2181%g'  /opt/kafka/kafka_"$KAFKA_VERSION"/config/server.properties" >> /opt/kafka/start.sh &&\
    echo "bin/kafka-server-start.sh config/server.properties" >> /opt/kafka/start.sh &&\
    chmod a+x /opt/kafka/start.sh

EXPOSE 9092

ENTRYPOINT ["sh", "/opt/kafka/start.sh"]      

  建立鏡像

cd /opt/kafka/dockerfile
docker build -t zookeeper:v1.1 -f zookeeper.dockerfile .
docker build -t kafka:v1.0      

  啟動kafka和zookeeper容器,先啟動zookeeper

docker images|egrep "zookeeper|kafka"

docker run -itd  --name  zookeeper -h zookeeper -p 2181:2181 zookeeper:v1.1 /bin/bash
#--link 關聯容器
docker run -itd --name kafka -h kafka -p 9092:9092 --link zookeeper kafka:v1.0 /bin/bash

ss -lutnp|egrep "2181|9092"      

  測試kafka的可用性

docker exec -it kafka /bin/bash
source /root/.bash_profile

#建立topic
cd /opt/kafka/kafka_2.11-2.0.0
bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test1 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test2 --partitions 3 --replication-factor 1

#檢視topic詳情
bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test2

bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test1

#起consumer(注意版本差異,有可能參數不同)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1  --from-beginning

#單開一個會話,起producer(輸入測試資料,看consumer是否全部接收)
docker exec -it kafka /bin/bash
source /root/.bash_profile
bin/kafka-console-producer.sh --broker-list localhost:9092      

  四)kafka 高可用

  參考:​​http://www.jasongj.com/2015/04/24/KafkaColumn2/#ACK%E5%89%8D%E9%9C%80%E8%A6%81%E4%BF%9D%E8%AF%81%E6%9C%89%E5%A4%9A%E5%B0%91%E4%B8%AA%E5%A4%87%E4%BB%BD​​

  1、CAP理論(原則或定理)

    1、簡述CAP理論

  CAP原則又稱CAP定理,指的是在一個分布式系統中, Consistency(一緻性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。

  CAP原則是NOSQL資料庫的基石。Consistency(一緻性)。 Availability(可用性)。Partition tolerance(分區容錯性)。

  分布式系統的CAP理論:理論首先把分布式系統中的三個特性進行了如下歸納:

  • 一緻性(C):在分布式系統中的所有資料備份,在同一時刻是否同樣的值。(等同于所有節點通路同一份最新的資料副本)
  • 通過某個節點的寫操作結果對後面通過其他節點的讀操作可見
  • 若更新資料後,并發通路情況下可立即感覺該更新,稱為強制一緻性
  • 若允許之後部分或者全部感覺不到該更新,稱為弱一緻性
  • 若之後的一段時間(通常該事件不固定)後,一定可以感覺該更新,稱為最終一緻性
  • 可用性(A):在叢集中一部分節點故障後,叢集整體是否還能響應用戶端的讀寫請求。(對資料更新具備高可用性)
  • 任何一個個沒有發生故障的節點,必須在有限的時間内傳回合理的結果
  • 分區容忍性(P):以實際效果而言,分區相當于對通信的時限要求。系統如果不能在時限内達成資料一緻性,就意味着發生了分區的情況,必須就目前操作在C和A之間做出選擇。
  • 部分節點當機或者無法與節點通信時,各分區間還可保持分布式系統的功能
    2、CAP理論特征

  CAP理論:分布式系統中,一緻性、可用性、分區容忍性最多隻可同時滿足兩個

  一般分區容忍性都要求有保障,是以很多時候在可用性與一緻性之間做權衡。

  

kafka原理剖析及實戰演練

   2、一緻性方案

     1、master-slave
  • RDBMS的讀寫分離是典型的master-slave方案
  • 同步複制可保證一緻性,但會影響可用性
  • 異步複制可提高可用性,但會降低一緻性
    2、WNR
  • 主要用于去中心話(P2P)的分布式系統中。dynamoDB和Cassandra是采用此方案
  • N代表副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少讀取的副本數
  • 當W+R>N時,可保證每次讀取的資料至少有一個副本具有最新的更新(大于)
  • 多個寫操作的順序難以保證,可能導緻多副本間的寫操作書序不一緻,dynamo通過向量适中保證最終一緻性
    3、paxos及其變種一緻性算法(更多使用的是其變種)
  • Google的chubby,zookeeper的Zab,RAFT

  3、replica(複制使用pull)

  • 當某個topic的replication-factor為N且N大于1時,每個partion都會有N個副本(replica)
  • replica的個數小于等于broker數:對每個partition而言每個broker上隻會有一個replica,是以broker ID表示replica
  • 所有partition的所有replica預設情況會均勻分布到所有broker上

  

kafka原理剖析及實戰演練

  4、何時commit——ISR

  如何處理replica恢複?

  

kafka原理剖析及實戰演練

  五)docker叢集版kafka的安裝使用

  1、建立zookeeper和kafka鏡像

  實驗環境是MAC的注意:docker在MAC電腦實作,它不支援MAC通過hosts綁定或IP直接通路kafka叢集的

    解決(前提配置檔案中寫的hostname):1、hosts綁定改成:127.0.0.1 kafka01

       2、連接配接程式連接配接位址寫成:localhost:9092

  六)kafka如何使用zookeeper

  1、配置管理

  2、leader election

  3、服務發現

  七)kafka高性能之道

  1、高效使用磁盤

  • 順序寫磁盤 順序寫磁盤性能高于随機寫記憶體
  • Append Only 資料不更新,無記錄級的資料删除(隻會整個segment删除)
  • 充分利用Page Cache
  • I/O Scheduler将連續的小塊寫組裝成大塊的實體寫進而提高性能
  • I/O Scheduler會嘗試将一些寫操作重新按順序排好,進而減少磁盤頭的移動時間
  • 充分利用所有空閑記憶體(非JVM記憶體)
  • 應用層cache也會有對應的page cache與之對應,直接使用page cache可增大可用cache
  • 如使用heap内的cache,會增加GC負擔
  • 讀操作可直接在page cache内進行。如果程序重新開機,JVM内的cache會失效,但page cache仍然可用
  • 可通過如下參數強制flush,但不建議
  • log.flush.interval.messages=10000
  • log.flush.interval.ms=1000
  • 支援多directory(多使用多drive)

  2、零拷貝

    1、傳統模式下的拷貝

    傳統模式下的拷貝:資料從檔案傳輸到網絡需要4次資料拷貝,4次上下文切換(使用者态和核心态)和2次系統調用

File.read(fileDesc, buf, len); 
Socket.send(socket, buf, len);      

  

kafka原理剖析及實戰演練
    2、零拷貝

  通過NIO的transferTo/transferFrom調用作業系統的sendfile實作零拷貝。總共發生2次核心資料拷貝(沒有CPU參與(或沒有使用者态的)的拷貝),2次上下文切換和1次系統調用,消除了CPU資料拷貝 

kafka原理剖析及實戰演練
kafka原理剖析及實戰演練
public void transferTo(long position, long      

僞代碼  

kafka原理剖析及實戰演練

  

kafka原理剖析及實戰演練

  3、批處理和壓縮

  • Producer和Consumer均支援批量處理資料,進而減少了網絡傳輸的開銷 (少次多量)
  • Producer可将資料壓縮後發送給broker,進而減少網絡傳輸代價。目前支援Snappy, Gzip和LZ4壓縮

  4、partition(可以設定成broker數量一緻)

  • 通過Partition實作了并行處理和水準擴充
  • Partition是Kafka(包括Kafka Stream)并行處理的最小機關
  • 不同Partition可處于不同的Broker(節點),充分利用多機資源
  • 同一Broker(節點)上的不同Partition可置于不同的Directory,如果節點上有多個Disk Drive,可将不同的Drive對應不同的Directory,進而使Kafka充分利用多Disk Drive的磁盤優勢

  5、ISR

   ISR(In-Sync Replicas)

  對每個消息都做f+1的備份:以單個消息為進行備份的基本機關,進行可靠性保障

  ISR最核心的思想:以一段時間而非以一個消息為基本機關,進行可靠性保障

ISR實作了可用性和一緻性的動态平衡 (會話失效後10秒删除節點)
replica.lag.time.max.ms=10000      
ISR可容忍更多的節點失敗 
  • Majority Quorum如果要容忍f個節點失敗,則至少需要2f+1個節點
  • ISR如果要容忍f個節點失敗,至少需要f+1個節點
如何處理Replica Crash 
  • Leader crash後,ISR中的任何replica皆可競選成為Leader
  • 如果所有replica都crash,可選擇讓第一個recover的replica或者第一個在ISR中的replica成為leader
  • unclean.leader.election.enable=true

  八)kafka資料遷移

  kafka叢集中資料自動遷移:​​http://blog.chinaunix.net/uid-30242191-id-5780549.html​​