一、消息系統概述
一)消息系統按消息發送模型分類
1、peer-to-peer(單點傳播)
特點:
- 一般基于pull或polling接收消息
- 發送對隊列中的消息被一個而且僅僅一個接收者所接收,即使有多個接收者在同一隊列中偵聽同一消息
- 即支援異步“即發即棄”的消息傳送方式,也支援同步請求/應答傳送方式
2、釋出/訂閱(支援單點傳播和多點傳播)
特點:
- 釋出到一個主題的消息,可被多個訂閱者所接收
- 釋出/訂閱可基于push消費資料,也可基于pull或者polling消費資料
- 解耦能力比P2P模型更強
二)消息系統使用場景
- 解耦各位系統之間通過消息系統這個統一的接口交換資料,無須了解彼此的存在
- 備援部分消息系統具有消息持久化能力,可規避消息處理前丢失的風險
- 擴充消息系統是統一的資料接口,各系統可獨立擴充
- 峰值處理能力消息系統可頂住峰值流量,業務系統可根據處理能力從消息系統中擷取并處理對應量的請求
- 可恢複性系統中部分元件失效并不會影響整個系統,它恢複後仍然可從消息系統中擷取并處理資料
- 異步通信在不需要立即處理請求的場景下,可以将請求放入消息系統,合适的時候再處理
三)常用的消息系統對比
1、RabbitMQ
同時支援Peer-to-Peer和釋出/訂閱模式
使用場景:比較重量級,企業開發中。
2、Redis
基于Key-Value對的NoSQL資料庫,同時支援MQ功能,可做輕量級隊列服務使用。就入隊操作而言,Redis對短消息(小于10KB)的性能比RabbitMQ好,長消息的性能比RabbitMQ差。
3、ZeroMQ
它實質上是一個庫,需要開發人員自己組合多種技術,使用複雜度高。
特點:不支援資料的持久化,很難做到異步發送,做到的是點對點異步緩存。
4、ActiveMQ
JMS實作,Peer-to-Peer,支援持久化、 XA事務
5、Kafka/Jafka
高性能跨語言的分布式釋出/訂閱消息系統,資料持久化,全分布式,同時支援線上和離線處理
7、MetaQ/RocketMQ
純Java實作,釋出/訂閱消息系統,支援本地事務和XA分布式事務
二、kafka
http://www.jasongj.com/tags/Kafka/
一)kafka概述
1、kafka簡介
Apache Kafka是分布式釋出-訂閱消息系統,在 kafka官網上對 kafka 的定義:一個分布式釋出-訂閱消息傳遞系統。 它最初由LinkedIn公司開發,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。Kafka是一種快速、可擴充的、設計内在就是分布式的,分區的和可複制的送出日志服務。
2、kafka的設計目标
- 高吞吐量、低延遲:在廉價的商用機器上單機可支援每秒100萬條消息的讀寫
- 消息持久化、可靠性:所有消息均被持久化到磁盤,無消息丢失,支援消息重放
- 可擴充性,完全分布式:producer、broker、consumer均支援水準擴充
- 高并發:支援數千個用戶端同時讀寫
- 容錯性:允許叢集中節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 同時滿足适應線上流處理和離線批處理
3、kafka2.0.0版本新增改變了哪些功能詳細
kafka2.0.0版本新增改變了哪些功能詳細:http://www.aboutyun.com/forum.php?mod=viewthread&tid=24981
4、kafka的适應場景
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生産者和消費者、緩存消息等。
- 使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如浏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、資料倉庫中做離線分析和挖掘。
- 營運名額:Kafka也經常用來記錄營運監控資料。包括收集各種分布式應用的資料,生産各種操作的集中回報,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
5、kafka最詳細原理總結
http://www.itkeyword.com/doc/3033455819328241799/kafka-apache-scala
二)kafka架構
1、kafka架構圖
2、kafka架構元件
- 話題(Topic):是特定類型的消息流。消息是位元組的有效負載(Payload),話題是消息的分類名或種子(Feed)名;
- 生産者(Producer):是能夠釋出消息到話題的任何對象;
- 服務代理(Broker):已釋出的消息儲存在一組伺服器中,它們被稱為代理(Broker)或Kafka叢集;
- 消費者(Consumer):可以訂閱一個或多個話題,并從Broker拉資料,進而消費這些已釋出的消息;
1、topic
- 邏輯概念:同一個topic的消息可分布在一個或多個節點(broker)上
- 一個topic包含一個或者多個partition(partition均勻分布在叢集中)
- 每條消息都屬于且僅屬于一個topic
- producer釋出資料時,必須指定将改消息釋出到哪一個topic
- consumer訂閱消息時,也必須指定訂閱那個topic的消息
2、partition
- 實體概念:一個partition隻分布在一個broker上(不考慮備份的情況)
- 一個partition實體上對應一個檔案夾
- 一個partition包含多個segment(線段、部分)
- 一個segment對應一個檔案
- segment由一個個不可變記錄組成
- 記錄隻會被append到segment中,不會被單獨删除或者修改
- 清除過期日志時,直接删除一個或多個segment
kafka的最小實體機關是partition,是以offset是記錄在partition中的(segment index中),那麼partition是跨機器的,offset的是partiton内管理的。
kafka 提供兩種配置設定政策 range和roundrobin,由參數partition.assignment.strategy指定,預設是range政策。本文隻讨論range政策。所謂的range其實就是按照階段平均配置設定。
3、sync(同步) producer和async(異步) producer
1、sync producer特點
- 低延遲
- 低吞吐率
- 無資料丢失
2、async producer特點
- 高延遲
- 高吞吐率
- 可能會有資料丢失
4、consumer和partition
kafka的配置要點:https://yq.aliyun.com/ziliao/417900
更多内容見:http://www.open-open.com/lib/view/open1434551761926.html
- 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許并發的,是以consumer數不要大于partition數
- 如果consumer比partition少,一個consumer會對應于多個partitions,這裡主要合理配置設定consumer數和partition數,否則會導緻partition裡面的資料被取的不均勻
- 如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka隻保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同
- 增減consumer,broker,partition會導緻rebalance,是以rebalance後consumer對應的partition會發生變化
- High-level接口中擷取不到資料的時候是會block的
三)單點版kafka的安裝使用
1、kafka部署方式
- 在虛拟機上部署kafka
- 使用kafka帶的zookeeper起kafka:适用單獨部署kafka
- 使用單獨的zookeeper起kafka:公司架構已有zookeeper
- 使用kafka帶的zookeeper起kafka:适用單獨部署kafka
- 使用單獨的zookeeper起kafka:公司架構已有zookeeper
2、下載下傳kafka
kafka下載下傳最新版位址:http://kafka.apache.org/downloads
kafka下載下傳指定版本:http://archive.apache.org/dist/kafka/
zookeeper下載下傳位址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/
下載下傳二進制包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
3、kafka的目錄結構
kafka_2.11-2.0.0
├── bin
├── config
├── libs
├── LICENSE
├── NOTICE
└── site-docs
4 directories, 2
kafka配置檔案
############################# Server Basics
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true
############################# Socket Server Settings
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka01.test.com:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://kafka01.test.com:9092
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics
# A comma seperated list of directories under which to store log files
log.dirs=/opt/ytd_data01/kafka
num.partitions=5
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=24
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper
# root directory for all kafka znodes.
zookeeper.connect=zk01.test.com:2181,zk02.test.com:2181,zk03.test.com:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings
group.initial.rebalance.delay.ms=0
4、四種部署方式
1、在虛拟上部署
使用kafka帶的zookeeper起kafka
cd /opt/ytd_soft
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar xvf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
#zookeeper預設是前台啟動,讓其背景啟動使用nohup command &&
#前台啟動
#./bin/zookeeper-server-start.sh config/zookeeper.properties
#背景啟動
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &&
ss -lutnp |grep 2181
#啟動kafka,預設非daemon啟動,-daemon啟動
./bin/kafka-server-start.sh -daemon config/server.properties
ss -lutnp|grep 9092
測試kafka是否可用
#建立topic
# cd /opt/ytd_soft/kafka_2.11-2.0.0
# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
#檢視topic詳情
# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
#模拟消費者consumer(kafka之前的版本參數是:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
#另外一個視窗,模拟生産者producer
# cd /opt/ytd_soft/kafka_2.11-2.0.0
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>11
>111
>222
使用單獨的zookeeper起kafka
安裝啟動zookeeper
cd /opt/ytd_soft
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
tar xvf zookeeper-3.4.12.tar.gz
cd zookeeper-3.4.12/
bin/zkServer.sh start conf/zoo_sample.cfg
bin/zkServer.sh status conf/zoo_sample.cfg
ss -lutnp|grep 2181
部署kafka,更改kafka配置檔案server.properties(若zookeeper是叢集或zookeeper不在本機上必須更改),其他步驟和上面的一樣
zookeeper.connect=localhost:2181
2、使用docker部署(自己建立鏡像)
使用kafka帶的zookeeper起kafka
使用kafka帶的zookeeper起kafka
部署zookeeper
FROM centos:6.9
RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
RUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pip
RUN mkdir /opt/java &&\
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/jdk-8u191-linux-x64.tar.gz -P /opt/java
RUN tar zxvf /opt/java/jdk-8u191-linux-x64.tar.gz -C /opt/java &&\
JAVA_HOME=/opt/java/jdk1.8.0_191 &&\
sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\
sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\
source /root/.bash_profile
ENV ZOOKEEPER_VERSION "3.4.12"
RUN mkdir /opt/zookeeper &&\
wget http://mirror.olnevhost.net/pub/apache/zookeeper/zookeeper-$ZOOKEEPER_VERSION/zookeeper-$ZOOKEEPER_VERSION.tar.gz -P /opt/zookeeper
RUN tar zxvf /opt/zookeeper/zookeeper*.tar.gz -C /opt/zookeeper
RUN echo "source /root/.bash_profile" > /opt/zookeeper/start.sh &&\
echo "cp /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo_sample.cfg /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo.cfg" >> /opt/zookeeper/start.sh &&\
echo "/opt/zookeeper/zookeeper-$"ZOOKEEPER_VERSION"/bin/zkServer.sh start-foreground" >> /opt/zookeeper/start.sh
EXPOSE 2181
ENTRYPOINT ["sh", "/opt/zookeeper/start.sh"]
jdk和zookeeper建立鏡像時下載下傳
jdk和zookeeper安裝包提前下載下傳
FROM centos:6.9
RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
RUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pip
RUN mkdir /opt/java
ADD jdk-8u191-linux-x64.tar.gz /opt/java
RUN JAVA_HOME=/opt/java/jdk1.8.0_191 &&\
sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\
sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\
source /root/.bash_profile
ENV ZOOKEEPER_VERSION "3.4.12"
RUN mkdir /opt/zookeeper
ADD zookeeper-"$ZOOKEEPER_VERSION".tar.gz /opt/zookeeper
RUN echo "source /root/.bash_profile" > /opt/zookeeper/start.sh &&\
echo "cp /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo_sample.cfg /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo.cfg" >> /opt/zookeeper/start.sh &&\
echo "/opt/zookeeper/zookeeper-$"ZOOKEEPER_VERSION"/bin/zkServer.sh start-foreground" >> /opt/zookeeper/start.sh
EXPOSE 2181
ENTRYPOINT ["sh", "/opt/zookeeper/start.sh"]
部署kafka(jdk和kafka安裝包提前下載下傳)
FROM centos:6.9
ENV KAFKA_VERSION "2.11-2.0.0"
RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
RUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pip
RUN mkdir /opt/java
ADD jdk-8u191-linux-x64.tar.gz /opt/java
RUN JAVA_HOME=/opt/java/jdk1.8.0_191 &&\
sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\
sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\
source /root/.bash_profile
RUN mkdir /opt/kafka
ADD kafka_$KAFKA_VERSION.tgz /opt/kafka
RUN sed -i 's/num.partitions.*$/num.partitions=3/g' /opt/kafka/kafka_$KAFKA_VERSION/config/server.properties
RUN echo "source /root/.bash_profile" > /opt/kafka/start.sh &&\
echo "cd /opt/kafka/kafka_"$KAFKA_VERSION >> /opt/kafka/start.sh &&\
echo "sed -i 's%zookeeper.connect=.*$%zookeeper.connect=zookeeper:2181%g' /opt/kafka/kafka_"$KAFKA_VERSION"/config/server.properties" >> /opt/kafka/start.sh &&\
echo "bin/kafka-server-start.sh config/server.properties" >> /opt/kafka/start.sh &&\
chmod a+x /opt/kafka/start.sh
EXPOSE 9092
ENTRYPOINT ["sh", "/opt/kafka/start.sh"]
建立鏡像
cd /opt/kafka/dockerfile
docker build -t zookeeper:v1.1 -f zookeeper.dockerfile .
docker build -t kafka:v1.0
啟動kafka和zookeeper容器,先啟動zookeeper
docker images|egrep "zookeeper|kafka"
docker run -itd --name zookeeper -h zookeeper -p 2181:2181 zookeeper:v1.1 /bin/bash
#--link 關聯容器
docker run -itd --name kafka -h kafka -p 9092:9092 --link zookeeper kafka:v1.0 /bin/bash
ss -lutnp|egrep "2181|9092"
測試kafka的可用性
docker exec -it kafka /bin/bash
source /root/.bash_profile
#建立topic
cd /opt/kafka/kafka_2.11-2.0.0
bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test1 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test2 --partitions 3 --replication-factor 1
#檢視topic詳情
bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test2
bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test1
#起consumer(注意版本差異,有可能參數不同)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
#單開一個會話,起producer(輸入測試資料,看consumer是否全部接收)
docker exec -it kafka /bin/bash
source /root/.bash_profile
bin/kafka-console-producer.sh --broker-list localhost:9092
四)kafka 高可用
參考:http://www.jasongj.com/2015/04/24/KafkaColumn2/#ACK%E5%89%8D%E9%9C%80%E8%A6%81%E4%BF%9D%E8%AF%81%E6%9C%89%E5%A4%9A%E5%B0%91%E4%B8%AA%E5%A4%87%E4%BB%BD
1、CAP理論(原則或定理)
1、簡述CAP理論
CAP原則又稱CAP定理,指的是在一個分布式系統中, Consistency(一緻性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。
CAP原則是NOSQL資料庫的基石。Consistency(一緻性)。 Availability(可用性)。Partition tolerance(分區容錯性)。
分布式系統的CAP理論:理論首先把分布式系統中的三個特性進行了如下歸納:
- 一緻性(C):在分布式系統中的所有資料備份,在同一時刻是否同樣的值。(等同于所有節點通路同一份最新的資料副本)
- 通過某個節點的寫操作結果對後面通過其他節點的讀操作可見
- 若更新資料後,并發通路情況下可立即感覺該更新,稱為強制一緻性
- 若允許之後部分或者全部感覺不到該更新,稱為弱一緻性
- 若之後的一段時間(通常該事件不固定)後,一定可以感覺該更新,稱為最終一緻性
- 可用性(A):在叢集中一部分節點故障後,叢集整體是否還能響應用戶端的讀寫請求。(對資料更新具備高可用性)
- 任何一個個沒有發生故障的節點,必須在有限的時間内傳回合理的結果
- 分區容忍性(P):以實際效果而言,分區相當于對通信的時限要求。系統如果不能在時限内達成資料一緻性,就意味着發生了分區的情況,必須就目前操作在C和A之間做出選擇。
- 部分節點當機或者無法與節點通信時,各分區間還可保持分布式系統的功能
2、CAP理論特征
CAP理論:分布式系統中,一緻性、可用性、分區容忍性最多隻可同時滿足兩個
一般分區容忍性都要求有保障,是以很多時候在可用性與一緻性之間做權衡。
2、一緻性方案
1、master-slave
- RDBMS的讀寫分離是典型的master-slave方案
- 同步複制可保證一緻性,但會影響可用性
- 異步複制可提高可用性,但會降低一緻性
2、WNR
- 主要用于去中心話(P2P)的分布式系統中。dynamoDB和Cassandra是采用此方案
- N代表副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少讀取的副本數
- 當W+R>N時,可保證每次讀取的資料至少有一個副本具有最新的更新(大于)
- 多個寫操作的順序難以保證,可能導緻多副本間的寫操作書序不一緻,dynamo通過向量适中保證最終一緻性
3、paxos及其變種一緻性算法(更多使用的是其變種)
- Google的chubby,zookeeper的Zab,RAFT
3、replica(複制使用pull)
- 當某個topic的replication-factor為N且N大于1時,每個partion都會有N個副本(replica)
- replica的個數小于等于broker數:對每個partition而言每個broker上隻會有一個replica,是以broker ID表示replica
- 所有partition的所有replica預設情況會均勻分布到所有broker上
4、何時commit——ISR
如何處理replica恢複?
五)docker叢集版kafka的安裝使用
1、建立zookeeper和kafka鏡像
實驗環境是MAC的注意:docker在MAC電腦實作,它不支援MAC通過hosts綁定或IP直接通路kafka叢集的
解決(前提配置檔案中寫的hostname):1、hosts綁定改成:127.0.0.1 kafka01
2、連接配接程式連接配接位址寫成:localhost:9092
六)kafka如何使用zookeeper
1、配置管理
2、leader election
3、服務發現
七)kafka高性能之道
1、高效使用磁盤
- 順序寫磁盤 順序寫磁盤性能高于随機寫記憶體
- Append Only 資料不更新,無記錄級的資料删除(隻會整個segment删除)
- 充分利用Page Cache
- I/O Scheduler将連續的小塊寫組裝成大塊的實體寫進而提高性能
- I/O Scheduler會嘗試将一些寫操作重新按順序排好,進而減少磁盤頭的移動時間
- 充分利用所有空閑記憶體(非JVM記憶體)
- 應用層cache也會有對應的page cache與之對應,直接使用page cache可增大可用cache
- 如使用heap内的cache,會增加GC負擔
- 讀操作可直接在page cache内進行。如果程序重新開機,JVM内的cache會失效,但page cache仍然可用
- 可通過如下參數強制flush,但不建議
- log.flush.interval.messages=10000
- log.flush.interval.ms=1000
- 支援多directory(多使用多drive)
2、零拷貝
1、傳統模式下的拷貝
傳統模式下的拷貝:資料從檔案傳輸到網絡需要4次資料拷貝,4次上下文切換(使用者态和核心态)和2次系統調用
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);
2、零拷貝
通過NIO的transferTo/transferFrom調用作業系統的sendfile實作零拷貝。總共發生2次核心資料拷貝(沒有CPU參與(或沒有使用者态的)的拷貝),2次上下文切換和1次系統調用,消除了CPU資料拷貝
public void transferTo(long position, long
僞代碼
3、批處理和壓縮
- Producer和Consumer均支援批量處理資料,進而減少了網絡傳輸的開銷 (少次多量)
- Producer可将資料壓縮後發送給broker,進而減少網絡傳輸代價。目前支援Snappy, Gzip和LZ4壓縮
4、partition(可以設定成broker數量一緻)
- 通過Partition實作了并行處理和水準擴充
- Partition是Kafka(包括Kafka Stream)并行處理的最小機關
- 不同Partition可處于不同的Broker(節點),充分利用多機資源
- 同一Broker(節點)上的不同Partition可置于不同的Directory,如果節點上有多個Disk Drive,可将不同的Drive對應不同的Directory,進而使Kafka充分利用多Disk Drive的磁盤優勢
5、ISR
ISR(In-Sync Replicas)
對每個消息都做f+1的備份:以單個消息為進行備份的基本機關,進行可靠性保障
ISR最核心的思想:以一段時間而非以一個消息為基本機關,進行可靠性保障
ISR實作了可用性和一緻性的動态平衡 (會話失效後10秒删除節點)
replica.lag.time.max.ms=10000
ISR可容忍更多的節點失敗
- Majority Quorum如果要容忍f個節點失敗,則至少需要2f+1個節點
- ISR如果要容忍f個節點失敗,至少需要f+1個節點
如何處理Replica Crash
- Leader crash後,ISR中的任何replica皆可競選成為Leader
- 如果所有replica都crash,可選擇讓第一個recover的replica或者第一個在ISR中的replica成為leader
- unclean.leader.election.enable=true
八)kafka資料遷移
kafka叢集中資料自動遷移:http://blog.chinaunix.net/uid-30242191-id-5780549.html