天天看點

Kafka

kafka最初是 linkedin 的一個内部基礎設施系統。最初開發的起因是,linkedin雖然有了資料庫和其他系統可以用來存儲資料,但是缺乏一個可以幫助處理持續資料流的元件。是以在設計理念上,開發者不想隻是開發一個能夠存儲資料的系統,如關系資料庫、nosql 資料庫、搜尋引擎等等,更希望把資料看成一個持續變化和不斷增長的流,并基于這樣的想法建構出一個資料系統,一個資料架構。

kafka外在表現很像消息系統,允許釋出和訂閱消息流,但是它和傳統的消息系統有很大的差異,

首先,kafka 是個現代分布式系統,以叢集的方式運作,可以自由伸縮。

其次,kafka 可以按照要求存儲資料,儲存多久都可以,

第三,流式處理将資料處理的層次提示到了新高度,消息系統隻會傳遞資料,kafka 的流式處理能力可以讓我們用很少的代碼就能動态地處理派生流和資料集。是以 kafka 不僅僅是個消息中間件。

kafka不僅僅是一個消息中間件,同時它是一個流平台,這個平台上可以釋出和訂閱資料流(kafka 的流,有一個單獨的包 stream 的處理),并把他們儲存起來,進行處理,這個是 kafka作者的設計理念。

大資料領域,kafka 還可以看成實時版的 hadoop,但是還是有些差別,hadoop 可以存儲和定期處理大量的資料檔案,往往以 tb 計數,而 kafka可以存儲和持續處理大型的資料流。hadoop 主要用在資料分析上,而 kafka因為低延遲,更适合于核心的業務應用上。是以國内的大公司一般會結合使用,比如京東在實時資料計算架構中就使用了到了 kafka,具體見《張開濤-海量資料下的應用系統架構實踐》

常見的大資料處理架構:storm、spark、flink、(blink 阿裡)

kafka名字的由來:卡夫卡與法國作家馬塞爾·普魯斯特,愛爾蘭作家詹姆斯·喬伊斯并稱為西方現代主義文學的先驅和大師。《變形記》是卡夫卡的短篇代表作,是卡夫卡的藝術成就中的一座高峰,被認為是 20 世紀最偉大的小說作品之一(達到管理層的高度應該多看下人文相關的書籍,增長管理知識和人格魅力)。

本文以 kafka_2.11-2.3.0 版本為主,其餘版本不予考慮,并且 kafka 是 scala 語言寫的,小衆語言,沒有必要研究其源碼,投入和産出比低,除非你的技術級别非常高或者需要去開發單獨的消息中間件。

  消息,kafka 裡的資料單元,也就是我們一般消息中間件裡的消息的概念(可以比作資料庫中一條記錄)。消息由位元組數組組成。消息還可以包含鍵(可選中繼資料,也是位元組數組),主要用于對消息選取分區。

作為一個高效的消息系統,為了提高效率,消息可以被分批寫入 kafka。批次就是一組消息,這些消息屬于同一個主題和分區。如果隻傳遞單個消息,會導緻大量的網絡開銷,把消息分成批次傳輸可以減少這開銷。但是,這個需要權衡(時間延遲和吞吐量之間),批次裡包含的消息越多,機關時間内處理的消息就越多,單個消息的傳輸時間就越長(吞吐量高延時也高)。如果進行壓縮,可以提升資料的傳輸和存儲能力,但需要更多的計算處理。

  對于 kafka來說,消息是晦澀難懂的位元組數組,一般我們使用序列化和反序列化技術,格式常用的有 json 和 xml,還有 avro(hadoop 開發的一款序列化架構),具體怎麼使用依據自身的業務來定。

  kafka裡的消息用主題進行分類(主題好比資料庫中的表),主題下有可以被分為若幹個分區(分表技術)。分區本質上是個送出日志檔案,有新消息,這個消息就會以追加的方式寫入分區(寫檔案的形式),然後用先入先出的順序讀取。

