天天看點

Kafka安裝使用-超詳細

一:介紹

1.官網

  kafka.apache.org

2.産生

  Kafka由 linked-in 開源 

  kafka-即是解決上述這類問題的一個架構,它實作了生産者和消費者之間的無縫連接配接。 

  kafka-高産出的分布式消息系統(A high-throughput distributed messaging system)

3.狀況

  Apache kafka 是一個分布式的基于push-subscribe的消息系統,它具備快速、可擴充、可持久化的特點。

  它現在是Apache旗下的一個開源系統,作為Hadoop生态系統的一部分,被各種商業公司廣泛應用。

  它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基于hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎。

4.特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低隻有幾毫秒
  • 可擴充性:kafka叢集支援熱擴充
  • 持久性、可靠性:消息被持久化到本地磁盤,并且支援資料備份防止資料丢失
  • 容錯性:允許叢集中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高并發:支援數千個用戶端同時讀寫

5.設計思想

  • Consumergroup:各個consumer可以組成一個組,每個消息隻能被組中的一個consumer消費,如果一個消息可以被多個consumer消費的話,那麼這些consumer必須在不同的組。
  • 消息狀态:在Kafka中,消息的狀态被儲存在consumer中,broker不會關心哪個消息被消費了被誰消費了,隻記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
  • 消息持久化:Kafka中會把消息持久化到本地檔案系統中,并且保持極高的效率。
  • 消息有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。
  • 批量發送:Kafka支援以消息集合為機關進行批量發送,以提高push效率。
  • push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer隻管向broker push消息,consumer隻管從broker pull消息,兩者對消息的生産和消費是異步的。
  • Kafka叢集中broker之間的關系:不是主從關系,各個broker在叢集中地位一樣,我們可以随意的增加或删除任何一個broker節點。
  • 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對于0.7.x主要靠zookeeper來實作負載均衡)。
  • 同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。
  • 分區機制partition:Kafka的broker端支援消息分區,Producer可以決定把消息發到哪個分區,在一個分區中消息的順序就是Producer發送消息的順序,一個主題中可以有多個分區,具體分區的數量是可配置的。分區的意義很重大,後面的内容會逐漸展現。
  • 離線資料裝載:Kafka由于對可拓展的資料持久化的支援,它也非常适合向Hadoop或者資料倉庫中進行資料裝載。
  • 插件支援:現在不少活躍的社群已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。

6.應用場景

  • 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 消息系統:解耦和生産者和消費者、緩存消息等。
  • 使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如浏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、資料倉庫中做離線分析和挖掘。
  • 營運名額:Kafka也經常用來記錄營運監控資料。包括收集各種分布式應用的資料,生産各種操作的集中回報,比如報警和報告。
  • 流式處理:比如spark streaming和storm
  • 事件源

7.元件

  Kafka中釋出訂閱的對象是topic。

  我們可以為每類資料建立一個topic,把向topic釋出消息的用戶端稱作producer,從topic訂閱消息的用戶端稱作consumer。

  Producers和consumers可以同時從多個topic讀寫資料。一個kafka叢集由一個或多個broker伺服器組成,它負責持久化和備份具體的kafka消息。 

  • topic:消息存放的目錄即主題
  • Producer:生産消息到topic的一方
  • Consumer:訂閱topic消費消息的一方
  • Broker:Kafka的服務執行個體就是一個broker

8.Kafka Topic&Partition

  消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:

  

Kafka安裝使用-超詳細

  我們可以看到,每個Partition中的消息都是有序的,生産的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。 

  Kafka叢集會儲存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,隻有過期的資料才會被自動清除以釋放磁盤空間。比如我們設定消息過期時間為2天,那麼這2天内的所有消息都會被儲存到叢集中,資料隻有超過了兩天才會被清除。 

  Kafka需要維持的中繼資料隻有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀态完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。 

  把消息日志以Partition的形式存放有多重考慮,第一,友善在叢集中擴充,每個Partition可以通過調整以适應它所在的機器,而一個topic又可以有多個Partition組成,是以整個叢集就可以适應任意大小的資料了;第二就是可以提高并發,因為可以以Partition為機關讀寫了。

