天天看點

消息隊列與kafka

14.6 kafka

為什麼用消息隊列

舉例

轉化為計算機思想

場景

消息隊列工作流程

隊列産品

一個app系統消息隊列工作流程

在流式計算中,Kafka一般用來緩存資料,Storm通過消費Kafka的資料進行計算。

1)Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟體基金會開發的一個開源消息系統項目。

2)Kafka最初是由LinkedIn公司開發,并于 2011年初開源。2012年10月從Apache Incubator畢業。該項目的目标是為處理實時資料提供一個統一、高通量、低等待的平台。

3)Kafka是一個分布式消息隊列。Kafka對消息儲存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka叢集有多個kafka執行個體組成,每個執行個體(server)成為broker。

4)無論是kafka叢集,還是producer和consumer都依賴于zookeeper叢集儲存一些meta資訊,來保證系統可用性。

點對點模式(一對一,消費者主動拉取資料,輪詢機制,消息收到後消息清除,ack确認機制)

點對點模型通常是一個基于<code>拉取</code>或者<code>輪詢</code>的消息傳送模型,這種模型從隊列中請求資訊,而不是将消息推送到用戶端。

這個模型的特點是發送到隊列的消息被一個且隻有一個接收者接收處理,即使有多個消息監聽者也是如此。

釋出/訂閱模式(一對多,資料生産後,推送給所有訂閱者)

釋出訂閱模型則是一個基于推送的消息傳送模型。

釋出訂閱模型可以有多種不同的訂閱者,臨時訂閱者隻在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使目前訂閱者不可用,處于離線狀态。

1)程式解耦

允許你獨立的擴充或修改兩邊的處理過程,隻要確定它們遵守同樣的接口限制。

2)備援:

消息隊列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丢失風險。

許多消息隊列所采用的"插入-擷取-删除"範式中,在把一個消息從隊列中删除之前,需要你的處理系統明确的指出該消息已經被處理完畢,進而確定你的資料被安全的儲存直到你使用完畢。

3)峰值處理能力:

(大白話,就是本來公司業務隻需要5台機器,但是臨時的秒殺活動,5台機器肯定受不了這個壓力,我們又不可能将整體伺服器架構提升到10台,那在秒殺活動後,機器不就浪費了嗎?是以引入消息隊列)

在通路量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。

如果為以能處理這類峰值通路為标準來投入資源随時待命無疑是巨大的浪費。

使用消息隊列能夠使關鍵元件頂住突發的通路壓力,而不會因為突發的超負荷的請求而完全崩潰。

4)可恢複性:

系統的一部分元件失效時,不會影響到整個系統。

消息隊列降低了程序間的耦合度,是以即使一個處理消息的程序挂掉,加入隊列中的消息仍然可以在系統恢複後被處理。

5)順序保證:

在大多使用場景下,資料處理的順序都很重要。

大部分消息隊列本來就是排序的,并且能保證資料會按照特定的順序來處理。(Kafka保證一個Partition内的消息的有序性)

6)緩沖:

有助于控制和優化資料流經過系統的速度,解決生産消息和消費消息的處理速度不一緻的情況。

7)異步通信:

很多時候,使用者不想也不需要立即處理消息。比如發紅包,發短信等流程。

消息隊列提供了異步處理機制,允許使用者把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。

1)Producer :消息生産者,就是向kafka broker發消息的用戶端。

2)Consumer :消息消費者,向kafka broker取消息的用戶端

3)Topic :主題,可以了解為一個隊列。

4) Consumer Group (CG):這是kafka用來實作一個topic消息的廣播(發給所有的consumer)和單點傳播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會複制-給consumer。如果需要實作廣播,隻要每個consumer有一個獨立的CG就可以了。要實作單點傳播隻要所有的consumer在同一個CG。用CG還可以将consumer進行自由的分組而不需要多次發送消息到不同的topic。

5)Broker :一台kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。

6)Partition:為了實作擴充性,一個非常大的topic可以分布到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被配置設定一個有序的id(offset)。kafka隻保證按一個partition中的順序将消息發給consumer,不保證一個topic的整體(多個partition間)的順序。

