天天看點

kafka producer執行個體及原理分析

1.前言

首先,描述下應用場景:

假設,公司有一款遊戲,需要做行為統計分析,資料的源頭來自日志,由于使用者行為非常多,導緻日志量非常大。将日志資料插入資料庫然後再進行分析,已經滿足不了。最好的辦法是存日志,然後通過對日志的分析,計算出有用的資料。我們采用kafka這種分布式日志系統來實作這一過程。

步驟如下:

搭建KAFKA系統運作環境

如果你還沒有搭建起來,可以參考我的部落格:

<a href="http://zhangfengzhe.blog.51cto.com/8855103/1556650" target="_blank">http://zhangfengzhe.blog.51cto.com/8855103/1556650</a>

設計資料存儲格式

Producer端擷取資料,并對資料按上述設計的格式進行編碼

Producer将已經編碼的資料發送到broker上,在broker上進行存儲

Consumer端從broker中擷取資料,分析計算。

2.實作過程

為了快速實作,我們簡化日志消息格式。

在eclipse建立JAVA PROJECT,将kafka/libs下*.jar配置到項目build path即可。

Step 1 : 簡單的POJO對象(MobileGameLog)

1

2

3

4

<code>private</code> <code>String actionType;</code>

<code>private</code> <code>String appKey;</code>

<code>private</code> <code>String guid;</code>

<code>private</code> <code>String time;</code>

說明:

actionType 代表行為類型

appKey     代表遊戲ID

guid       代表角色

time       代表時間

提供getter/setter方法,并override toString()

Step 2 : 提供serializer

需要注意的是,POJO對象需要序列化轉化成KAFKA識别的消息存儲格式--byte[]

5

6

7

8