二:核心元件

1.Replications、Partitions 和Leaders

  通過上面介紹的我們可以知道,kafka中的資料是持久化的并且能夠容錯的。Kafka允許使用者為每個topic設定副本數量,副本數量決定了有幾個broker來存放寫入的資料。如果你的副本數量設定為3,那麼一份資料就會被存放在3台不同的機器上,那麼就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重新開機機器時不會影響到資料消費。如果對資料持久化有更高的要求,可以把副本數量設定為3或者更多。 

Kafka中的topic是以partition的形式存放的,每一個topic都可以設定它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生産資料時,會按照一定規則(這個規則是可以自定義的)把消息釋出到topic的各個partition中。上面将的副本都是以partition為機關的,不過隻有一個partition的副本會被選舉成leader作為讀寫用。 

關于如何設定partition值需要考慮的因素。一個partition隻能被一個消費者消費(一個消費者可以同時消費多個partition),是以,如果設定的partition的數量小于consumer的數量,就會有消費者消費不到資料。是以,推薦partition的數量一定要大于同時運作的consumer的數量。另外一方面,建議partition的數量大于叢集broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得叢集負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition配置設定一些記憶體來緩存消息資料,如果partition數量越大,就要為kafka配置設定更大的heap space。

2. Producers

  Producers直接發送消息到broker上的leader partition,不需要經過任何中介一系列的路由轉發。為了實作這個特性,kafka叢集中的每個broker都可以響應producer的請求,并傳回topic的一些元資訊,這些元資訊包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被通路的。 

  Producer用戶端自己控制着消息被推送到哪些partition。實作的方式可以是随機配置設定、實作一類随機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供使用者實作自定義的分區,使用者可以為每個消息指定一個partitionKey,通過這個key來實作一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息将會被推送到同一個分區。 

  以Batch的方式推送資料可以極大的提高處理效率,kafka Producer 可以将消息在記憶體中累計到一定數量後作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設定為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的資料大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設定需要在效率和時效性方面做一個權衡。 

  Producers可以異步的并行的向kafka發送消息,但是通常producer在發送完消息之後會得到一個future響應,傳回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到确認的副本個數,如果acks設定數量為0,表示producer不會等待broker的響應,是以,producer無法知道消息是否發送成功,這樣有可能會導緻資料丢失,但同時,acks值為0會得到最大的系統吞吐量。 

若acks設定為1,表示producer會在leader partition收到消息時得到broker的一個确認,這樣會有更好的可靠性,因為用戶端會等待直到broker确認收到消息。若設定為-1,producer會在所有備份的partition收到消息時得到broker的确認,這個設定可以得到最高的可靠性保證。 

  Kafka 消息有一個定長的header和變長的位元組數組組成。因為kafka消息支援位元組數組,也就使得kafka可以支援任何使用者自定義的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。

3.Consumers

  Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接配接,并且這個API是完全無狀态的,每次請求都需要指定offset值,是以,這套API也是最靈活的。 

在kafka中,目前讀到消息的offset值是由consumer來維護的,是以,consumer可以自己決定如何讀取kafka中的資料。比如,consumer可以通過重設offset值來重新消費已消費過的資料。不管有沒有被消費,kafka會儲存資料一段時間,這個時間周期是可配置的,隻有到了過期時間,kafka才會删除這些資料。 

  High-level API封裝了對叢集中一系列broker的通路,可以透明的消費一個topic。它自己維持了已消費消息的狀态,即每次消費的都是下一個消息。 

High-level API還支援以組的形式消費topic,如果consumers有同一個組名,那麼kafka就相當于一個隊列消息服務,而各個consumer均衡的消費相應partition中的資料。若consumers有不同的組名,那麼此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。 

二:安裝

1.上傳

  這個可以在官網上進行下載下傳。

  

Kafka安裝使用-超詳細

  上傳。

  

Kafka安裝使用-超詳細

