opic的有序消息已經成為mq的标配。而RocketMQ中是這樣區分消息類型的, 普通消息也叫做無序消息,簡單來說就是沒有順序的消息,而有序消息就是按照一定的先後順序的消息類型。舉個例子,producer 依次發送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序也就是 1、2、3 ,而不會出現普通消息那樣的 2、1、3 等情況。
一、有序消息該如何實作
理論上:我們都知道消息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那麼要保證消息的有序,勢必這兩步都是要保證有序的,即要保證消息是按有序發送到 broker,broker 也是有序将消息投遞給 consumer,兩個條件必須同時滿足,缺一不可。
1.1、全局有序消息
由于一個 topic 隻有一個 queue ,即使我們有多個 producer 執行個體和 consumer 執行個體也很難提高消息吞吐量。就好比過獨木橋,大家隻能一個挨着一個過去,效率低下。
1.2、局部有序消息
常見做法就是将 order id 進行處理,将 order id 相同的消息發送到 topicB 的同一個 queue,假設我們 topicB 有 2 個 queue,那麼我們可以簡單的對 id 取餘,奇數的發往 queue0,偶數的發往 queue1,消費者按照 queue 去消費時,就能保證 queue0 裡面的消息有序消費,queue1 裡面的消息有序消費。
二、RocketMQ的topic的補充
opic 隻是消息的邏輯分類,内部實作其實是由 queue 組成。當 producer 把消息發送到某個 topic 時,預設是會消息發送到具體的 queue 上。由于一個 topic 可以有多個 queue,是以在性能比全局有序高得多。假設 queue 數是 n,理論上性能就是全局有序的 n 倍,當然 consumer 也要跟着增加才行。在實際情況中,這種局部有序消息是會比全局有序消息用的更多。
/**
* 有序消息
*/
public class OrderedProducer {
public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
// 1:建立生産者對象,并指定組名
DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
// 2:指定NameServer位址
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 3:啟動生産者
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0); // 設定異步發送失敗重試次數,預設為2
// 4:定義消息隊列選擇器
MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
/**
* 消息隊列選擇器,保證同一條業務資料的消息在同一個隊列
* @param mqs topic中所有隊列的集合
* @param msg 發送的消息
* @param arg 此參數是本示例中producer.send的第三個參數
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
// id == 1001
int index = id % mqs.size();
// 分區順序:同一個模值的消息在同一個隊列中
return mqs.get(index);
// 全局順序:所有的消息都在同一個隊列中
// return mqs.get(mqs.size() - 1);
}
};
String[] tags = new String[]{"TagA", "TagB", "TagC"};
List<Map> bizDatas = getBizDatas();
// 5:循環發送消息
for (int i = 0; i < bizDatas.size(); i++) {
Map bizData = bizDatas.get(i);
// keys:業務資料的ID,比如使用者ID、訂單編号等等
Message msg = new Message("TopicTest", tags[i % tags.length], "" + bizData.get("msgType"), bizData.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送有序消息
SendResult sendResult = producer.send(msg, messageQueueSelector, bizData.get("msgType"));
System.out.printf("%s, body:%s%n", sendResult, bizData);
}
// 6:關閉生産者
producer.shutdown();
}
public static List<Map> getBizDatas() {
List<Map> orders = new ArrayList<Map>();
HashMap orderData = new HashMap();
orderData.put("msgType", 1001);
orderData.put("userId", "張三");
orderData.put("desc", "存錢1000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 2001);
orderData.put("userId", "張三");
orderData.put("desc", "取錢1000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 3001);
orderData.put("userId", "張三");
orderData.put("desc", "存錢2000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 4001);
orderData.put("userId", "張三");
orderData.put("desc", "存錢3000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 5001);
orderData.put("userId", "張三");
orderData.put("desc", "存錢4000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 6001);
orderData.put("userId", "張三");
orderData.put("desc", "取錢5000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 7001);
orderData.put("userId", "張三");
orderData.put("desc", "取錢6000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 8001);
orderData.put("userId", "張三");
orderData.put("desc", "取錢2000");
orders.add(orderData);
orderData = new HashMap();
orderData.put("msgType", 9001);
orderData.put("userId", "張三");
orderData.put("desc", "存錢9000");
orders.add(orderData);
return orders;
}
}
/**
* 順序消息消費者
*/
public class OrderedConsumer {
public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
public static void main(String[] args) throws Exception {
// 1. 建立消費者(Push)對象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");
// 2. 設定NameServer的位址,如果設定了環境變量NAMESRV_ADDR,可以省略此步
consumer.setNamesrvAddr(NAME_SERVER_ADDR);
/**
* 設定Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
* 這裡設定的是一個consumer的消費政策
* CONSUME_FROM_LAST_OFFSET 預設政策,從該隊列最尾開始消費,即跳過曆史消息
* CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即曆史消息(還儲存在broker的)全部消費一遍
* CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3. 訂閱對應的主題和Tag String[] tags = new String[]{"TagA", "TagB", "TagC"};
consumer.subscribe("TopicTest", "TagA || TagB || TagC");
// 4. 注冊消息接收到Broker消息後的處理接口
// 注1:普通消息消費 [[
// consumer.registerMessageListener(new MessageListenerConcurrently() {
// AtomicInteger count = new AtomicInteger(0);
//
// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// doBiz(list.get(0));
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
// });
// ]] 注1:普通消息消費
// consumer
consumer.setMaxReconsumeTimes(-1);
// 延時 level 3
// 注2:順序消息消費 [[
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true);
doBiz(msgs.get(0));
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5. 啟動消費者(必須在注冊完消息監聽器後啟動,否則會報錯)
consumer.start();
System.out.println("已啟動消費者");
}
/**
* 模拟處理業務
*
* @param message
*/
public static void doBiz(Message message) {
try {
System.out.printf("線程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), message.getTags(), new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}

這短短的一生我們最終都會失去,不妨大膽一點,愛一個人,攀一座山,追一個夢