rocketmq的順序消息需要滿足2點:
1.producer端保證發送消息有序,且發送到同一個隊列。
2.consumer端保證消費同一個隊列。
先看個例子,代碼版本跟前面的一樣。
producer類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<code>import</code> <code>java.io.ioexception;</code>
<code>import</code> <code>java.text.simpledateformat;</code>
<code>import</code> <code>java.util.date;</code>
<code>import</code> <code>java.util.list;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.exception.mqbrokerexception;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.exception.mqclientexception;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.producer.defaultmqproducer;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.producer.messagequeueselector;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.producer.sendresult;</code>
<code>import</code> <code>com.alibaba.rocketmq.common.message.message;</code>
<code>import</code> <code>com.alibaba.rocketmq.common.message.messagequeue;</code>
<code>import</code> <code>com.alibaba.rocketmq.remoting.exception.remotingexception;</code>
<code>/**</code>
<code></code><code>* producer,發送順序消息</code>
<code></code><code>*/</code>
<code>public</code> <code>class</code> <code>producer {</code>
<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>ioexception {</code>
<code></code><code>try</code> <code>{</code>
<code></code><code>defaultmqproducer producer =</code><code>new</code> <code>defaultmqproducer(</code><code>"please_rename_unique_group_name"</code><code>);</code>
<code></code><code>producer.setnamesrvaddr(</code><code>"192.168.0.104:9876"</code><code>);</code>
<code></code><code>producer.start();</code>
<code></code><code>string[] tags =</code><code>new</code> <code>string[] {</code><code>"taga"</code><code>,</code><code>"tagc"</code><code>,</code><code>"tagd"</code> <code>};</code>
<code></code><code>date date =</code><code>new</code> <code>date();</code>
<code></code><code>simpledateformat sdf =</code><code>new</code> <code>simpledateformat(</code><code>"yyyy-mm-dd hh:mm:ss"</code><code>);</code>
<code></code><code>string datestr = sdf.format(date);</code>
<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i <</code><code>10</code><code>; i++) {</code>
<code></code><code>// 加個時間字尾</code>
<code></code><code>string body = datestr +</code><code>" hello rocketmq "</code> <code>+ i;</code>
<code></code><code>message msg =</code><code>new</code> <code>message(</code><code>"topictestjjj"</code><code>, tags[i % tags.length],</code><code>"key"</code> <code>+ i, body.getbytes());</code>
<code></code><code>sendresult sendresult = producer.send(msg,</code><code>new</code> <code>messagequeueselector() {</code>
<code></code><code>@override</code>
<code></code><code>public</code> <code>messagequeue select(list<messagequeue> mqs, message msg, object arg) {</code>
<code></code><code>integer id = (integer) arg;</code>
<code></code><code>return</code> <code>mqs.get(id);</code>
<code></code><code>}</code>
<code></code><code>},</code><code>0</code><code>);</code><code>//0是隊列的下标</code>
<code></code><code>system.out.println(sendresult +</code><code>", body:"</code> <code>+ body);</code>
<code></code><code>producer.shutdown();</code>
<code></code><code>}</code><code>catch</code> <code>(mqclientexception e) {</code>
<code></code><code>e.printstacktrace();</code>
<code></code><code>}</code><code>catch</code> <code>(remotingexception e) {</code>
<code></code><code>}</code><code>catch</code> <code>(mqbrokerexception e) {</code>
<code></code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>
<code></code><code>system.in.read();</code>
<code>}</code>
consumer端:
<code>import</code> <code>java.util.random;</code>
<code>import</code> <code>java.util.concurrent.timeunit;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.consumeorderlycontext;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.consumeorderlystatus;</code>
<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.messagelistenerorderly;</code>
<code>import</code> <code>com.alibaba.rocketmq.common.consumer.consumefromwhere;</code>
<code>import</code> <code>com.alibaba.rocketmq.common.message.messageext;</code>
<code></code><code>* 順序消息消費,帶事務方式(應用可控制offset什麼時候送出)</code>
<code>public</code> <code>class</code> <code>consumer {</code>
<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>mqclientexception {</code>
<code></code><code>defaultmqpushconsumer consumer =</code><code>new</code> <code>defaultmqpushconsumer(</code><code>"please_rename_unique_group_name_3"</code><code>);</code>
<code></code><code>consumer.setnamesrvaddr(</code><code>"192.168.0.104:9876"</code><code>);</code>
<code></code><code>/**</code>
<code></code><code>* 設定consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br></code>
<code></code><code>* 如果非第一次啟動,那麼按照上次消費的位置繼續消費</code>
<code></code><code>consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);</code>
<code></code><code>consumer.subscribe(</code><code>"topictestjjj"</code><code>,</code><code>"taga || tagc || tagd"</code><code>);</code>
<code></code><code>consumer.registermessagelistener(</code><code>new</code> <code>messagelistenerorderly() {</code>
<code></code><code>random random =</code><code>new</code> <code>random();</code>
<code></code><code>public</code> <code>consumeorderlystatus consumemessage(list<messageext> msgs, consumeorderlycontext context) {</code>
<code></code><code>context.setautocommit(</code><code>true</code><code>);</code>
<code></code><code>system.out.print(thread.currentthread().getname() +</code><code>" receive new messages: "</code> <code>);</code>
<code></code><code>for</code> <code>(messageext msg: msgs) {</code>
<code></code><code>system.out.println(msg +</code><code>", content:"</code> <code>+</code><code>new</code> <code>string(msg.getbody()));</code>
<code></code><code>//模拟業務邏輯進行中...</code>
<code></code><code>timeunit.seconds.sleep(random.nextint(</code><code>10</code><code>));</code>
<code></code><code>}</code><code>catch</code> <code>(exception e) {</code>
<code></code><code>return</code> <code>consumeorderlystatus.success;</code>
<code></code><code>});</code>
<code></code><code>consumer.start();</code>
<code></code><code>system.out.println(</code><code>"consumer started."</code><code>);</code>
nameserver和brokerserver起來後,運作列印,把前面的不重要的去掉了,隻看後面的幾列:
content:2015-12-06 17:03:21 hello rocketmq 0
content:2015-12-06 17:03:21 hello rocketmq 1
content:2015-12-06 17:03:21 hello rocketmq 2
content:2015-12-06 17:03:21 hello rocketmq 3
content:2015-12-06 17:03:21 hello rocketmq 4
content:2015-12-06 17:03:21 hello rocketmq 5
content:2015-12-06 17:03:21 hello rocketmq 6
content:2015-12-06 17:03:21 hello rocketmq 7
content:2015-12-06 17:03:21 hello rocketmq 8
content:2015-12-06 17:03:21 hello rocketmq 9
可以看到,消息有序的。
如何在叢集消費時保證消費的有序呢?
1.consumemessageorderlyservice類的start()方法,如果是叢集消費,則啟動定時任務,定時向broker發送批量鎖住目前正在消費的隊列集合的消息,具體是consumer端拿到正在消費的隊列集合,發送鎖住隊列的消息至broker,broker端傳回鎖住成功的隊列集合。
consumer收到後,設定是否鎖住标志位。
這裡注意2個變量:
consumer端的rebalanceimpl裡的concurrenthashmap processqueuetable,是否鎖住設定在processqueue裡。
broker端的rebalancelockmanager裡的concurrenthashmap> mqlocktable,這裡維護着全局隊列鎖。,>
2.consumemessageorderlyservice.consumerequest的run方法是消費消息,這裡還有個messagequeuelock messagequeuelock,維護目前consumer端的本地隊列鎖。保證目前隻有一個線程能夠進行消費。
3.拉到消息存入processqueue,然後判斷,本地是否獲得鎖,全局隊列是否被鎖住,然後從processqueue裡取出消息,用messagelistenerorderly進行消費。
拉到消息後調用processqueue.putmessage(final list msgs) 存入,具體是存入treemapmsgtreemap。
然後是調用processqueue.takemessags(final int batchsize)消費,具體是把msgtreemap裡消費過的消息,轉移到treemap msgtreemaptemp。,>,>
4.本地消費的事務控制,consumeorderlystatus.success(送出),consumeorderlystatus.suspend_current_queue_a_moment(挂起一會再消費),在此之前還有一個變量consumeorderlycontext context的setautocommit()是否自動送出。
當suspend_current_queue_a_moment時,autocommit設定為true或者false沒有差別,本質跟消費相反,把消息從msgtreemaptemp轉移回msgtreemap,等待下次消費。
當success時,autocommit設定為true時比設定為false多做了2個動作,consumerequest.getprocessqueue().commit()和this.defaultmqpushconsumerimpl.getoffsetstore().updateoffset(consumerequest.getmessagequeue(), commitoffset, false);
processqueue.commit() :本質是删除msgtreemaptemp裡的消息,msgtreemaptemp裡的消息在上面消費時從msgtreemap轉移過來的。
this.defaultmqpushconsumerimpl.getoffsetstore().updateoffset() :本質是把拉消息的偏移量更新到本地,然後定時更新到broker。
那麼少了這2個動作會怎麼樣呢,随着消息的消費進行,msgtreemaptemp裡的消息堆積越來越多,消費消息的偏移量一直沒有更新到broker導緻consumer每次重新啟動後都要從頭開始重複消費。
就算更新了offset到broker,那麼msgtreemaptemp裡的消息堆積呢?不知道這算不算bug。
是以,還是把autocommit設定為true吧。