天天看點

kafka:(4) broker

一、zookeeper

  Kafka 使用 Zookeeper 來維護叢集成員的資訊。每個 broker 都有一個唯一辨別符, 這個辨別符可以在配置檔案裡指定, 也可以自動生成。 在 broker 啟動的時候, 它通過建立臨時節點把自己的 ID 注冊到 zookeeper。Kafka 元件訂閱 Zookeeper 的/brokers/ids 路徑(broker 在 zookeeper 上的注冊路徑) , 當有 broker 加入叢集或退出叢集時,這些元件就可以獲得通知。

  如果你要啟動另一個具有相同 ID 的 broker, 會得到一個錯誤。

  在 broker 停機、出現網絡分區或長時間垃圾回收停頓時,broker 會從 Zookeeper 上斷開連接配接, 此時 broker 在啟動時建立的臨時節點會自動從 Zookeeper 上移除。監聽 broker 清單的 Kafka 元件會被告知該 broker 已移除。

  在關閉 broker 時, 它對應的節點也會消失, 不過它的 ID 會繼續存在于其他資料結構中。例如,主題的副本清單裡就可能包含這些 ID。在完全關閉一 個 broker 之後, 如果使用相同的 ID 啟動另一個全新的 broker,它會立即加入叢集, 并擁有與舊 broker 相同的分區和主題。

二、控制器

  控制器其實就是一個 broker,隻不過它除了具有一般 broker 的功能之外,還負責分區首領的選舉。叢集裡第一個啟動的 broker 通過在 Zookeeper 裡建立一個臨時節點/controller 讓自己成為控制器。其他 broker 在啟動時也會嘗試建立這個節點,不過它們會收到一個“節點已存在”的異常,說明叢集裡已經有一個控制器了。其他 broker 在控制器節點上建立 Zookeeperwatch 對象,這樣它們就可以收到這個節點 的變更通知。這種方式可以確定叢集裡一次隻有一個控制器存在。

  如果控制器被關閉或者與 Zookeeper 斷開連接配接,zookeeper 上的臨時節點就會消失。叢集裡的其他 broker 通過 watch 對象得到控制器節點消失的通知,它們會嘗試讓自己成為新的控制器。第一個在 Zookeeper 裡成功建立控制器節點的 broker 就會成為新的控制器,其他節點會收到“節點已存在”的異常,然後在新的控制器節點上再次建立 watch 對象。每個新選出的控制器通過Zookeeper 的條件遞增操作獲得一個全新的、數值更大的controller epoch,其他broker在知道目前controller epoch後,如果收到由控制器發出的包含較舊epoch的消息,就會忽略它們。

  當控制器發現一個 broker 已經離開叢集,它就知道,那些失去首領的分區需要一個新首領 (這些分區的首領剛好是在這個 broker 上)。控制器周遊這些分區,并确定誰應該成為新首領 (簡單來說就是分區副本清單裡的下一個副本) , 然後向所有包含新首領或現有跟随者的 broker 發送請求。該請求消息包含了誰是新首領以及誰是分區跟随者的資訊。随後,新首領開始處理來自生産者和消費者的情求,而跟随者開始從新首領那裡複制消息。

  當控制器發現一個 broker 加入叢集時,它會使用 broker ID 來檢査新加入的 broker 是否包含現有分區的副本。如果有,控制器就把變更通知發送給新加入的 broker 和其他 broker,新 broker 上的副本開始從首領那裡複制消息。

  簡而言之,Kafka 使用 Zookeeper 的臨時節點來選舉控制器,并在節點加入叢集或退出叢集時通知控制器。控制器負責在節點加入或離開叢集時進行分 區首領選舉。控制器使用epoch來避免腦裂。

三、複制

  複制功能是 Kafka 架構的核心。Kafka 把自己描述成“一個分布式的、可分區的、可複制的送出日志服務”。複制之是以這麼關鍵,是因為它可以在個别節點失效時仍能保證 Kafka 的可用性和持久性。

  Kafka 使用主題來組織資料,每個主題被分為若幹個分區,每個分區有多個副本。那些副本被儲存在 broker 上, 每個 broker 可以儲存成百上千個屬于不同主題和分區的副本。

  副本有以下兩種類型。

