天天看點

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

問題導讀:

1.zookeeper在kafka的作用是什麼?

2.kafka中幾乎不允許對消息進行“随機讀寫”的原因是什麼?

3.kafka叢集consumer和producer狀态資訊是如何儲存的?

4.partitions設計的目的的根本原因是什麼?

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

一、入門

    1、簡介

    kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于jms的特性,但是在設計實作上完全不同,此外它并不是jms規範的實作。kafka對消息儲存時根據topic進行歸類,發送消息者成為producer,消息接受者成為consumer,此外kafka叢集有多個kafka執行個體組成,每個執行個體(server)成為broker。無論是kafka叢集,還是producer和consumer都依賴于zookeeper來保證系統可用性叢集儲存一些meta資訊。

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

   2、topics/logs

    一個topic可以認為是一類消息,每個topic将被分成多個partition(區),每個partition在存儲層面是append log檔案。任何釋出到此partition的消息都會被直接追加到log檔案的尾部,每條消息在檔案中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一标記一條消息。它唯一的标記一條消息。kafka并沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行“随機讀寫”。

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

    kafka和jms(java message service)實作(activemq)不同的是:即使消息被消費,消息仍然不會被立即删除.日志檔案将會根據broker中的配置要求,保留一定的時間之後删除;比如log檔案保留2天,那麼兩天後,檔案會被清除,無論其中的消息是否被消費.kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費之後對檔案内容改動的磁盤io開支.

    對于consumer而言,它需要儲存消費消息的offset,對于offset的儲存和使用,有consumer來控制;當consumer正常消費消息時,offset将會"線性"的向前驅動,即消息将依次順序被消費.事實上consumer可以使用任意順序消費消息,它隻需要将offset重置為任意值..(offset将會儲存在zookeeper中,參見下文)

    kafka叢集幾乎不需要維護任何consumer和producer狀态資訊,這些資訊有zookeeper儲存;是以producer和consumer的用戶端實作非常輕量級,它們可以随意離開,而不會對叢集造成額外的影響.

    partitions的設計目的有多個.最根本原因是kafka基于檔案存儲.通過分區,可以将日志内容分散到多個server上,來避免檔案尺寸達到單機磁盤的上限,每個partiton都會被目前server(kafka執行個體)儲存;可以将一個topic切分多任意多個partitions,來消息儲存/消費的效率.此外越多的partitions意味着可以容納更多的consumer,有效提升并發消費的能力.(具體原理參見下文).

    3、distribution

    一個topic的多個partitions,被分布在kafka叢集中的多個server上;每個server(kafka執行個體)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition将會被備份到多台機器上,以提高可用性.

    基于replicated方案,那麼就意味着需要對多個備份進行排程;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼将會有其他follower來接管(成為新的leader);follower隻是單調的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,是以從叢集的整體考慮,有多少個partitions就意味着有多少個"leader",kafka會将"leader"均衡的分散在每個執行個體上,來確定整體的性能穩定.

    producers

    producer将消息釋出到指定的topic中,同時producer也能決定将此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等.

    consumers

    本質上kafka隻支援topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到topic的消息,隻會被訂閱此topic的每個group中的一個consumer消費.

    如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息将會在consumers之間負載均衡.

    如果所有的consumer都具有不同的group,那這就是"釋出-訂閱";消息将會廣播給所有的消費者.

    在kafka中,一個partition中的消息隻會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個topic中的每個partions,隻會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka隻能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從topic角度來說,消息仍不是有序的.

    kafka的設計原理決定,對于一個topic,同一個group中不能有多于partitions個數的consumer同時消費,否則将意味着某些consumer将無法得到消息.

    guarantees

    1) 發送到partitions中的消息将會按照它接收的順序追加到日志中

    2) 對于消費者而言,它們消費消息的順序和日志中消息順序一緻.

    3) 如果topic的"replicationfactor"為n,那麼允許n-1個kafka執行個體失效.

二、使用場景

    1、messaging   

    對于一些正常的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供jms中的"事務性""消息傳輸擔保(消息确認機制)""消息分組"等企業級特性;kafka隻能使用作為"正常"的消息系統,在一定程度上,尚未確定消息的發送與接收絕對可靠(比如,消息重發,消息發送丢失等)

    2、websit activity tracking

    kafka可以作為"網站活性跟蹤"的最佳工具;可以将網頁/使用者操作等資訊發送到kafka中.并實時監控,或者離線統計分析等

    3、log aggregation

    kafka的特性決定它非常适合作為"日志收集中心";application可以将記錄檔"批量""異步"的發送到kafka叢集中,而不是儲存在本地或者db中;kafka可以批量送出消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.

