歡迎支援筆者新作:《深入了解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公衆号:朱小厮的部落格。

衆所周知,目前Kafka的最新版本已經到達1.0.0,很多公司運作的kafka也大多更新到了0.10.x版本,Kafka的Producer用戶端早已不再使用0.8.2.x就已基本停止維護的Scala版本的Producer了,那麼我們還有必要了解它麼?當然很有必要,通過Kafka Old Producer我們可以了解Kafka變遷更新的曆史:舊版的Old Producer模型相對簡單利于初始了解,通過對Old Producer的了解也可以慢慢的發現隐患的問題,這樣進一步可以研究探讨解決方法,最後再通過對新版Producer的學習來提升對Kafka的認知,與此同時也可以讓讀者在遇到相似問題的時候可以借鑒Kafka的優化過來來優化自己的應用。以銅為鑒,可以正衣冠。
在使用Scala版本的Kafka生産者用戶端kafka.javaapi.producer.Producer時,實際上調用的是kafka.producer.Producer類。
包括kafka-console-producer.sh的腳本(常用來測試發送消息之用)中,對于0.8.2.x版本如果不指定“-- new-producer”參數;或者對于.0.0版本如果指定“-- old-producer”參數的話,實際上内部調用的都是kafka.producer.Producer這個類。
對于kafka-console-producer.sh腳本的内容如下:
我們看到實際上kafka-console-producer.sh的内容就是運作kafka.tools.ConsoleProducer而已,可以看到main函數代碼塊中的config.useOldProducer,這個筆者看的是1.0.0版本的代碼,而0.8.2.2版本中的ConsoleProducer對應的是config.useNewProducer,稍有不同而已,不過如果都指定使用舊版的Scala的Producer,那麼都是指kafka.producer.OldProducer。
進一步剖析,kafka.producer.OldProducer的内部構造很簡單,關鍵代碼如下:
可以看到内部的producer最終還是執行個體化的kafka.producer.Producer。最終驗證了開篇所述的舊版的Kafka生産者用戶端即為Kafka.producer.Producer。
新版的Java版的Kafka用戶端是:org.apache.kafka.clients.producer.KafkaProducer,讀者請注意區分。對于新版的KafkaProducer在以後的文章中會有詳細介紹。
下面就來深入了解下Kafka.producer.Producer(下面如無特殊說明都将Kafka.producer.Producer此簡稱為Producer)了。當執行個體化Producer的時候,首先要讀取、解析以及校驗配置資訊的合法性,根據配置資訊來執行個體化Producer。Producer的配置項有18個,比如設定分區器、消息壓縮方式等,這些都比較好了解,而最主要的配置就是request.required.acks和producer.type這兩個配置。
request.required.acks是用來配置生産端消息确認的方式,在0.8.x這個系列的版本之中,可以配置為0,1,-1的值,也可以配置為其他的整數值,用來控制一條消息經由多少個ISR中的副本所在的Broker确認之後才向用戶端發送确認資訊,這個參數在之後的版本,比如1.0.0版本中就隻能設定0,1,-1(all)這3(4)種取值,分别表示:
當request.required.acks=0時,這意味着producer無需等待來自broker的确認而繼續發送下一批消息。這種情況下資料傳輸效率最高,但是資料可靠性确是最低的。
當request.required.acks=1(預設)時,這意味着producer在ISR中的leader已成功收到資料并得到确認。如果leader當機了,則會丢失資料。
當request.required.acks=-1時,producer需要等待ISR中的所有follower都确認接收到資料後才算一次發送完成,可靠性最高。但是這樣也不能保證資料不丢失,比如當ISR中隻有leader時,這樣就變成了acks=1的情況。為了提高資料的可靠性,可以通過min.insync.replicas參數來輔助作用,當同步副本數不足時,生産者會跑出異常。
有關kafka的消息可靠性的更深層次的講解可以參考我2017年初的一篇部落格:kafka資料可靠性深度解讀,這篇部落客要是針對0.8.2.x版本的kafka做深層次的探讨,後續會對1.0.0版本做進一步的說明。
Producer的發送模式分為同步(sync)和異步(async)兩種情況,這一點可以通過參數producer.type來配置。同步模式會将消息直接發往broker中,而異步模式則會将消息存入LinkedBlockingQueue中,然後通過一個ProducerSendThread來專門發送消息。為了便于說明,筆者這裡先對同步模式的情況來做說明,而異步模式隻是在同步模式的基礎上做了一些封裝而已。
在講述Producer的具體行為之前先來看一個發送方的Demo:
我們可以看到再初始化Producer的時候之用了ProducerConfig這一個類型的參數,而在Producer的類定義中還用到了EventHandler這個類型的參數。在Scala語言中隻有一個主構造函數,這個主構造函數的參數清單就是跟在類名後面括号中的各個的參數,如果要重載的話就需要自定義輔助構造函數,輔助構造函數必須調用主構造函數(this方法)。如此上面這個Demo中很顯然的就調用了輔助構造函數來進行執行個體化,那麼我們再來看下其對應的輔助構造函數:
這裡又引入了兩個新的東西:DefaultEventHandler和ProducerPool,這個DefaultEventHandler繼承了EventHandler這個類,這個是消息發送的關鍵。而ProducerPool内部是一個HashMap,其中的key是broker的id,而value就是每個broker對應的SyncProducer,這個SyncProducer就是真正的消息發送者。
