版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/qq1010885678/article/details/47302557
目前使用的事件總線采用的是Kafka分布式消息隊列來完成的,近來項目需要接入到事件總線中,故開啟了kafka的學習之旅(之前一直在聽說kafka這玩意兒,但是學習計劃中還沒有将它安排進去,借着這個機會學習kafka也算是彌補了這方面的一些遺憾~)
關于kafka是神馬東西這裡就不在累述了,網上的資料一大堆
下面分享一些自己對kafka的了解,如有不妥之處還望指出~
(1)何為分布式消息隊列?有何特點?
1、一旦涉及到分布式這個概念,其就必須解決兩個問題:可靠性和可擴充性。
kafka通過事件回溯機制來實作高可靠性:當出現一些不可預料的異常導緻生産的消息沒有被相應的消費者消費掉時,kafka可以進行復原操作,将消息隊列恢複到這個異常事件的地方讓消費者可以重新消費,保證每個消息都有相應的消費者消費掉
kafka支援水準擴充,brocker(kafka叢集上的一個或多個伺服器)越多,叢集的吞吐率越高,如果你覺得你的kafka不夠用了,簡單,加大叢集規模,就像使用調節按鈕一樣簡單友善
2、既然叫消息隊列,那麼它就有隊列的特征,順序、按個存取。保證事件能夠按照規定的順序被處理掉,就像挂号排隊一樣,一個個領号,在按順序一個個處理
(2)那麼為什麼要用這個分布式消息隊列呢?
大多數的應用都需要處理一些事件,例如某個使用者點選了什麼東西,這個動作要進行一些額外的處理,但是處理的行為定義在另外一個程式中(當系統架構大了之後這種行為很常見)。正常的解決方法就是調用接口
但是這麼做有一個很明顯的問題,兩個應用之間産生了耦合,它們需要知道對方的存在,接口的調用方式是什麼。
當系統不是很大的時候這麼做并沒有什麼缺陷,反而會比較友善
但假如應用中要處理n個不同類型的事件,分别由n個不同的程式提供接口進行處理,在應用中必須要知道每個處理程式的調用方式,一方的修改會影響到另外一方。如果有n個應用中有n個事件需要n個程式來處理,互相調來調去,堆在一起很容易就變成了死亡代碼
這時候獨立的消息隊列就可以很簡單的解決這個問題,生産事件的應用隻管往隊列上發事件,不關心有沒有人去處理,怎麼處理,發了之後就不管了
而處理事件的程式從消息隊列中取得事件,不用關心是誰發的,怎麼發的
于是消息隊列将整個系統都解耦了
之前講到過,kafka中的消息都是按順序,一個個存取的,那麼問題來了:按順序一個個處理的話,如果應用A要處理消息a,應用B要處理消息b,消息a排在消息b之前,現在應用A不處理消息a了,而應用B馬上就要處理消息b,怎麼解決?
其實很簡單,kafka并不隻有一個隊列,不同應用發送的事件可以存放在不同的topic中而不互相影響,狹義上來講可以将topic這個概念就看成是一個消息隊列,而kafka中可以存在n個topic
(3)kafka的構架:
kafka由以下幾個部分組成:
Brocker:Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker。生産者向Brocker發送事件,消費者從Brocker中拿事件
Topic:之前提到過,可以将topic看成是一個消息隊列,每個事件都必須指定其要存在在哪個topic中(這裡倒是可以當成事件的分類來看待),topic是存在于brocker之中的
Partition:每個topic都可以包含一個或者多個partition,顧名思義,就是分區的意思,這裡和hadoop的mr程式的partitioner分區有些相似,可以将topic中的各個事件按照規則分開存放
Producer:即生産者,負責發送事件到kafka brocker
Consumer:即消費者,從kafka brocker讀取事件
Consumer Group:每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于預設的group)。
kafka通過Consumer Group來實作消息的廣播和單點傳播,一個事件發送到一個Consumer Group中,那麼這個Group中隻能有一個Consumer消費這個事件,但是其他的Consumer Group也可以由其中的一個Consumer來消費這個事件
示意圖:
如果要實作廣播,讓每個Consumer都能收到某個Topic中的事件,隻要讓各個Consumer處在不同的Consumer Group中即可;單點傳播則是将所有的Consumer放在一個Consumer Group中
kafka構架圖:
圖中的zk起到的作用就是負載均衡,将叢集中的變化及時同步到各個節點中,保證叢集是一緻的
(4)kafka下載下傳、運作測試
Apache下載下傳位址進入之後選擇Binary Downloads
選擇任意的一個版本即可(官方建議下載下傳2.10)
下載下傳下來的tgz包lib目錄中有開發kafka用戶端程式的相關jar包
将其解壓縮之後直接就可使用(因為公司已有kafka叢集,可以僅僅拿來當用戶端來連接配接測試使用)
進入kafka目錄執行以下指令:
顯示kafka中的topic清單
bin/kafka-topics.sh –list –zookeeper 54.223.171.230:2181
54.223.171.230:2181為叢集zk節點對應的hostname:port
生産事件
bin/kafka-console-producer.sh –broker-list amazontest:9092 –topic omni
amazontest:9092–為brocker的hostname:port
omni–為要發送到的topic
消費事件
bin/kafka-console-consumer.sh –zookeeper 54.223.171.230:2181 –topic omni –from-beginning
54.223.171.230:2181–消費事件要從zk中獲得,是以使用zk的位址
omni–從指定的topic中獲得事件
運作過程中出現一個異常,如下圖:
原因是我在本地連接配接遠端kafka叢集,而amazontest是定義在程式伺服器上的主機名,本地并不知道這個hostname映射到哪個ip,在/etc/hosts中加入此映射關系即可正常
最後推薦一篇文章(上面的圖都是偷這裡面的)
Kafka剖析(一):Kafka背景及架構介紹