三、設計原理

    kafka的設計初衷是希望作為一個統一的資訊收集平台,能夠實時的收集回報資訊,并需要能夠支撐較大的資料量,且具備良好的容錯能力.

    1、持久性

    kafka使用檔案存儲消息,這就直接決定kafka在性能上嚴重依賴檔案系統的本身特性.且無論任何os下,對檔案系統本身的優化幾乎沒有可能.檔案緩存/直接記憶體映射等是常用的手段.因為kafka是對日志檔案進行append操作,是以磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會将消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤io調用的次數.

2、性能

    需要考慮的影響性能點很多,除磁盤io之外,我們還需要考慮網絡io,這直接關系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以将消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置檔案來指定.對于kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡io的性能:将檔案的資料映射到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換.

其實對于producer/consumer/broker三者而言,cpu的開支應該都不大,是以啟用消息壓縮機制是一個良好的政策;壓縮需要消耗少量的cpu資源,不過對于kafka而言,網絡io更應該需要考慮.可以将任何在網絡上傳輸的消息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.

    3、生産者

    負載均衡: producer将會和topic下所有partition leader保持socket連接配接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".事實上,消息被路由到哪個partition上,有producer用戶端決定.比如可以采用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實作"消息均衡分發"是必要的.

    其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件.

    異步發送:将多條消息暫且在用戶端buffer起來,并将他們批量的發送到broker,小資料io太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率。不過這也有一定的隐患,比如說當producer失效時,那些尚未發送的消息将會丢失。

    4、消費者

    consumer端向broker發送"fetch"請求,并告知其擷取消息的offset;此後consumer将會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息.

    在jms實作中,topic模型基于push方式,即broker将消息推送給consumer端.不過在kafka中,采用了pull方式,即consumer在和broker建立連接配接之後,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力适時的去fetch消息并處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch.

    其他jms實作,消息消費的位置是有prodiver保留,以便避免重複發送消息或者将沒有消費成功的消息重發等,同時還要控制消息的狀态.這就要求jms broker需要太多額外的工作.在kafka中,partition中的消息隻有一個consumer在消費,且不存在消息狀态的控制,也沒有複雜的消息确認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之後,consumer可以在本地儲存最後消息的offset,并間歇性的向zookeeper注冊offset.由此可見,consumer用戶端也很輕量級.

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

    5、消息傳送機制

    對于jms實作,消息傳輸擔保非常直接:有且隻有一次(exactly once).在kafka中稍有不同:

    1) at most once: 最多一次,這個和jms中"非持久化"消息類似.發送一次,無論成敗,将不會重發.

    2) at least once: 消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功.

    3) exactly once: 消息隻會發送一次.

    at most once: 消費者fetch消息,然後儲存offset,然後處理消息;當client儲存offset之後,但是在消息處理過程中出現了異常,導緻部分消息未能繼續處理.那麼此後"未處理"的消息将不能被fetch到,這就是"at most once".

    at least once: 消費者fetch消息,然後處理消息,然後儲存offset.如果消息處理成功之後,但是在儲存offset階段zookeeper異常導緻儲存操作未能執行成功,這就導緻接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的送出給zookeeper,zookeeper恢複正常還是之前offset狀态.

    exactly once: kafka中并沒有嚴格的去實作(基于2階段送出,事務),我們認為這種政策在kafka中是沒有必要的.

    通常情況下"at-least-once"是我們搜選.(相比at most once而言,重複接收資料總比丢失資料要好).

    6、複制備份

    kafka将每個partition資料複制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置檔案來設定.leader處理所有的read-write請求,follower需要和leader保持同步.follower和consumer一樣,消費消息并儲存在本地日志中;leader負責跟蹤所有的follower狀态,如果follower"落後"太多或者失效,leader将會把它從replicas同步清單中删除.當所有的follower都将一條消息儲存成功,此消息才被認為是"committed",那麼此時consumer才能消費它.即使隻有一個replicas執行個體存活,仍然可以保證消息的正常發送和接收,隻要zookeeper叢集存活即可.(不同于其他分布式存儲,比如hbase需要"多數派"存活才行)

    當leader失效時,需在followers中選取出新的leader,可能此時follower落後于leader,是以需要選擇一個"up-to-date"的follower.選擇follower時需要兼顧一個問題,就是新leaderserver上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味着此server将承受着更多的io壓力.在選舉新leader,需要考慮到"負載均衡".

    7.日志

    如果一個topic的名稱為"my_topic",它有2個partitions,那麼日志将會儲存在my_topic_0和my_topic_1兩個目錄中;日志檔案中儲存了一序列"log entries"(日志條目),每個log entry格式為"4個位元組的數字n表示消息的長度" + "n個位元組的消息内容";每個日志都有一個offset來唯一的标記一條消息,offset的值為8個位元組的數字,表示此消息在此partition中所處的起始位置..每個partition在實體存儲層面,有多個log file組成(稱為segment).segmentfile的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

    其中每個partiton中所持有的segments清單資訊會存儲在zookeeper中.

    當segment檔案尺寸達到一定閥值時(可以通過配置檔案設定,預設1g),将會建立一個新的檔案;當buffer中消息的條數達到閥值時将會觸發日志資訊flush到日志檔案中,同時如果"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日志檔案.如果broker失效,極有可能會丢失那些尚未flush到檔案的消息.因為server意外實作,仍然會導緻log檔案格式的破壞(檔案尾部),那麼就要求當server啟東是需要檢測最後一個segment的檔案結構是否合法并進行必要的修複.

    擷取消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大擷取消息的總長度(間接的表示消息的條數).根據offset,可以找到此消息所在segment檔案,然後根據segment的最小offset取內插補點,得到它在file中的相對位置,直接讀取輸出即可.

    日志檔案的删除政策非常簡單:啟動一個背景線程定期掃描log file清單,把儲存時間超過閥值的檔案直接删除(根據檔案的建立時間).為了避免删除檔案時仍然有read操作(consumer消費),采取copy-on-write方式.

    8、配置設定

    kafka使用zookeeper來存儲一些meta資訊,并使用了zookeeper watch機制來發現meta資訊的變更并作出相應的動作(比如consumer失效,觸發負載均衡等)

    1) broker node registry: 當一個kafkabroker啟動後,首先會向zookeeper注冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連接配接時,此znode也會被删除.

    格式: /broker/ids/[0...n]   -->host:port;其中[0..n]表示broker id,每個broker的配置檔案中都需要指定一個數字類型的id(全局不可重複),znode的值為此broker的host:port資訊.

    2) broker topic registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions資訊,仍然是一個臨時znode.

    格式: /broker/topics/[topic]/[0...n]  其中[0..n]表示partition索引号.

    3) consumer and consumer group: 每個consumer用戶端被建立時,會向zookeeper注冊自己的資訊;此作用主要是為了"負載均衡".

    一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.

    4) consumer id registry: 每個consumer都有一個唯一的id(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來标記消費者資訊.

    格式:/consumers/[group_id]/ids/[consumer_id]

    仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions清單.

    5) consumer offset tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.

    格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value

    此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.

    6) partition owner registry: 用來标記partition被哪個consumer消費.臨時znode

    格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啟動時,所觸發的操作:

    a) 首先進行"consumer id registry";

    b) 然後在"consumer id registry"節點下注冊一個watch用來監聽目前group中其他consumer的"leave"和"join";隻要此znode path下節點清單變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions).

    c) 在"broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker清單變更,将會觸發所有的groups下的consumer重新balance.

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

    1) producer端使用zookeeper用來"發現"broker清單,以及和topic下每個partition leader建立socket連接配接并發送消息.

    2) broker端使用zookeeper用來注冊broker資訊,已經監測partitionleader存活性.

    3) consumer端使用zookeeper用來注冊consumer資訊,其中包括consumer消費的partition清單等,同時也用來發現broker清單,并和partition leader建立socket連接配接,并擷取消息.