7)Offset:kafka的存儲檔案都是按照offset.kafka來命名,用offset做名字的好處是友善查找。例如你想找位于2049的位置,隻要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka

​ Kafka每個主題的多個分區日志分布式地存儲在Kafka叢集上,同時為了故障容錯,每個(partition)分區都會以副本的方式複制到多個消息代理節點上。

其中一個節點會作為主副本(Leader),其他節點作為備份副本(Follower,也叫作從副本)。主副本會負責所有的用戶端讀寫操作,備份副本僅僅從主副本同步資料。當主副本出現故障時,備份副本中的一個副本會被選擇為新的主副本。因為每個分區的副本中隻有主副本接受讀寫,是以每個伺服器端都會作為某些分區的主副本,以及另外一些分區的備份副本,這樣Kafka叢集的所有服務端整體上對用戶端是負載均衡的。

Kafka的生産者和消費者相對于伺服器端而言都是用戶端。

Kafka生産者用戶端釋出消息到服務端的指定主題,會指定消息所屬的分區。

生産者釋出消息時根據消息是否有鍵,采用不同的分區政策。消息沒有鍵時,通過輪詢方式進行用戶端負載均衡;消息有鍵時,根據分區語義(例如hash)確定相同鍵的消息總是發送到同一分區。

Kafka的消費者通過訂閱主題來消費消息,并且每個消費者都會設定一個消費組名稱。因為生産者釋出到主題的每一條消息都隻會發送給消費者組的一個消費者。

是以,如果要實作傳統消息系統的“隊列”模型,可以讓每個消費者都擁有相同的消費組名稱,這樣消息就會負責均衡到所有的消費者;如果要實作“釋出-訂閱”模型,則每個消費者的消費者組名稱都不相同,這樣每條消息就會廣播給所有的消費者。

分區是消費者現場模型的最小并行機關。

如下圖(圖1)所示,生産者釋出消息到一台伺服器的3個分區時,隻有一個消費者消費所有的3個分區。在下圖(圖2)中,3個分區分布在3台伺服器上,同時有3個消費者分别消費不同的分區。假設每個伺服器的吞吐量時300MB,在下圖(圖1)中分攤到每個分區隻有100MB,而在下圖(圖2)中,叢集整體的吞吐量有900MB。可以看到,增加伺服器節點會提升叢集的性能,增加消費者數量會提升處理性能。

同一個消費組下多個消費者互相協調消費工作,Kafka會将所有的分區平均地配置設定給所有的消費者執行個體,這樣每個消費者都可以配置設定到數量均等的分區。Kafka的消費組管理協定會動态地維護消費組的成員清單,當一個新消費者加入消費者組,或者有消費者離開消費組,都會觸發再平衡操作。

Kafka的消費者消費消息時,隻保證在一個分區内的消息的完全有序性,并不保證同一個主題彙中多個分區的消息順序。而且,消費者讀取一個分區消息的順序和生産者寫入到這個分區的順序是一緻的。比如,生産者寫入“hello”和“Kafka”兩條消息到分區P1,則消費者讀取到的順序也一定是“hello”和“Kafka”。如果業務上需要保證所有消息完全一緻,隻能通過設定一個分區完成,但這種做法的缺點是最多隻能有一個消費者進行消費。一般來說,隻需要保證每個分區的有序性,再對消息假設鍵來保證相同鍵的所有消息落入同一分區,就可以滿足絕大多數的應用。

配置jdk環境

解壓縮,配置java環境變量

配置zookeeper環境,配置環境變量

zookeeper端口解釋

本文以standalone模式運作,并非叢集模式

zookeeper-3.4.14/conf/zoo.cfg修改如下參數

參數解釋

啟動zk服務端

/opt/kafka_2.11-2.2.0/config/server.properties修改如下參數

如果修改了kafka的啟動位址參數,注意可能出現的權限問題,或者删除logs目錄下的資料檔案

9092是kafka服務端

修改linux的PATH環境變量,支援kafka指令

啟動kafka服務端,指定配置檔案,背景啟動

看到如下提示,代表kafka啟動成功

環境準備

Python子產品安裝

生産者

消費者

繼續閱讀