首領副本:每個分區都有一個首領副本。為了保證一緻性,所有生産者請求和消費者請求都會經過這個副本。

跟随副本:首領以外的副本都是跟随者副本。跟随者副本不處理來自用戶端的請求,它們唯一的任務就是從首領那裡複制消息,保持與首領一緻的狀态。如果首領發生崩潰,其中的一個跟随者會被提升為新首領。

  首領的另一個任務是搞清楚哪個跟随者的狀态與自己是一緻的。跟随者為了保持與首領的狀态一緻,在有新消息到達時嘗試從首領那裡複制消息,不過有各種原因會導緻同步失敗。

  為了與首領保持同步, 跟随者向首領發送擷取資料的請求,這種請求與消費者為了讀取消息而發送的請求是一樣的。首領将響應消息發給跟随者。請求消息裡包含了跟随者想要擷取消息的偏移量,而且這些偏移量總是有序的 。

  一個跟随者副本先請求消息1,接着請求消息2,然後請求消息3,在收到這3個請求的響應之前,它是不會發送第 4 個請求消息的。如果跟随者發送了請求消息 4,那麼首領就知道它已經收到了前面 3 個請求的響應。通過査看每個跟随者請求的最新偏移量,首領就會知道每個跟随者複制的進度。如果跟随者在 10s 内沒有請求任何消息,或者雖然在請求消息,但在 10s 内沒有請求最新的資料,那麼它就會被認為是不同步的。如果一個副本無法與首領保持一緻,在首領發生失效時,它就不可能成為新首領,因為它沒有包含全部的消息。相反,持續請求得到的最新消息副本被稱為同步副本。在首領發生失效時,隻有同步副本才有可能被選為新首領。

  首選首領:建立主題時標明的首領就是分區的首選首領。之是以把它叫作首選首領,是因為在建立分區時,需要在 broker 之間均衡首領,避免讓包含了首領的broker負載過重。分區的副本清單裡的第一個副本一般就是首選首領,不管目前首領是哪一個副本,都不會改變這個事實。

四、處理請求

  broker 的大部分工作是處理用戶端、分區副本和控制器發送給分區首領的請求。用戶端發起連接配接并發送請求,broker 處理請求并作出響應。broker 按照請求到達的順序來處理它們一一這種順序保證讓 Kafka 具有了消息隊列的特性,同時保證儲存的消息也是有序的。

  broker 會在它所監聽的每一個端口上運作一個 Acceptor 線程,這個線程會建立一個連接配接,并把它交給 Processor 線程去處理。 Processor 線程(也被叫作“網絡線程”)的數量是可配置的。網絡線程負責從用戶端擷取請求消息,把它們放進請求隊列,然後從響應隊列擷取響應消息,把它們發送給用戶端。請求消息被放到請求隊列後,IO 線程會負責處理它們。

   

  生産請求:生産者發送的請求,它包含用戶端要寫入 broker 的消息。

  擷取請求:在消費者和跟随者副本需要從 broker 讀取消息時發送的請求。

  生産請求和擷取請求都必須發送給分區的首領副本。如果 broker 收到一個針對特定分區的請求,而該分區的首領在另一個 broker 上,那麼發送請求的用戶端會收到一個“非分區首領”的錯誤響應。當針對特定分區的擷取請求被發送到一個不含有該分區首領的 broker 上,也會出現同樣的錯誤。

  那麼用戶端怎麼知道該往哪裡發送請求呢?用戶端使用了另一種請求類型,也就是中繼資料請求。這種請求包含了用戶端感興趣的主題清單。伺服器端的響應消息裡指明了這些主題所包含的分區、每個分區都有哪些副本,以及哪個副本是首領。中繼資料請求可以發送給任意一個broker,因為所有broker都緩存了這些資訊。一般情況下,用戶端會把這些資訊緩存起來,并直接往目标 broker 上發送生産請求和擷取請求。它們需要時不時地通過發送中繼資料請求來重新整理這些資訊。

  acks 這個配置參數指定了需要多少個 broker 确認才可以認為一個消息寫入是成功的。如果 acks=1,那麼隻要首領收到消息就認為寫入成功;如果 acks=all,那麼需要所有同步副本收到消息才算寫入成功;如果 acks=0,那麼生産者在把消息發出去之後,完全不需要等待 broker 的響應。

  包含首領副本的 broker 在收到生産請求時,會對請求做一些驗證。 .