四、主要配置

    1、broker配置

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

    2.consumer主要配置

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

3.producer主要配置

kafka入門:簡介、使用場景、設計原理、主要配置及叢集搭建

以上是關于kafka一些基礎說明,在其中我們知道如果要kafka正常運作,必須配置zookeeper,否則無論是kafka叢集還是用戶端的生存者和消費者都無法正常的工作的,以下是對zookeeper進行一些簡單的介紹:

五、zookeeper叢集

    zookeeper是一個為分布式應用提供一緻性服務的軟體,它是開源的hadoop項目的一個子項目,并根據google發表的一篇論文來實作的。zookeeper為分布式系統提供了高笑且易于使用的協同服務,它可以為分布式應用提供相當多的服務,諸如統一命名服務,配置管理,狀态同步群組服務等。zookeeper接口簡單,我們不必過多地糾結在分布式系統程式設計難于處理的同步和一緻性問題上,你可以使用zookeeper提供的現成(off-the-shelf)服務來實作來實作分布式系統額配置管理,組管理,leader選舉等功能。

    zookeeper叢集的安裝,準備三台伺服器server1:192.168.0.1,server2:192.168.0.2,

    server3:192.168.0.3.

    1)下載下傳zookeeper

    2)安裝zookeeper

    先在伺服器server分别執行a-c步驟

    a)解壓  

    tar -zxvf zookeeper-3.4.5.tar.gz

    解壓完成後在目錄~下會發現多出一個目錄zookeeper-3.4.5,重新指令為zookeeper

    b)配置

    将conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下。然後按照如下值修改其中的配置:

    # the number of milliseconds of each tick

    ticktime=2000

    # the number of ticks that the initial

    # synchronization phase can take

    initlimit=10

    # the number of ticks that can pass between

    # sending a request and getting an acknowledgement

    synclimit=5

    # the directory where the snapshot is stored.

    # do not use /tmp for storage, /tmp here is just

    # example sakes.

    datadir=/home/wwb/zookeeper /data

    datalogdir=/home/wwb/zookeeper/logs

    # the port at which the clients will connect

    clientport=2181

    #

    # be sure to read the maintenance section of the

    # administrator guide before turning on autopurge.

    # the number of snapshots to retain in datadir

    #autopurge.snapretaincount=3

    # purge task interval in hours

    # set to "0" to disable auto purge feature

    #autopurge.purgeinterval=1

    server.1=192.168.0.1:3888:4888

    server.2=192.168.0.2:3888:4888

    server.3=192.168.0.3:3888:4888

    ticktime:這個時間是作為 zookeeper 伺服器之間或用戶端與伺服器之間維持心跳的時間間隔,也就是每個 ticktime 時間就會發送一個心跳。

    datadir:顧名思義就是 zookeeper 儲存資料的目錄,預設情況下,zookeeper 将寫資料的日志檔案也儲存在這個目錄裡。

    clientport:這個端口就是用戶端連接配接 zookeeper 伺服器的端口,zookeeper 會監聽這個端口,接受用戶端的通路請求。

    initlimit:這個配置項是用來配置 zookeeper 接受用戶端(這裡所說的用戶端不是使用者連接配接 zookeeper 伺服器的用戶端,而是 zookeeper 伺服器叢集中連接配接到 leader 的 follower 伺服器)初始化連接配接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 ticktime)長度後 zookeeper 伺服器還沒有收到用戶端的傳回資訊,那麼表明這個用戶端連接配接失敗。總的時間長度就是 5*2000=10 秒

    synclimit:這個配置項辨別 leader 與follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 ticktime 的時間長度,總的時間長度就是2*2000=4 秒

    server.a=b:c:d:其中 a 是一個數字,表示這個是第幾号伺服器;b 是這個伺服器的 ip 位址;c 表示的是這個伺服器與叢集中的 leader 伺服器交換資訊的端口;d 表示的是萬一叢集中的 leader 伺服器挂了,需要一個端口來重新進行選舉,選出一個新的 leader,而這個端口就是用來執行選舉時伺服器互相通信的端口。如果是僞叢集的配置方式,由于 b 都是一樣,是以不同的 zookeeper 執行個體通信端口号不能一樣,是以要給它們配置設定不同的端口号