Kafka

  但是因為主題會有多個分區,是以在整個主題的範圍内,是無法保證消息的順序的,單個分區則可以保證。

  kafka通過分區來實作資料備援和伸縮性,因為分區可以分布在不同的伺服器上,那就是說一個主題可以跨越多個伺服器(這是 kafka 高性能的一個原因,多台伺服器的磁盤讀寫性能比單台更高)。

  前面我們說 kafka 可以看成一個流平台,很多時候,我們會把一個主題的資料看成一個流,不管有多少個分區。

  就是一般消息中間件裡生産者和消費者的概念。一些其他的進階用戶端 api,像資料管道 api 和流式處理的 kafka stream,都是使用了最基本的生産者和消費者作為内部元件,然後提供了進階功能。

  生産者預設情況下把消息均衡分布到主題的所有分區上,如果需要指定分區,則需要使用消息裡的消息鍵和分區器。

  消費者訂閱主題,一個或者多個,并且按照消息的生成順序讀取。消費者通過檢查所謂的偏移量來區分消息是否讀取過。偏移量是一種中繼資料,一個不斷遞增的整數值,建立消息的時候,kafka 會把他加入消息。在一個主題中一個分區裡,每個消息的偏移量是唯一的。每個分區最後讀取的消息偏移量會儲存到 zookeeper 或者 kafka 上,這樣分區的消費者關閉或者重新開機,讀取狀态都不會丢失。

  多個消費者可以構成一個消費者群組。怎麼構成?共同讀取一個主題的消費者們,就形成了一個群組。群組可以保證每個分區隻被一個消費者使用。

Kafka

  消費者和分區之間的這種映射關系叫做消費者對分區的所有權關系,很明顯,一個分區隻有一個消費者,而一個消費者可以有多個分區。

  (吃飯的故事:一桌一個分區,多桌多個分區,生産者不斷生産消息(消費),消費者就是買單的人,消費者群組就是一群買單的人),一個分區隻能被消費者群組中的一個消費者消費(不能重複消費),如果有一個消費者挂掉了<james 跑路了>,另外的消費者接上)

  一個獨立的 kafka 伺服器叫 broker。broker 的主要工作是,接收生産者的消息,設定偏移量,送出消息到磁盤儲存;為消費者提供服務,響應請求,傳回消息。在合适的硬體上,單個 broker 可以處理上千個分區和每秒百萬級的消息量。(要達到這個目的需要做作業系統調優和 jvm 調優)

  多個 broker 可以組成一個叢集。每個叢集中 broker 會選舉出一個叢集控制器。控制器會進行管理,包括将分區配置設定給 broker 和監控 broker。

  叢集裡,一個分區從屬于一個 broker,這個 broker 被稱為首領。但是分區可以被配置設定給多個 broker,這個時候會發生分區複制。

  叢集中 kafka 内部一般使用管道技術進行高效的複制。

Kafka

分區複制帶來的好處是,提供了消息備援。一旦首領 broker 失效,其他 broker 可以接管上司權。當然相關的消費者和生産者都要重新連接配接到新的首領上。

  在一定期限内保留消息是 kafka 的一個重要特性,kafka broker 預設的保留政策是:要麼保留一段時間(7 天),要麼保留一定大小(比如 1 個 g)。

到了限制,舊消息過期并删除。但是每個主題可以根據業務需求配置自己的保留政策(開發時要注意,kafka 不像 mysql 之類的永久存儲)。

  多生産者和多消費者

  基于磁盤的資料存儲,換句話說,kafka 的資料天生就是持久化的。

  高伸縮性,kafka 一開始就被設計成一個具有靈活伸縮性的系統,對線上叢集的伸縮絲毫不影響整體系統的可用性。

  高性能,結合橫向擴充生産者、消費者和 broker,kafka 可以輕松處理巨大的資訊流(linkedin 公司每天處理萬億級資料),同時保證亞秒級的消息延遲。

  跟蹤網站使用者和前端應用發生的互動,比如頁面通路次數和點選,将這些資訊作為消息釋出到一個或者多個主題上,這樣就可以根據這些資料為機器學習提供資料,更新搜素結果等等(頭條、淘寶等總會推送你感興趣的内容,其實在資料分析之前就已經做了活動跟蹤)。

  标準消息中間件的功能

  收集應用程式和系統的度量監控名額,或者收集應用日志資訊,通過 kafka路由到專門的日志搜尋系統,比如 es。(國内用得較多)

  收集其他系統的變動日志,比如資料庫。可以把資料庫的更新釋出到 kafka上,應用通過監控事件流來接收資料庫的實時更新,或者通過事件流将資料庫的更新複制到遠端系統。

  還可以當其他系統發生了崩潰,通過重放日志來恢複系統的狀态。(異地災備)

  操作實時資料流,進行統計、轉換、複雜計算等等。随着大資料技術的不斷發展和成熟,無論是傳統企業還是網際網路公司都已經不再滿足于離線批處理,實時流處理的需求和重要性日益增長 。

  近年來業界一直在探索實時流計算引擎和 api,比如這幾年火爆的 spark streaming、kafka streaming、beam 和 flink,其中阿裡雙 11 會場展示的實時銷售金額,就用的是流計算,是基于 flink,然後阿裡在其上定制化的 blink。

  kafka是 java 生态圈下的一員,用 scala 編寫,運作在 java 虛拟機上,是以安裝運作和普通的 java 程式并沒有什麼差別。

  安裝 kafka官方說法,java 環境推薦 java8。

  kafka需要 zookeeper 儲存叢集的中繼資料資訊和消費者資訊。kafka一般會自帶 zookeeper,但是從穩定性考慮,應該使用單獨的 zookeeper,而且建構zookeeper 叢集。

  在 http://kafka.apache.org/downloads 上尋找合适的版本下載下傳,這裡選用的是 kafka_2.11-2.3.0,下載下傳完成後解壓到本地目錄。

  啟動 zookeeper

  進入 kafka目錄下的 bin\windows

  執行 kafka-server-start.bat ../../config/server.properties,出現以下畫面表示成功

  linux下與此類似,進入 bin 後,執行對應的 sh 檔案即可