發送資料的使用者是否有主題寫入權限?

請求裡包含的 acks 值是否有效(隻允許出現0、1 或 all) ?

如果 acks=all,是否有足夠多的同步副本保證消息已經被安全寫入? 

  之後,消息被寫入本地磁盤。在消息被寫入分區的首領之後,broker 開始檢查 acks 配置參數,如果 acks 被設為 0 或 1, 那麼 broker 立即傳回響應;如果 acks 被設為 all,那麼請求會被儲存在一個叫作煉獄的緩沖區裡,直到首領發現所有跟随者副本都複制了消息,晌應才會被傳回給用戶端。

  用戶端發送請求,向 broker 請求主題分區裡具有特定偏移量的消息,好像在說: “請把主題 Test 分區 0 偏移量從 53 開始 的消息以及主題 Test 分區 3 偏移量從 64 開始的消息發給我。”

  用戶端還可以指定 broker 最多可以從一個分區裡傳回多少資料。

  請求需要先到達指定的分區首領上,然後用戶端通過查詢中繼資料來確定請求的路由是正确的。首領在收到請求時,它會先檢查請求是否有效一一比如,指定的偏移量在分區上是否存在?如果用戶端請求的是已經被删除的資料,或者請求的偏移量不存在,那麼 broker 将傳回一個錯誤。 如果請求的偏移量存在,broker 将按照用戶端指定的數量上限從分區裡讀取消息,再把消息傳回給用戶端。 Kafka 使用零複制技術向用戶端發送消息一一也就是說, Kafka 直接把消息從檔案(或者更确切地說是 Linux 檔案系統緩存)裡發送到網絡通道,而不需要經過任何中間緩沖區。這項技術避免了位元組複制,也不需要管理記憶體緩沖區,進而獲得更好的性能。

  用戶端除了可以設定 broker 傳回資料的上限,也可以設定下限。用戶端發送一個請求,broker 等到有足夠的資料時才把它們傳回給用戶端,然後用戶端再送出請求,而不是讓用戶端每隔幾毫秒就發送一次請求,每次隻能得到很少的資料甚至沒有資料。

  并不是所有儲存在分區首領上的資料都可以被用戶端讀取。大部分用戶端隻能讀取已經被寫入所有同步副本的消息(跟随者副本也不行,盡管它們也是消費者,否則複制功能就無法工作)。分區首領知道每個消息會被複制到哪個副本上,在消息還沒有被寫入所有同步副本之前,是不會發送給消費者的一一嘗試擷取這些消息的請求會得到空的響應而不是錯誤。

  因為還沒有被足夠多副本複制的消息被認為是“不安全”的一一如果首領發生崩潰,另一個副本成為新首領,那麼這些消息就丢失了。如果我們允許消費者讀取這些消息,可能就會破壞一緻性。 這也意味着,如果 broker 間的消息複制因為某些原因變慢,那麼消息到達消費者的時間也會随之變長(因為我們會先等待消息複制完畢)。延遲時間可以通過參數 replica.lag.time.max.ms 來配置,它指定了副本在複制消息時可被允許的最大延遲時間。

  

kafka:(4) broker

  到此為止,我們讨論了 Kafka 最為常見的幾種請求類型:中繼資料請求、生産請求和擷取請求。broker 之間也使用同樣的通信協定。它們之間的請求發生在 Kafka 内部,用戶端不應該使用這些請求。例如,當一個新首領被選舉出來,控制器會發送 LeaderAndIsr 請求給新首領(這樣它就可以開始接收來自用戶端的請求)和跟随者(這樣它們就知道要開始跟随新首領)。

  之前的 Kafka 消費者使用 Zookeeper 來跟蹤偏移量,在消費者啟動的時候,它通過檢查儲存在 Zookeeper 上的偏移量就可以知道從哪裡開始處理消息。因為各種原因,我們決定不再使用 Zookeeper 來儲存偏移量,而是把偏移量儲存在特定的 Kafka 主題上。