注意:datadir,datalogdir中的wwb是目前登入使用者名,data,logs目錄開始是不存在,需要使用mkdir指令建立相應的目錄。并且在該目錄下建立檔案myid,serve1,server2,server3該檔案内容分别為1,2,3。

針對伺服器server2,server3可以将server1複制到相應的目錄,不過需要注意datadir,datalogdir目錄,并且檔案myid内容分别為2,3

    3)依次啟動server1,server2,server3的zookeeper.

    /home/wwb/zookeeper/bin/zkserver.sh start,出現類似以下内容

    jmx enabled by default

    using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg

    starting zookeeper ... started

   4) 測試zookeeper是否正常工作,在server1上執行以下指令

    /home/wwb/zookeeper/bin/zkcli.sh -server192.168.0.2:2181,出現類似以下内容

    jline support is enabled

    2013-11-27 19:59:40,560 - info      [main-sendthread(localhost.localdomain:2181):clientcnxn$sendthread@736]- session   establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid =    0x1429cdb49220000, negotiatedtimeout = 30000

    watcher::

    watchedevent state:syncconnected type:none path:null

    [zk: 127.0.0.1:2181(connected) 0] [root@localhostzookeeper2]#  

    即代表叢集建構成功了,如果出現錯誤那應該是第三部時沒有啟動好叢集,

