天天看点

RocketMQ学习(九):顺序消息

rocketmq的顺序消息需要满足2点:

1.producer端保证发送消息有序,且发送到同一个队列。

2.consumer端保证消费同一个队列。

先看个例子,代码版本跟前面的一样。

producer类:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

<code>import</code> <code>java.io.ioexception;</code>

<code>import</code> <code>java.text.simpledateformat;</code>

<code>import</code> <code>java.util.date;</code>

<code>import</code> <code>java.util.list;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.exception.mqbrokerexception;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.exception.mqclientexception;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.producer.defaultmqproducer;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.producer.messagequeueselector;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.producer.sendresult;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.message.message;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.message.messagequeue;</code>

<code>import</code> <code>com.alibaba.rocketmq.remoting.exception.remotingexception;</code>

<code>/**</code>

<code></code><code>* producer,发送顺序消息</code>

<code></code><code>*/</code>

<code>public</code> <code>class</code> <code>producer {</code>

<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>ioexception {</code>

<code></code><code>try</code> <code>{</code>

<code></code><code>defaultmqproducer producer =</code><code>new</code> <code>defaultmqproducer(</code><code>"please_rename_unique_group_name"</code><code>);</code>

<code></code><code>producer.setnamesrvaddr(</code><code>"192.168.0.104:9876"</code><code>);</code>

<code></code><code>producer.start();</code>

<code></code><code>string[] tags =</code><code>new</code> <code>string[] {</code><code>"taga"</code><code>,</code><code>"tagc"</code><code>,</code><code>"tagd"</code> <code>};</code>

<code></code><code>date date =</code><code>new</code> <code>date();</code>

<code></code><code>simpledateformat sdf =</code><code>new</code> <code>simpledateformat(</code><code>"yyyy-mm-dd hh:mm:ss"</code><code>);</code>

<code></code><code>string datestr = sdf.format(date);</code>

<code></code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt;</code><code>10</code><code>; i++) {</code>

<code></code><code>// 加个时间后缀</code>

<code></code><code>string body = datestr +</code><code>" hello rocketmq "</code> <code>+ i;</code>

<code></code><code>message msg =</code><code>new</code> <code>message(</code><code>"topictestjjj"</code><code>, tags[i % tags.length],</code><code>"key"</code> <code>+ i, body.getbytes());</code>

<code></code><code>sendresult sendresult = producer.send(msg,</code><code>new</code> <code>messagequeueselector() {</code>

<code></code><code>@override</code>

<code></code><code>public</code> <code>messagequeue select(list&lt;messagequeue&gt; mqs, message msg, object arg) {</code>

<code></code><code>integer id = (integer) arg;</code>

<code></code><code>return</code> <code>mqs.get(id);</code>

<code></code><code>}</code>

<code></code><code>},</code><code>0</code><code>);</code><code>//0是队列的下标</code>

<code></code><code>system.out.println(sendresult +</code><code>", body:"</code> <code>+ body);</code>

<code></code><code>producer.shutdown();</code>

<code></code><code>}</code><code>catch</code> <code>(mqclientexception e) {</code>

<code></code><code>e.printstacktrace();</code>

<code></code><code>}</code><code>catch</code> <code>(remotingexception e) {</code>

<code></code><code>}</code><code>catch</code> <code>(mqbrokerexception e) {</code>

<code></code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code></code><code>system.in.read();</code>

<code>}</code>

consumer端:

<code>import</code> <code>java.util.random;</code>

<code>import</code> <code>java.util.concurrent.timeunit;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.consumeorderlycontext;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.consumeorderlystatus;</code>

<code>import</code> <code>com.alibaba.rocketmq.client.consumer.listener.messagelistenerorderly;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.consumer.consumefromwhere;</code>

<code>import</code> <code>com.alibaba.rocketmq.common.message.messageext;</code>

<code></code><code>* 顺序消息消费,带事务方式(应用可控制offset什么时候提交)</code>

<code>public</code> <code>class</code> <code>consumer {</code>

<code></code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args)</code><code>throws</code> <code>mqclientexception {</code>

<code></code><code>defaultmqpushconsumer consumer =</code><code>new</code> <code>defaultmqpushconsumer(</code><code>"please_rename_unique_group_name_3"</code><code>);</code>

<code></code><code>consumer.setnamesrvaddr(</code><code>"192.168.0.104:9876"</code><code>);</code>

<code></code><code>/**</code>

<code></code><code>* 设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费&lt;br&gt;</code>

<code></code><code>* 如果非第一次启动,那么按照上次消费的位置继续消费</code>

<code></code><code>consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);</code>

<code></code><code>consumer.subscribe(</code><code>"topictestjjj"</code><code>,</code><code>"taga || tagc || tagd"</code><code>);</code>

<code></code><code>consumer.registermessagelistener(</code><code>new</code> <code>messagelistenerorderly() {</code>

<code></code><code>random random =</code><code>new</code> <code>random();</code>

<code></code><code>public</code> <code>consumeorderlystatus consumemessage(list&lt;messageext&gt; msgs, consumeorderlycontext context) {</code>

<code></code><code>context.setautocommit(</code><code>true</code><code>);</code>

<code></code><code>system.out.print(thread.currentthread().getname() +</code><code>" receive new messages: "</code> <code>);</code>

<code></code><code>for</code> <code>(messageext msg: msgs) {</code>

<code></code><code>system.out.println(msg +</code><code>", content:"</code> <code>+</code><code>new</code> <code>string(msg.getbody()));</code>

<code></code><code>//模拟业务逻辑处理中...</code>

<code></code><code>timeunit.seconds.sleep(random.nextint(</code><code>10</code><code>));</code>

<code></code><code>}</code><code>catch</code> <code>(exception e) {</code>

<code></code><code>return</code> <code>consumeorderlystatus.success;</code>

<code></code><code>});</code>

<code></code><code>consumer.start();</code>

<code></code><code>system.out.println(</code><code>"consumer started."</code><code>);</code>

nameserver和brokerserver起来后,运行打印,把前面的不重要的去掉了,只看后面的几列:

content:2015-12-06 17:03:21 hello rocketmq 0

content:2015-12-06 17:03:21 hello rocketmq 1

content:2015-12-06 17:03:21 hello rocketmq 2

content:2015-12-06 17:03:21 hello rocketmq 3

content:2015-12-06 17:03:21 hello rocketmq 4

content:2015-12-06 17:03:21 hello rocketmq 5

content:2015-12-06 17:03:21 hello rocketmq 6

content:2015-12-06 17:03:21 hello rocketmq 7

content:2015-12-06 17:03:21 hello rocketmq 8

content:2015-12-06 17:03:21 hello rocketmq 9

可以看到,消息有序的。

如何在集群消费时保证消费的有序呢?

1.consumemessageorderlyservice类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。

consumer收到后,设置是否锁住标志位。

这里注意2个变量:

consumer端的rebalanceimpl里的concurrenthashmap processqueuetable,是否锁住设置在processqueue里。

broker端的rebalancelockmanager里的concurrenthashmap&gt; mqlocktable,这里维护着全局队列锁。,&gt;

2.consumemessageorderlyservice.consumerequest的run方法是消费消息,这里还有个messagequeuelock messagequeuelock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。

3.拉到消息存入processqueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从processqueue里取出消息,用messagelistenerorderly进行消费。

拉到消息后调用processqueue.putmessage(final list msgs) 存入,具体是存入treemapmsgtreemap。

然后是调用processqueue.takemessags(final int batchsize)消费,具体是把msgtreemap里消费过的消息,转移到treemap msgtreemaptemp。,&gt;,&gt;

4.本地消费的事务控制,consumeorderlystatus.success(提交),consumeorderlystatus.suspend_current_queue_a_moment(挂起一会再消费),在此之前还有一个变量consumeorderlycontext context的setautocommit()是否自动提交。

当suspend_current_queue_a_moment时,autocommit设置为true或者false没有区别,本质跟消费相反,把消息从msgtreemaptemp转移回msgtreemap,等待下次消费。

当success时,autocommit设置为true时比设置为false多做了2个动作,consumerequest.getprocessqueue().commit()和this.defaultmqpushconsumerimpl.getoffsetstore().updateoffset(consumerequest.getmessagequeue(), commitoffset, false);

processqueue.commit() :本质是删除msgtreemaptemp里的消息,msgtreemaptemp里的消息在上面消费时从msgtreemap转移过来的。

this.defaultmqpushconsumerimpl.getoffsetstore().updateoffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。

那么少了这2个动作会怎么样呢,随着消息的消费进行,msgtreemaptemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。

就算更新了offset到broker,那么msgtreemaptemp里的消息堆积呢?不知道这算不算bug。

所以,还是把autocommit设置为true吧。