五、實體存儲

  Kafka 的基本存儲單元是分區。分區無法在多個 broker 間進行再細分,也無法在同一個 broker 的多個磁盤上進行再細分。

  在配置 Kafka 的時候,管理者指定了一個用于存儲分區的目錄清單,log.dirs 參數的值。

  在建立主題時,Kafka 首先會決定如何在 broker 間配置設定分區。假設你有 6 個 broker,打算建立一個包含 10 個分區的主題,并且複制系數為 3。那麼 Kafka 就會有 30 個分區副本,它們可以被配置設定給 6 個 broker。在進行分區配置設定時,我們要達到如下的目标。

在 broker 間平均地分布分區副本。對于我們的例子來說,就是要保證每個 broker 可以分到5個副本。

確定每個分區的每個副本分布在不同的 broker 上。

如果為 broker 指定了機架資訊,那麼盡可能把每個分區的副本配置設定到不同機架的 broker 上。

  為分區和副本選好合适的 broker 之後,接下來要決定這些分區應該使用哪個目錄。我們單獨為每個分區配置設定目錄,規則很簡單: 計算每個目錄裡的分區數量,新的分區總是被添加到數量最小的那個目錄裡。

  注意磁盤空間:在為 broker 配置設定分區時并沒有考慮可用空間和工作負載問題,但在将分區配置設定到磁盤上時會考慮分區數量,不考慮分區大小。 也就是說, 如果有些 broker 的磁盤空間比其他 broker 要大,有些分區異常大,或者同一個 broker 上有大小不同的磁盤,那麼在配置設定分區時要格外小心。

  保留資料是 Kafka 的一個基本特性,Kafka 不會一直保留資料,也不會等到所有消費者都讀取了消息之後才删除消息。相反,Kafka 管理者為每個主題配置了資料保留期限,規定資料被删除之前可以保留多長時間,或者清理資料之前可以保留的資料量大小。

  因為在一個大檔案裡查找和删除消息是很費時的,是以我們把分區分成若幹個片段。 預設情況下,每個片段包含 1GB 或一周的資料,如果達到片段上限,就關閉目前檔案,并打開一個新檔案。目前正在寫入資料的片段叫作活躍片段。活動片段永遠不會被删除,是以如果你要保留資料 1 天,但片段裡包含了 5 天的資料,那麼這些資料會被保留 5 天,因為在片段被關閉之前這些資料無法被删除。

  我們把 Kafka 的消息和偏移量儲存在檔案裡。儲存在磁盤上的資料格式與從生産者發送過來或者發送給消費者的消息格式是一樣的。因為使用了相同的消息格式進行磁盤存儲和網絡傳輸,Kafka 可以使用零複制技術給消費者發送消息,同時避免了對生産者已經壓縮過的消息進行解壓和再壓縮。

  如果生産者發送的是壓縮過的消息,那麼同一個批次的消息會被壓縮在一起,被當作“包裝消息”進行發送。broker 就會收到一個這樣的消息,然後再把它發送給消費者。消費者在解壓這個消息之後,會看到整個批次的消息,它們都有自己的時間戳和偏移量。如果在生産者端使用了壓縮功能,那麼發送的批次越大,就意味着在網絡傳輸和磁盤存儲方面會獲得越好的壓縮性能。

  消費者可以從 Kafka 的任意可用偏移量位置開始讀取消息。假設消費者要讀取從偏移量 100 開始的 1MB 消息,那麼 broker 必須立即定位到偏移量 100 (可能是在分區的任意一個片段裡),然後開始從這個位置讀取消息。為了幫助 broker 更快地定位到指定的偏移量,Kafka 為每個分區維護了一個索引。索引把偏移量映射到片段檔案和偏移量在檔案裡的位置。 索引也被分成片段,是以在删除消息時,也可以删除相應的索引。Kafka 不維護索引的校驗和。如果索引出現損壞, Kafka 會通過重新讀取消息并錄制偏移量和位置來重新生成索引。如果有必要,管理者可以删除索引,這樣做是絕對安全的,Kafka 會自動重新生成這些索引。

  一般情況下, Kafka 會根據設定的時間保留資料,把超過時效的舊資料删除掉。Kafka 通過改變主題的保留政策來滿足這些使用場景。早于保留時間的舊事件會被删除,為每個鍵保留最新的值,進而達到清理的效果。隻有當應用程式生成的事件裡包含了鍵值對時,為這些主題設定 compact 政策才有意義。如果主題包含 null 鍵,清理就會失敗。

  每個日志片段可以分為以下兩個部分。

  幹淨的部分 : 這些消息之前被清理過,每個鍵隻有一個對應的值,這個值是上一次清理時保留下來的。

  污濁的部分 : 這些消息是在上一次清理之後寫入的。

  如果在 Kafka 啟動時啟用了清理功能(log.cleaner.enabled),每個 broker 會啟動一個清理管理器線程和多個清理線程,它們負責執行清理任務。這些線程會選擇污濁率(污濁消息占分區總大小的比例)較高的分區進行清理。

  為了清理分區,清理線程會讀取分區的污濁部分,并在記憶體裡建立一個 map。 map 裡的每個元素包含了消息鍵的散列值和消息的偏移量。

  清理線程在建立好偏移量 map 後,開始從幹淨的片段處讀取消息,從最舊的消息開始,把它們的内容與 map 裡的内容進行比對。它會檢查消息的鍵是否存在于 map 中,如果不存在,那麼說明消息的值是最新的,就把消息複制到替換片段上。 如果鍵已存在,消息會被忽略,因為在分區的後部已經有一個具有相同鍵的消息存在。在複制完所有的消息之後,我們就将替換片段與原始片段進行交換,然後開始清理下一個片段。完成整個清理過程之後,每個鍵對應一個不同的消息——這些消息的值都是最新的。

  為了徹底把一個鍵從系統裡删除,應用程式必須發送一個包含該鍵且值為 null 的消息。清理線程發現該悄息時,會先進行正常的清理,隻保留值為 null 的消息。

  在 0.10.0 和更早的版本裡, Kafka 會在包含髒記錄的主題數量達到 50% 時進行清理。這樣做的目的是避免太過頻繁的清理(因為清理會影響主題的讀寫性能),同時也避免存在太多髒記錄(因為它們會占用磁盤空間)。