運作,先利用

    ps aux | grep zookeeper檢視是否有相應的程序的,沒有話,說明叢集啟動出現問題,可以在每個伺服器上使用

    ./home/wwb/zookeeper/bin/zkserver.sh stop。再依次使用./home/wwb/zookeeper/binzkserver.sh start,這時在執行4一般是沒有問題,如果還是有問題,那麼先stop再到bin的上級目錄執行./bin/zkserver.shstart試試。

注意:zookeeper叢集時,zookeeper要求半數以上的機器可用,zookeeper才能提供服務。

六、kafka叢集

(利用上面server1,server2,server3,下面以server1為執行個體)

    2)解壓 tar -zxvf kafka-0.8.0-beta1-src.tgz,産生檔案夾kafka-0.8.0-beta1-src更改為kafka01   

3)配置

    修改kafka01/config/server.properties,其中broker.id,log.dirs,zookeeper.connect必須根據實際情況進行修改,其他項根據需要自行斟酌。大緻如下:

     broker.id=1  

     port=9091  

     num.network.threads=2  

     num.io.threads=2  

     socket.send.buffer.bytes=1048576  

    socket.receive.buffer.bytes=1048576  

     socket.request.max.bytes=104857600  

    log.dir=./logs  

    num.partitions=2  

    log.flush.interval.messages=10000  

    log.flush.interval.ms=1000  

    log.retention.hours=168  

    #log.retention.bytes=1073741824  

    log.segment.bytes=536870912  

    num.replica.fetchers=2  

    log.cleanup.interval.mins=10  

    zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183  

    zookeeper.connection.timeout.ms=1000000  

    kafka.metrics.polling.interval.secs=5  

    kafka.metrics.reporters=kafka.metrics.kafkacsvmetricsreporter  

    kafka.csv.metrics.dir=/tmp/kafka_metrics  

    kafka.csv.metrics.reporter.enabled=false

4)初始化因為kafka用scala語言編寫,是以運作kafka需要首先準備scala相關環境。

    > cd kafka01  

    > ./sbt update  

    > ./sbt package  

    > ./sbt assembly-package-dependency

在第二個指令時可能需要一定時間,由于要下載下傳更新一些依賴包。是以請大家 耐心點。

5) 啟動kafka01

    >jmx_port=9997 bin/kafka-server-start.sh config/server.properties &  

a)kafka02操作步驟與kafka01雷同,不同的地方如下

    修改kafka02/config/server.properties

    broker.id=2

    port=9092

    ##其他配置和kafka-0保持一緻

    啟動kafka02

    jmx_port=9998 bin/kafka-server-start.shconfig/server.properties &  

b)kafka03操作步驟與kafka01雷同,不同的地方如下

    修改kafka03/config/server.properties

    broker.id=3

    port=9093

    jmx_port=9999 bin/kafka-server-start.shconfig/server.properties &

6)建立topic(包含一個分區,三個副本)

    >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

7)檢視topic情況

    >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181

    topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0

8)建立發送者

   >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic

    my test message1

    my test message2

    ^c

9)建立消費者

    >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic

    ...

^c

10)殺掉server1上的broker

  >pkill -9 -f config/server.properties

11)檢視topic

  >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181

  topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0

發現topic還正常的存在

11)建立消費者,看是否能查詢到消息

    >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic

    my test message 1

    my test message 2

說明一切都是正常的。

ok,以上就是對kafka個人的了解,不對之處請大家及時指出。

補充說明:

1、public map<string, list<kafkastream<byte[], byte[]>>> createmessagestreams(map<string, integer> topiccountmap),其中該方法的參數map的key為topic名稱,value為topic對應的分區數,譬如說如果在kafka中不存在相應的topic時,則會建立一個topic,分區數為value,如果存在的話,該處的value則不起什麼作用

2、關于生産者向指定的分區發送資料,通過設定partitioner.class的屬性來指定向那個分區發送資料,如果自己指定必須編寫相應的程式,預設是kafka.producer.defaultpartitioner,分區程式是基于散列的鍵。

3、在多個消費者讀取同一個topic的資料,為了保證每個消費者讀取資料的唯一性,必須将這些消費者group_id定義為同一個值,這樣就建構了一個類似隊列的資料結構,如果定義不同,則類似一種廣播結構的。

4、在consumerapi中,參數設計到數字部分,類似map<string,integer>,

numstream,指的都是在topic不存在的時,會建立一個topic,并且分區個數為integer,numstream,注意如果數字大于broker的配置中num.partitions屬性,會以num.partitions為依據建立分區個數的。

5、producerapi,調用send時,如果不存在topic,也會建立topic,在該方法中沒有提供分區個數的參數,在這裡分區個數是由服務端broker的配置中num.partitions屬性決定的