Kafka

##列出所有主題

kafka-topics.bat --zookeeper localhost:2181 --list

##列出所有主題的詳細資訊

kafka-topics.bat --zookeeper localhost:2181 --describe

##建立主題 主題名 my-topic,1 副本,8 分區

kafka-topics.bat --zookeeper localhost:2181 --create --topic my-topic --replication-factor 1 --partitions 8

##增加分區,注意:分區無法被删除

kafka-topics.bat --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

##删除主題

kafka-topics.bat --zookeeper localhost:2181 --delete --topic my-topic

##建立生産者(控制台)

kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic

##建立消費者(控制台)

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning

##列出消費者群組(僅 linux)

kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list

##列出消費者群組詳細資訊(僅 linux)

kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group 群組名

  配置檔案放在 kafka目錄下的 config 目錄中,主要是 server.properties 檔案

  在單機時無需修改,但在叢集下部署時往往需要修改。它是個每一個 broker 在叢集中的唯一表示,要求是正數。當該伺服器的 ip 位址發生改變時,broker.id 沒有變化,則不會影響 consumers 的消息情況

  監聽清單(以逗号分隔 不同的協定(如 plaintext,trace,ssl、不同的 ip 和端口)),hostname 如果設定為 0.0.0.0 則綁定所有的網卡位址;如果 hostname 為空則綁定預設的網卡。

如果沒有配置則預設為 java.net.inetaddress.getcanonicalhostname()。

如:plaintext://myhost:9092,trace://:9091 或 plaintext://0.0.0.0:9092,

  zookeeper 叢集的位址,可以是多個,多個之間用逗号分割。(一組 hostname:port/path 清單,hostname 是 zk 的機器名或 ip、port 是 zk 的端口、/path是可選 zk 的路徑,如果不指定,預設使用根路徑)

  kafka把所有的消息都儲存在磁盤上,存放這些資料的目錄通過 log.dirs 指定。可以使用多路徑,使用逗号分隔。如果是多路徑,kafka會根據“最少使用”原則,把同一個分區的日志片段儲存到同一路徑下。會往擁有最少資料分區的路徑新增分區。

  每資料目錄用于日志恢複啟動和關閉時的線程數量。因為這些線程隻是伺服器啟動(正常啟動和崩潰後重新開機)和關閉時會用到。是以完全可以設定大量的線程來達到并行操作的目的。注意,這個參數指的是每個日志目錄的線程數,比如本參數設定為 8,而 log.dirs 設定為了三個路徑,則總共會啟動24 個線程。

  是否允許自動建立主題。如果設為 true,那麼 produce(生産者往主題寫消息),consume(消費者從主題讀消息)或者 fetch metadata(任意用戶端向主題發送中繼資料請求時)一個不存在的主題時,就會自動建立。預設為 true。

  建立主題的預設參數

  每個建立主題的分區個數(分區個數隻能增加,不能減少 )。這個參數一般要評估,比如,每秒鐘要寫入和讀取 1000m 資料,如果現在每個消費者每秒鐘可以處理 50mb 的資料,那麼需要 20 個分區,這樣就可以讓 20 個消費者同時讀取這些分區,進而達到設計目标。(一般經驗,把分區大小限制在25g 之内比較理想)

  日志儲存時間,預設為 7 天(168 小時)。超過這個時間會清理資料。bytes 和 minutes 無論哪個先達到都會觸發。與此類似還有 log.retention.minutes和log.retention.ms,都設定的話,優先使用具有最小值的那個。(提示:時間保留資料是通過檢查磁盤上日志片段檔案的最後修改時間來實作的。也就是最後修改時間是指日志片段的關閉時間,也就是檔案裡最後一個消息的時間戳)

  topic 每個分區的最大檔案大小,一個 topic 的大小限制 = 分區數*log.retention.bytes。-1 沒有大小限制。log.retention.bytes 和 log.retention.minutes任意一個達到要求,都會執行删除。(注意如果是 log.retention.bytes 先達到了,則是删除多出來的部分資料),一般不推薦使用最大檔案删除政策,而是推薦使用檔案過期删除政策。

  分區的日志存放在某個目錄下諸多檔案中,這些檔案将分區的日志切分成一段一段的,我們稱為日志片段。這個屬性就是每個檔案的最大尺寸;當尺寸達到這個數值時,就會關閉目前檔案,并建立新檔案。被關閉的檔案就開始等待過期。預設為 1g。

