天天看點

高性能消息系統——Kafka什麼是Kafka?它與傳統的mq差別?基本術語基本互動原理基本CLI操作建立一個Producer建立一個Consumer

什麼是Kafka?

引用官方原文: “ Kafka is a distributed, partitioned, replicated commit log service.” 它提供了一個非常特殊的消息機制,不同于傳統的mq。 官網:https://kafka.apache.org

它與傳統的mq差別?

  • 更快!單機上萬TPS
  • 傳統的MQ,消息被消化掉後會被mq删除,而kafka中消息被消化後不會被删除,而是到配置的expire時間後,才删除
  • 傳統的MQ,消息的Offset是由MQ維護,而kafka中消息的Offset是由用戶端自己維護
  • 分布式,把寫入壓力均攤到各個節點。可以通過增加節點降低壓力

基本術語

為友善了解,我用對比傳統MQ的方式闡述這些基本術語。 Producer

Consumer 這兩個與傳統的MQ一樣,不解釋了 Topic Kafka中的topic其實對應傳統MQ的channel,即消息管道,例如同一業務用同一根管道 Broker 叢集中的KafkaServer,用來提供Partition服務 Partition  假如說傳統的MQ,傳輸消息的通道(channel)是一條雙車道公路,那麼Kafka中,Topic就是一個N車道的高速公路。每個車道都可以行車,而每個車道就是Partition。

  • 一個Topic中可以有一個或多個partition。
  • 一個Broker上可以跑一個或多個Partition。叢集中盡量保證partition的均勻分布,例如定義了一個有3個partition的topic,而隻有兩個broker,那麼一個broker上跑兩個partition,而另一個是1個。但是如果有3個broker,必然是3個broker上各跑一個partition。
  • Partition中嚴格按照消息進入的順序排序
  • 一個從Producer發送來的消息,隻會進入Topic的某一個Partition(除非特殊實作Producer要求消息進入所有Partition)
  • Consumer可以自己決定從哪個Partition讀取資料

Offset 單個Partition中的消息的順序ID,例如第一個進入的Offset為0,第二個為1,以此類推。傳統的MQ,Offset是由MQ自己維護,而kafka是由client維護

Replica Kafka從0.8版本開始,支援消息的HA,通過消息複制的方式。在建立時,我們可以指定一個topic有幾個partition,以及每個partition有幾個複制。複制的過程有同步和異步兩種,根據性能需要選取。 正常情況下,寫和讀都是通路leader,隻有當leader挂掉或者手動要求重新選舉,kafka會從幾個複制中選舉新的leader。 Kafka會統計replica與leader的同步情況。當一個replica與leader資料相差不大,會被認為是一個"in-sync" replica。隻有"in-sync" replica才有資格參與重新選舉。

ConsumerGroup 一個或多個Consumer構成一個ConsumerGroup,一個消息應該隻能被同一個ConsumerGroup中的一個Consumer消化掉,但是可以同時發送到不同ConsumerGroup。 通常的做法,一個Consumer去對應一個Partition。 傳統MQ中有queuing(消息)和publish-subscribe(訂閱)模式,Kafka中也支援:

  • 當所有Consumer具有相同的ConsumerGroup時,該ConsumerGroup中隻有一個Consumer能收到消息,就是queuing模式
  • 當所有Consumer具有不同的ConsumerGroup時,每個ConsumerGroup會收到相同的消息,就是publish-subscribe模式

基本互動原理

每個Topic被建立後,在zookeeper上存放有其metadata,包含其分區資訊、replica資訊、LogAndOffset等

預設路徑/brokers/topics/<topic_id>/partitions/<partition_index>/state Producer可以通過zookeeper獲得topic的broker資訊,進而得知需要往哪寫資料。 Consumer也從zookeeper上獲得該資訊,進而得知要監聽哪個partition。

基本CLI操作

1. 建立Topic ./kafka-create-topic.sh --zookeeper 10.1.110.21:2181 --replica 2 --partition 3 --topic test

2. 檢視Topic資訊 ./kafka-list-topic.sh --topic test --zookeeper 10.1.110.24:2181

3. 增加Partition ./kafka-add-partitions.sh --partition 4 --topic test --zookeeper 10.1.110.24:2181 更多指令參見:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

建立一個Producer

Kafka提供了java api,Producer特别的簡單,舉傳輸byte[] 為例

Properties p = new Properties();
props.put("metadata.broker.list", "10.1.110.21:9092");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer<String, byte[]>(config);
producer.send(byte[] msg);
           

更具體的參見: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

建立一個Consumer

Kafka提供了兩種java的Consumer API:High Level Consumer和Simple Consumer 看上去前者似乎要更牛B一點,事實上,前者做了更多的封裝,比後者要Simple的多…… 具體例子我就不寫了,參見 High Level Consumer: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Simple Consumer: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

繼續閱讀