<code>public</code> <code>class</code> <code>MobileGameKafkaMessage </code><code>implements</code> <code>kafka.serializer.Encoder&lt;MobileGameLog&gt;{</code>

<code>@Override</code>

<code>public</code> <code>byte</code><code>[] toBytes(MobileGameLog mobileGameLog) {</code>

<code>return</code> <code>mobileGameLog.toString().getBytes();</code>

<code>}</code>

<code>public</code> <code>MobileGameKafkaMessage(VerifiableProperties props){</code>

Step 3 : 提供Partitioner

我們可以提供Partitioner,這樣可以使得資料按照我們的政策來存儲在brokers中。

<a href="http://s3.51cto.com/wyfs02/M00/4B/CC/wKioL1QzrbnROV9UAAKDYIN8EWw025.jpg" target="_blank"></a>

這裡,我根據appKey來進行分區。

Step 4 : 提供Producer

提供配置

<a href="http://s3.51cto.com/wyfs02/M02/4B/CC/wKioL1QzrsiiG8CyAAGOsHs0upE284.jpg" target="_blank"></a>

運作kafka環境

啟動zookeeper:

<code>[root@localhost kafka_2.9.2-0.8.1.1]</code><code># bin/zookeeper-server-start.sh  </code>

<code>config</code><code>/zookeeper</code><code>.properties &amp;</code>

啟動kafka broker(id=0):

<code>[root@localhost kafka_2.9.2-0.8.1.1]</code><code># bin/kafka-server-start.sh </code>

<code>config</code><code>/server</code><code>.properties &amp;</code>

啟動kafka broker(id=1)

<code>[root@localhost kafka_2.9.2-0.8.1.1]</code><code># bin/kafka-server-start.sh  </code>

<code>config</code><code>/server-1</code><code>.properties &amp;</code>

上述過程,在我的部落格【搭建kafka運作環境】裡面都有詳細記錄,大家可以參考。

建立一個topic:

<code>[root@localhost kafka_2.9.2-0.8.1.1]</code><code># bin/kafka-topics.sh --zookeeper localhost:2181 </code>

<code>--create --topic log_1 --replication-factor 2 --partitions 3</code>

注意topic:log_1有3個分區,2個複制。

制造資料并發送

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

<code>// Producer&lt;key , value&gt;</code>

<code>// V: type of the message</code>

<code>// K: type of the optional key associated with the message</code>

<code>kafka.javaapi.producer.Producer&lt;MobileGameLog, MobileGameLog&gt; producer </code>

<code>= </code><code>new</code> <code>Producer&lt;MobileGameLog, MobileGameLog&gt;(</code>

<code>config);</code>

<code>List&lt;KeyedMessage&lt;MobileGameLog, MobileGameLog&gt;&gt; list </code>

<code>= </code><code>new</code> <code>ArrayList&lt;KeyedMessage&lt;MobileGameLog, MobileGameLog&gt;&gt;();</code>

<code>// 5條tlbb資料</code>

<code>for</code> <code>(</code><code>int</code> <code>i = </code><code>1</code><code>; i &lt;= </code><code>5</code><code>; i++) {</code>

<code>MobileGameLog log = </code><code>new</code> <code>MobileGameLog();</code>

<code>log.setActionType(</code><code>"YuanBaoShop"</code><code>);</code>

<code>log.setAppKey(</code><code>"tlbb"</code><code>);</code>

<code>log.setGuid(</code><code>"xxx_"</code> <code>+ i);</code>

<code>log.setTime(</code><code>"2014-10-01 10:00:20"</code><code>);</code>

<code>KeyedMessage&lt;MobileGameLog, MobileGameLog&gt; keyedMessage </code>

<code>= </code><code>new</code> <code>KeyedMessage&lt;MobileGameLog, MobileGameLog&gt;(</code>

<code>"log_1"</code><code>, log, log);</code>

<code>list.add(keyedMessage);</code>

<code>// 8條ldj資料</code>

<code>for</code> <code>(</code><code>int</code> <code>i = </code><code>1</code><code>; i &lt;= </code><code>8</code><code>; i++) {</code>

<code>log.setActionType(</code><code>"BlackMarket"</code><code>);</code>

<code>log.setAppKey(</code><code>"ldj"</code><code>);</code>

<code>log.setGuid(</code><code>"yyy_"</code> <code>+ i);</code>

<code>log.setTime(</code><code>"2014-10-02 10:00:20"</code><code>);</code>

<code>producer.send(list);</code>

<code>producer.close();</code>

a.producer既可以send 一個keyedMessage,可以是一個keyedMessage list.

b.注意producer執行個體化時的泛型。value是消息對象,即POJO,key是這個pojo的标示,這個是要用來進行分區的。

c.producer向broker發送的是KeyedMessage,注意執行個體化時的泛型,KEY/VALUE的意義同b.

d.KeyedMessage需要指明topic name.

eclipse 運作結果如下:

-------start info

運作至MobileGameKafkaPartition

VerifiableProperties : {metadata.broker.list=192.168.152.2:9092,192.168.152.2:9093, 

zk.connectiontimeout.ms=6000, request.required.acks=1, 

partitioner.class=com.sohu.game.kafka.day2.MobileGameKafkaPartition, 

serializer.class=com.sohu.game.kafka.day2.MobileGameKafkaMessage}

-------end info

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

運作至MobileGameKafkaPartition的partition方法,分區大小為:3

分區key : tlbb

存儲的分區為:0

分區key : ldj

存儲的分區為:2

kafka consumer console 結果如下:

<a href="http://s3.51cto.com/wyfs02/M00/4B/CD/wKioL1Qzs_by4VoYAAP86rC1nao716.jpg" target="_blank"></a>

3.原理分析

檢視topic:log_1詳細資訊:

<code>--describe --topic log_1</code>

<code>Topic: log_1 PartitionCount:3 ReplicationFactor:2 Configs:</code>

<code>Topic: log_1 Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1</code>

<code>Topic: log_1 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1</code>

<code>Topic: log_1 Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0,1</code>

log_1有2個broker進行儲存,每一個broker上有3個分區,并且每一個分區的leader都是broker(id=0)

檢視broker(id=0)上的資訊:

<code>[root@localhost tmp]</code><code># ll</code>

<code>total 52</code>

<code>drwxr-xr-x  2 root root 4096 Oct  7 01:23 hsperfdata_root</code>

<code>drwxr-xr-x 10 root root 4096 Oct  7 02:40 kafka-logs</code>

<code>drwxr-xr-x  8 root root 4096 Oct  7 02:40 kafka-logs-1</code>

<code>srwxr-xr-x  1 root root    0 Sep 20 18:15 mapping-root</code>

<code>drwxrwxrwt  2 root root 4096 Oct  6 00:34 VMwareDnD</code>

<code>drwx------  2 root root 4096 Oct  6 18:05 vmware-root</code>

<code>drwxr-xr-x  3 root root 4096 Sep 20 19:58 zookeeper</code>

<code>[root@localhost tmp]</code><code># </code>

<code>[root@localhost tmp]</code><code># cd kafka-logs</code>

<code>[root@localhost kafka-logs]</code><code># pwd</code>

<code>/tmp/kafka-logs</code>

<code>[root@localhost kafka-logs]</code><code># ll</code>

<code>total 80</code>

<code>drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-0</code>

<code>drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-1</code>

<code>drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-2</code>

<code>drwxr-xr-x 2 root root 4096 Oct  6 01:01 my_first_topic-0</code>

<code>-rw-r--r-- 1 root root  100 Oct  7 02:40 recovery-point-offset-checkpoint</code>

<code>-rw-r--r-- 1 root root  100 Oct  7 02:40 replication-offset-checkpoint</code>

<code>drwxr-xr-x 2 root root 4096 Oct  6 01:01 </code><code>test</code><code>-0</code>

<code>drwxr-xr-x 2 root root 4096 Oct  6 01:01 topic_1-0</code>

<code>drwxr-xr-x 2 root root 4096 Sep 21 00:21 topic_2-0</code>

<code>drwxr-xr-x 2 root root 4096 Sep 21 00:22 topic_3-0</code>

<code>[root@localhost kafka-logs]</code><code># cd log_1-0/</code>

<code>[root@localhost log_1-0]</code><code># ll</code>

<code>total 12</code>

<code>-rw-r--r-- 1 root root 10485760 Oct  7 01:16 00000000000000000000.index</code>

<code>-rw-r--r-- 1 root root     1020 Oct  7 01:18 00000000000000000000.log</code>

<code>[root@localhost log_1-0]</code><code># cat -A 00000000000000000000.log </code>

<code>^@^@^@^@^@^@^@^@^@^@^@M-@M-r^L2M-V^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1, </code><code>time</code><code>=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1, </code><code>time</code><code>=2014-10-01 10:00:20]^@^@^@^@^@^@^@^A^@^@^@M-@^^M-46M-h^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, </code><code>time</code><code>=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, </code>

<code>time</code><code>=2014-10-01 10:00:20]^@^@^@^@^@^@^@^B^@^@^@M-@M-sM-s7=^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, </code><code>time</code><code>=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, </code>

<code>time</code><code>=2014-10-01 10:00:20]^@^@^@^@^@^@^@^C^@^@^@M-@^\M-58M-U^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, </code><code>time</code><code>=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, </code>

<code>time</code><code>=2014-10-01 10:00:20]^@^@^@^@^@^@^@^D^@^@^@M-@M-qM-r9^@^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, </code><code>time</code><code>=2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, </code>

<code>time</code><code>=2014-10-01 10:00:20][root@localhost log_1-0]</code><code>#</code>

注意kafka broker(id=0)的日志資訊顯示:

有log_1-0,log_1-1,log_1-2三個目錄,對應于0,1,2三個分區。

說明,topic在broker上是以partition為機關進行儲存的。

上面的0分區的日志資訊顯示,tlbb的5條資料都被儲存了2遍,并且可以發現在分區内,都是有序的。

我們在建立log_1時指定複制2份,是以資料在分區内被儲存了2遍。

同理,我們繼續分析broker(id=0)上的1,2分區的内容,有:

分區1無資料,分區2上8條ldj的資料被儲存了2遍。

由于我們隻制造了2種appkey的資料,根據分區函數,隻會傳回2個partition number,是以導緻有一個分區沒有資料。

同上的,繼續分析broker(id=1)上的0,1,2分區的内容,有:

分區0,tlbb的5條資料被儲存2遍

分區1,沒有資料

分區2,ldj的8條資料被儲存2遍

可見,broker(id=0),broker(id=1)他們的分區資料完全一緻,這也就是為什麼kafka的高可用性,某些broker挂了,其他的broker還可以繼續提供服務和資料。

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1561021,如需轉載請自行聯系原作者