2.解壓到modules

  tar -zxvf kafka_2.10-0.8.1.1.tgz -C /opt/modules/

3.修改配置檔案server.properties

  這個配置檔案用于伺服器節點的配置檔案。

  )修改kafa收集到的日志資料存儲檔案夾

    位址可以先不用建立,在首次啟動kafka的時候,會自動進行建立。

  

Kafka安裝使用-超詳細

  )修改zookeeper

  

Kafka安裝使用-超詳細

三:啟動

1.啟動zookeeper

  這個是在啟動broker之前需要保證zookeeper叢集是運作着的。

  

Kafka安裝使用-超詳細

2.啟動broker

  先啟動一下,開有沒有報錯。

  

Kafka安裝使用-超詳細

  如果沒有出現日志錯誤,就使用下面的指令:

  nohup bin/kafka-server-start.sh config/server.properties > logs/server-start.log 2>&1 &

  其中,server-start.log是自己寫的一個log檔案,在原有的檔案logs下面是沒有的。

3.檢驗

  一個jps,一個ps檢視程序。

  

Kafka安裝使用-超詳細

   檢視端口9092是否開放

  

Kafka安裝使用-超詳細

4.建立topic(使用help幫助)

  

Kafka安裝使用-超詳細

 5.建立一個nginxlog的topic

  bin/kafka-topics.sh --create --topic nginxlog --partitions 1 --replication-factor 1 --zookeeper linux-hadoop3.ibeifeng.com:2181

  

Kafka安裝使用-超詳細

  說明:

  建立的消息主題是nginxlog。

  指定topic的分區數:partitons

  指定topic的備份數:replication-factor

  

6.檢視詳情

   bin/kafka-topics.sh --describe --topic nginxlog --zookeeper linux-hadoop3.ibeifeng.com:2181 

  

Kafka安裝使用-超詳細

三:檢測topic

  一個産生,一個在在消費的測試。

1.啟動消息生産者,将消息發送到kafka的topic上

  bin/kafka-console-producer.sh --broker-list linux-hadoop3.ibeifeng.com:9092 --topic nginxlog

  

Kafka安裝使用-超詳細

  說明:

    --broker-list是指定broker的節點位址。

2.啟動消息消費者

  bin/kafka-console-consumer.sh --zookeeper linux-hadoop3.ibeifeng.com:2181 --topic nginxlog --from-beginning 

新版啟動指令:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

  

Kafka安裝使用-超詳細

  --from-beginning是從開頭讀取的意思。

四:模拟産生nginx日志

1.在伺服器上建立一個根目錄

  

Kafka安裝使用-超詳細

2.上傳jar包

  

Kafka安裝使用-超詳細

3.執行指令

  java -jar data-generate-1.0-SNAPSHOT-jar-with-dependencies.jar 1000 >>nginx.log

   

Kafka安裝使用-超詳細

4.檢視

  tail -f nginx.log

  

Kafka安裝使用-超詳細

 五:使用flume将模拟産生的nginx日志上傳到hdfs和kafka上

1.目前狀态

  

Kafka安裝使用-超詳細

2.project_agent.conf

  

Kafka安裝使用-超詳細

3.具體的代碼 

  

Kafka安裝使用-超詳細

 View Code

六:具體情況

  思路:由jar産生資料到nginx.log,然後使用flume讀取,傳送到kafka,這樣就有了生産的資料。

     然後,消費者就可以消費jar産生的資料了。

1.執行flume

  bin/flume-ng agent -n a1 -c conf/ --conf-file conf/project_agent.conf -Dflume-root-logger=INFO,console

  

Kafka安裝使用-超詳細

2.啟動生産者,就是模拟問價jar

  讓其不斷的産生日志到nginx.log中,代替生産者。

  

Kafka安裝使用-超詳細

3.觀看啟動了消費者的視窗

  就會發現不斷的産生日志被消費

  

Kafka安裝使用-超詳細

 七:注意

1.關閉kafka

  

Kafka安裝使用-超詳細

參考:

https://www.cnblogs.com/juncaoit/p/6358007.html