天天看點

RocketMQ學習(九):順序消息

rocketmq的順序消息需要滿足2點:

1.producer端保證發送消息有序,且發送到同一個隊列。

2.consumer端保證消費同一個隊列。

先看個例子,代碼版本跟前面的一樣。

producer類:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

<code>import</code> <code>java.io.ioexception;</code>

<code>import</code> <code>java.text.simpledateformat;</code>

<code>import</code> <code>java.util.date;</code>

<code>import</code> <code>java.util.list;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.exception.mqbrokerexception;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.exception.mqclientexception;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.producer.defaultmqproducer;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.producer.messagequeueselector;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.producer.sendresult;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.message.message;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.message.messagequeue;</code>

<code>import</code> <code>com.alibaba.rocketmq.remoting.exception.remotingexception;</code>

<code>/**</code>

<code></code><code>* producer,發送順序消息</code>

<code></code><code>*/</code>

<code>public</code> <code>class</code> <code>producer {</code>

<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>ioexception {</code>

<code></code><code>try</code> <code>{</code>

<code></code><code>defaultmqproducer producer =</code><code>new</code> <code>defaultmqproducer(</code><code>"please_rename_unique_group_name"</code><code>);</code>

<code></code><code>producer.setnamesrvaddr(</code><code>"192.168.0.104:9876"</code><code>);</code>

<code></code><code>producer.start();</code>

<code></code><code>string[] tags =</code><code>new</code> <code>string[] {</code><code>"taga"</code><code>,</code><code>"tagc"</code><code>,</code><code>"tagd"</code> <code>};</code>

<code></code><code>date date =</code><code>new</code> <code>date();</code>

<code></code><code>simpledateformat sdf =</code><code>new</code> <code>simpledateformat(</code><code>"yyyy-mm-dd hh:mm:ss"</code><code>);</code>

<code></code><code>string datestr = sdf.format(date);</code>

<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10</code><code>; i++) {</code>

<code></code><code>// 加個時間字尾</code>

<code></code><code>string body = datestr +</code><code>" hello rocketmq "</code> <code>+ i;</code>

<code></code><code>message msg =</code><code>new</code> <code>message(</code><code>"topictestjjj"</code><code>, tags[i % tags.length],</code><code>"key"</code> <code>+ i, body.getbytes());</code>

<code></code><code>sendresult sendresult = producer.send(msg,</code><code>new</code> <code>messagequeueselector() {</code>

<code></code><code>@override</code>

<code></code><code>public</code> <code>messagequeue select(list&lt;messagequeue&gt; mqs, message msg, object arg) {</code>

<code></code><code>integer id = (integer) arg;</code>

<code></code><code>return</code> <code>mqs.get(id);</code>

<code></code><code>}</code>

<code></code><code>},</code><code>0</code><code>);</code><code>//0是隊列的下标</code>

<code></code><code>system.out.println(sendresult +</code><code>", body:"</code> <code>+ body);</code>

<code></code><code>producer.shutdown();</code>

<code></code><code>}</code><code>catch</code> <code>(mqclientexception e) {</code>

<code></code><code>e.printstacktrace();</code>

<code></code><code>}</code><code>catch</code> <code>(remotingexception e) {</code>

<code></code><code>}</code><code>catch</code> <code>(mqbrokerexception e) {</code>

<code></code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code></code><code>system.in.read();</code>

<code>}</code>

consumer端:

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.consumeorderlycontext;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.consumeorderlystatus;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.messagelistenerorderly;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.consumer.consumefromwhere;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.message.messageext;</code>

<code></code><code>* 順序消息消費,帶事務方式(應用可控制offset什麼時候送出)</code>

<code>public</code> <code>class</code> <code>consumer {</code>

<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>mqclientexception {</code>

<code></code><code>defaultmqpushconsumer consumer =</code><code>new</code> <code>defaultmqpushconsumer(</code><code>"please_rename_unique_group_name_3"</code><code>);</code>

<code></code><code>consumer.setnamesrvaddr(</code><code>"192.168.0.104:9876"</code><code>);</code>

<code></code><code>/**</code>

<code></code><code>* 設定consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費&lt;br&gt;</code>

<code></code><code>* 如果非第一次啟動,那麼按照上次消費的位置繼續消費</code>

<code></code><code>consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);</code>

<code></code><code>consumer.subscribe(</code><code>"topictestjjj"</code><code>,</code><code>"taga || tagc || tagd"</code><code>);</code>

<code></code><code>consumer.registermessagelistener(</code><code>new</code> <code>messagelistenerorderly() {</code>

<code></code><code>random random =</code><code>new</code> <code>random();</code>

<code></code><code>public</code> <code>consumeorderlystatus consumemessage(list&lt;messageext&gt; msgs, consumeorderlycontext context) {</code>

<code></code><code>context.setautocommit(</code><code>true</code><code>);</code>

<code></code><code>system.out.print(thread.currentthread().getname() +</code><code>" receive new messages: "</code> <code>);</code>

<code></code><code>for</code> <code>(messageext msg: msgs) {</code>

<code></code><code>system.out.println(msg +</code><code>", content:"</code> <code>+</code><code>new</code> <code>string(msg.getbody()));</code>

<code></code><code>//模拟業務邏輯進行中...</code>

<code></code><code>timeunit.seconds.sleep(random.nextint(</code><code>10</code><code>));</code>

<code></code><code>}</code><code>catch</code> <code>(exception e) {</code>

<code></code><code>return</code> <code>consumeorderlystatus.success;</code>

<code></code><code>});</code>

<code></code><code>consumer.start();</code>

<code></code><code>system.out.println(</code><code>"consumer started."</code><code>);</code>

nameserver和brokerserver起來後,運作列印,把前面的不重要的去掉了,隻看後面的幾列:

content:2015-12-06 17:03:21 hello rocketmq 0

content:2015-12-06 17:03:21 hello rocketmq 1

content:2015-12-06 17:03:21 hello rocketmq 2

content:2015-12-06 17:03:21 hello rocketmq 3

content:2015-12-06 17:03:21 hello rocketmq 4

content:2015-12-06 17:03:21 hello rocketmq 5

content:2015-12-06 17:03:21 hello rocketmq 6

content:2015-12-06 17:03:21 hello rocketmq 7

content:2015-12-06 17:03:21 hello rocketmq 8

content:2015-12-06 17:03:21 hello rocketmq 9

可以看到,消息有序的。

如何在叢集消費時保證消費的有序呢?

1.consumemessageorderlyservice類的start()方法,如果是叢集消費,則啟動定時任務,定時向broker發送批量鎖住目前正在消費的隊列集合的消息,具體是consumer端拿到正在消費的隊列集合,發送鎖住隊列的消息至broker,broker端傳回鎖住成功的隊列集合。

consumer收到後,設定是否鎖住标志位。

這裡注意2個變量:

consumer端的rebalanceimpl裡的concurrenthashmap processqueuetable,是否鎖住設定在processqueue裡。

broker端的rebalancelockmanager裡的concurrenthashmap&gt; mqlocktable,這裡維護着全局隊列鎖。,&gt;

2.consumemessageorderlyservice.consumerequest的run方法是消費消息,這裡還有個messagequeuelock messagequeuelock,維護目前consumer端的本地隊列鎖。保證目前隻有一個線程能夠進行消費。

3.拉到消息存入processqueue,然後判斷,本地是否獲得鎖,全局隊列是否被鎖住,然後從processqueue裡取出消息,用messagelistenerorderly進行消費。

拉到消息後調用processqueue.putmessage(final list msgs) 存入,具體是存入treemapmsgtreemap。

然後是調用processqueue.takemessags(final int batchsize)消費,具體是把msgtreemap裡消費過的消息,轉移到treemap msgtreemaptemp。,&gt;,&gt;

4.本地消費的事務控制,consumeorderlystatus.success(送出),consumeorderlystatus.suspend_current_queue_a_moment(挂起一會再消費),在此之前還有一個變量consumeorderlycontext context的setautocommit()是否自動送出。

當suspend_current_queue_a_moment時,autocommit設定為true或者false沒有差別,本質跟消費相反,把消息從msgtreemaptemp轉移回msgtreemap,等待下次消費。

當success時,autocommit設定為true時比設定為false多做了2個動作,consumerequest.getprocessqueue().commit()和this.defaultmqpushconsumerimpl.getoffsetstore().updateoffset(consumerequest.getmessagequeue(), commitoffset, false);

processqueue.commit() :本質是删除msgtreemaptemp裡的消息,msgtreemaptemp裡的消息在上面消費時從msgtreemap轉移過來的。

this.defaultmqpushconsumerimpl.getoffsetstore().updateoffset() :本質是把拉消息的偏移量更新到本地,然後定時更新到broker。

那麼少了這2個動作會怎麼樣呢,随着消息的消費進行,msgtreemaptemp裡的消息堆積越來越多,消費消息的偏移量一直沒有更新到broker導緻consumer每次重新啟動後都要從頭開始重複消費。

就算更新了offset到broker,那麼msgtreemaptemp裡的消息堆積呢?不知道這算不算bug。

是以,還是把autocommit設定為true吧。