後續内容
JAVA應用開發MQ實戰最佳實踐——Series2:消息隊列RocketMQ性能測試案例
1. 最佳實踐綜述
本次最佳實踐,将結合JAVA代碼對消息隊列RocketMQ版(簡稱RocketMQ)的使用原理進行分析。
RocketMQ 是企業級網際網路架構的核心産品,具備低延遲、高并發、高可用、高可靠,可支撐萬億級資料洪峰的分布式消息中間件。可通過RocketMQ控制台建立RocketMQ執行個體,無需安裝包,省去繁雜的手續。對RocketMQ消息服務消息可視化可以按Topic、MessageID或Topic不同次元查詢發送的消息、按消息軌迹功能展示發送和消費關系、消息是否成功消費等資訊。其中資源報表可以快速的統計RocketMQ在一定時間段内發送和訂閱消息的TPS數。
本次最佳實踐的内容主要包含:
(1)消息同步和異步發送的JAVA示例代碼及原理分析。
(2)針對同步和異步發送的差別選擇适用的消息發送方式滿足需求。
(3)對消息發送可以分Topic,更細粒化标簽tag消息進行歸類。
(4)通過Topic和Tag選擇過濾消費消息。
(5)對消息發送失敗有進行消息重試處理。
(6)結合JAVA代碼對叢集和廣播訂閱消息消費原理進行詳述。
2. 最佳實踐代碼設計
2.1 生産者發送消息
本章對生産者發送消息的兩種模式進行代碼的說明。
2.1.1 同步發送消息
同步發送原理:
同步發送是指消息發送方發出一條消息後,會在收到服務端傳回響應之後才發下一條消息的通訊方式。
1.在pom.xml檔案導入依賴包。
<dependency>
<groupId>************ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置檔案application.properties連接配接mq的參數值。
# POC2專有雲MQ配置
mq.accessKey=**************1HaI
mq.secretKey=*****************1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.*************-d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA
3.同步發送示例代碼,針對性适配後面MQ性能壓測場景代碼,内容包含發送每條消息資料大小50Kb,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
package com.aliware.edas.com.aliware.edas.rocketmq;
import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Properties;
/**
* @author liuhuihui
*/
@Component("simpleMQProduce")
@ RefreshScope
public class SimpleMQProduce extends ProducerEntry
{
StringBuilder content = new StringBuilder();
public void sendMsg()
{
for(int i = 0; i < 6400; i++)
{
content.append(String.valueOf("A"));
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
// AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
// SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設定發送逾時時間,機關毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
// 設定 TCP 接入域名,到控制台的執行個體基本資訊中檢視
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,隻需調用一次即可
producer.start();
Message msg = new Message(
// Message 所屬的 Topic
this.getTopic(),
// Message Tag 可了解為 Gmail 中的标簽,對消息進行再歸類,友善 Consumer 指定過濾條件在 MQ 伺服器過濾
this.getTag(),
// Message Body 可以是任何二進制形式的資料, MQ 不做任何幹預,
// 需要 Producer 與 Consumer 協商好一緻的序列化和反序列化方式
(content.toString()).getBytes());
// 設定代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以友善您在無法正常收到消息情況下,可通過阿裡雲伺服器管理控制台查詢消息并補發
// 注意:不設定也不會影響消息正常收發
msg.setKey("ORDERID_" + 1);
try
{
SendResult sendResult = producer.send(msg);
// 同步發送消息,隻要不抛異常就是成功
if(sendResult != null)
{
System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
}
}
catch(Exception e)
{
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條資料進行補償處理
System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
e.printStackTrace();
}
// 在應用退出前,銷毀 Producer 對象
// 注意:如果不銷毀也沒有問題
producer.shutdown();
}
}
2.1.2 同步發送應用場景
此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。
2.1.3 異步發送消息
異步發送原理:
異步發送是指發送方發出一條消息後,不等服務端傳回響應,接着發送下一條消息的通訊方式。消息隊列RocketMQ版的異步發送,需要使用者實作異步發送回調接口。
<dependency>
<groupId>c***********ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
# POC2專有雲MQ配置
mq.accessKey=*************1qc
mq.secretKey=***************R1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.**********-d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA
3.異步發送示例代碼,針對性适配後面MQ性能壓測場景代碼,内容包含發送每條消息資料大小50Kb ,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
package com.aliware.edas.com.aliware.edas.rocketmq;
import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Properties;
/**
* @author liuhuihui
*/
@Component("asyncSimpleMQProduce")
@ RefreshScope
public class AsyncSimpleMQProduce extends ProducerEntry
{
StringBuilder content = new StringBuilder();
public void sendMsg()
{
for(int i = 0; i < 6400; i++)
{
content.append(String.valueOf("A"));
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
// AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
// SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設定發送逾時時間,機關毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
// 設定 TCP 接入域名,到控制台的執行個體基本資訊中檢視
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,隻需調用一次即可
producer.start();
Message msg = new Message(
// Message 所屬的 Topic
this.getTopic(),
// Message Tag 可了解為 Gmail 中的标簽,對消息進行再歸類,友善 Consumer 指定過濾條件在 MQ 伺服器過濾
this.getTag(),
// Message Body 可以是任何二進制形式的資料, MQ 不做任何幹預,
// 需要 Producer 與 Consumer 協商好一緻的序列化和反序列化方式
(content.toString()).getBytes());
// 設定代表消息的業務關鍵屬性,請盡可能全局唯一。
// 以友善您在無法正常收到消息情況下,可通過阿裡雲伺服器管理控制台查詢消息并補發
// 注意:不設定也不會影響消息正常收發
msg.setKey("ORDERID_" + 1);
while(true)
{
producer.sendAsync(msg, new SendCallback()
{
@Override
public void onSuccess(SendResult sendResult)
{
System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
}
@Override
public void onException(OnExceptionContext context)
{
System.out.println("發送失敗!");
}
});
}
// 在應用退出前,銷毀 Producer 對象
// 注意:如果不銷毀也沒有問題
// producer.shutdown();
}
}
2.1.4 異步發送應用場景
異步發送一般用于鍊路耗時較長,對響應時間較為敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。
2.2 消費者訂閱消息
本章對消費者訂閱消息的兩種模式進行代碼的說明。
2.2.1 叢集訂閱
叢集訂閱原理:
同一個Group ID所辨別的所有Consumer平均分攤消費消息。例如某個Topic有9條消息,一個Group ID有3個Consumer執行個體,那麼在叢集消費模式下每個執行個體平均分攤,隻消費其中的3條消息。
<dependency>
<groupId>**************ices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
3.叢集訂閱示例代碼,适配後面MQ性能壓測場景代碼。
@Component("simpleMQConsumer")
@ RefreshScope
public classSimpleMQConsumerextendsProducerEntry
{
public void receive()
{
Properties properties = new Properties();
//您在控制台建立的GroupID
properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());
//AccessKey阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
//SecretKey阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設定TCP接入域名,到控制台的執行個體基本資訊中檢視
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
//叢集訂閱方式(預設)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
//訂閱另外一個Topic
consumer.subscribe(this.getTopic(), "*", new MessageListener()
{ //訂閱全部Tag
@Override
public Action consume(Message message, ConsumeContext context)
{
System.out.println("Receive:" + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
2.2.2 廣播訂閱
廣播訂閱原理:
同一個Group ID所辨別的所有Consumer都會各自消費某條消息一次。例如某個Topic有9條消息,一個Group ID有3個Consumer執行個體,那麼在廣播消費模式下每個執行個體都會各自消費9條消息。
1.在pom.xml檔案導入依賴包.
<dependency>
<groupId>**************ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.廣播消費示例代碼。
@Component("simpleMQConsumer")
@ RefreshScope
public classSimpleMQConsumerextendsProducerEntry
{
public void receive()
{
Properties properties = new Properties();
//您在控制台建立的GroupID
properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());
//AccessKey阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
//SecretKey阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設定TCP接入域名,到控制台的執行個體基本資訊中檢視
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
//廣播訂閱方式(預設)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(this.getTopic(), "*", new MessageListener()
{ //訂閱全部Tag
@Override
public Action consume(Message message, ConsumeContext context)
{
System.out.println("Receive:" + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
我們是阿裡雲智能全球技術服務-SRE團隊,我們緻力成為一個以技術為基礎、面向服務、保障業務系統高可用的工程師團隊;提供專業、體系化的SRE服務,幫助廣大客戶更好地使用雲、基于雲建構更加穩定可靠的業務系統,提升業務穩定性。我們期望能夠分享更多幫助企業客戶上雲、用好雲,讓客戶雲上業務運作更加穩定可靠的技術,您可用釘釘掃描下方二維碼,加入阿裡雲SRE技術學院釘釘圈子,和更多雲上人交流關于雲平台的那些事。