如果一個主題每天隻接受 100mb 的消息,那麼根據預設設定,需要 10 天才能填滿一個檔案。而且因為日志片段在關閉之前,消息是不會過期的,是以如果 log.retention.hours 保持預設值的話,那麼這個日志片段需要 17 天才過期。因為關閉日志片段需要 10 天,等待過期又需要 7 天。

Kafka

  作用和 log.segment.bytes 類似,隻不過判斷依據是時間。同樣的,兩個參數,以先到的為準。這個參數預設是不開啟的。

  表示一個伺服器能夠接收處理的消息的最大位元組數,注意這個值 producer 和 consumer 必須設定一緻,且不要大于 fetch.message.max.bytes 屬性的值(消費者能讀取的最大消息,這個值應該大于或等于 message.max.bytes)。該值預設是 1000000 位元組,大概 900kb~1mb。如果啟動壓縮,判斷壓縮後的值。

這個值的大小對性能影響很大,值越大,網絡和 io 的時間越長,還會增加磁盤寫入的大小。

kafka 設計的初衷是迅速處理短小的消息,一般 10k 大小的消息吞吐性能最好(linkedin 的 kafka性能測試)

  為 kafka 選擇合适的硬體更像是一門藝術,就跟它的名字一樣,我們分别從磁盤、記憶體、網絡和 cpu 上來分析,确定了這些關注點,就可以在預算範圍之内選擇最優的硬體配置。

  磁盤吞吐量(iops 每秒的讀寫次數)會影響生産者的性能。因為生産者的消息必須被送出到伺服器儲存,大多數的用戶端都會一直等待,直到至少有一個伺服器确認消息已經成功送出為止。也就是說,磁盤寫入速度越快,生成消息的延遲就越低。(ssd固态貴單個速度快,hdd 機械偏移可以多買幾個,設定多個目錄加快速度,具體情況具體分析)

  磁盤容量的大小,則主要看需要儲存的消息數量。如果每天收到 1tb 的資料,并保留 7 天,那麼磁盤就需要 7tb 的資料。

  kafka本身并不需要太大記憶體,記憶體則主要是影響消費者性能。在大多數業務情況下,消費者消費的資料一般會從記憶體(頁面緩存,從系統記憶體中分)

中擷取,這比在磁盤上讀取肯定要快的多。一般來說運作 kafka 的 jvm 不需要太多的記憶體,剩餘的系統記憶體可以作為頁面緩存,或者用來緩存正在使用的日志片段,是以我們一般 kafka不會同其他的重要應用系統部署在一台伺服器上,因為他們需要共享頁面緩存,這個會降低 kafka 消費者的性能。

Kafka

  網絡吞吐量決定了 kafka能夠處理的最大資料流量。它和磁盤是制約 kafka 拓展規模的主要因素。對于生産者、消費者寫入資料和讀取資料都要瓜分網絡流量。同時做叢集複制也非常消耗網絡。

  kafka對 cpu的要求不高,主要是用在對消息解壓和壓縮上。是以 cpu 的性能不是在使用 kafka的首要考慮因素。

  我們要為 kafka選擇合适的硬體時,優先考慮存儲,包括存儲的大小,然後考慮生産者的性能(也就是磁盤的吞吐量),選好存儲以後,再來選擇cpu 和記憶體就容易得多。網絡的選擇要根據業務上的情況來定,也是非常重要的一環。

Kafka

  本地開發,一台 kafka足夠使用。在實際生産中,叢集可以跨伺服器進行負載均衡,再則可以使用複制功能來避免單獨故障造成的資料丢失。同時叢集可以提供高可用性。

  要估量以下幾個因素:

    需要多少磁盤空間保留資料,和每個 broker 上有多少空間可以用。比如,如果一個叢集有 10tb 的資料需要保留,而每個 broker 可以存儲 2tb,那麼至少需要 5 個 broker。如果啟用了資料複制,則還需要一倍的空間,那麼這個叢集需要 10 個 broker。

    叢集處理請求的能力。如果因為磁盤吞吐量和記憶體不足造成性能問題,可以通過擴充 broker 來解決。

  非常簡單,隻需要兩個參數。第一,配置 zookeeper.connect,第二,為新增的 broker 設定一個叢集内的唯一性 id。

  kafka中的叢集是可以動态擴容的。

繼續閱讀