五、日志存儲

  Kafka 中的消息是以主題為基本機關進行歸類的,各個主題在邏輯上互相獨立。每個主題又可以分為一個或多個分區,分區的數量可以在主題建立的時候指定,也可以在之後修改。每條消息在發送的時候會根據分區規則被追加到指定的分區中,分區中的每條消息都會被配置設定一個唯一的序列号,也就是通常所說的偏移量。

  不考慮多副本的情況,一個分區對應一個日志(Log)。為了防止 Log 過大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分為多個LogSegment,相當于一個巨型檔案被平均配置設定為多個相對較小的檔案,這樣也便于消息的維護和清理。Log 在實體上隻以檔案夾的形式存儲,而每個LogSegment 對應于磁盤上的一個日志檔案和兩個索引檔案。

  向Log 中追加消息時是順序寫入的,隻有最後一個 LogSegment 才能執行寫入操作,在此之前所有的 LogSegment 都不能寫入資料。為了友善描述,我們将最後一個 LogSegment 稱為“activeSegment”,即表示目前活躍的日志分段。随着消息的不斷寫入,當activeSegment滿足一定的條件時,就需要建立新的activeSegment,之後追加的消息将寫入新的activeSegment。為了便于消息的檢索,每個LogSegment中的日志檔案(以“.log”為檔案字尾)都有對應的兩個索引檔案:偏移量索引檔案(以“.index”為檔案字尾)和時間戳索引檔案(以“.timeindex”為檔案字尾)。

kafka:(4) broker

繼續閱讀