天天看点

JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计

后续内容

JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例
JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计

1. 最佳实践综述

本次最佳实践,将结合JAVA代码对消息队列RocketMQ版(简称RocketMQ)的使用原理进行分析。

RocketMQ 是企业级互联网架构的核心产品,具备低延迟、高并发、高可用、高可靠,可支撑万亿级数据洪峰的分布式消息中间件。可通过RocketMQ控制台创建RocketMQ实例,无需安装包,省去繁杂的手续。对RocketMQ消息服务消息可视化可以按Topic、MessageID或Topic不同维度查询发送的消息、按消息轨迹功能展示发送和消费关系、消息是否成功消费等信息。其中资源报表可以快速的统计RocketMQ在一定时间段内发送和订阅消息的TPS数。

本次最佳实践的内容主要包含:

(1)消息同步和异步发送的JAVA示例代码及原理分析。

(2)针对同步和异步发送的区别选择适用的消息发送方式满足需求。

(3)对消息发送可以分Topic,更细粒化标签tag消息进行归类。

(4)通过Topic和Tag选择过滤消费消息。

(5)对消息发送失败有进行消息重试处理。

(6)结合JAVA代码对集群和广播订阅消息消费原理进行详述。

2. 最佳实践代码设计

2.1 生产者发送消息

本章对生产者发送消息的两种模式进行代码的说明。

2.1.1 同步发送消息

同步发送原理:

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。

1.在pom.xml文件导入依赖包。

<dependency>
    <groupId>************ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>

           

2.配置文件application.properties连接mq的参数值。

# POC2专有云MQ配置

mq.accessKey=**************1HaI

mq.secretKey=*****************1HaI

mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.*************-d01.mq.namesrv.cloud.poc2.com:9876

mq.normalTopic=pdsa_topic

mq.producerId=GID_pdsa_mq

mq.consumerId=CID_consumer

mq.sendMsgTimeoutMillis=3000

mq.tag=TagA

3.同步发送示例代码,针对性适配后面MQ性能压测场景代码,内容包含发送每条消息数据大小50Kb,Topic和Tag消息更细粒化分类,消息发送失败进行重试处理。

JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计

package com.aliware.edas.com.aliware.edas.rocketmq;

import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;

import com.aliyun.openservices.ons.api.*;

import org.springframework.cloud.context.config.annotation.RefreshScope;

import org.springframework.stereotype.Component;

import java.util.Date;

import java.util.Properties;

/**

* @author liuhuihui

*/

@Component("simpleMQProduce")

@ RefreshScope

public class SimpleMQProduce extends ProducerEntry

{

StringBuilder content = new StringBuilder();

public void sendMsg()

{

for(int i = 0; i < 6400; i++)

{

content.append(String.valueOf("A"));

}

Properties properties = new Properties();

properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());

// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());

// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());

//设置发送超时时间,单位毫秒

properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());

// 设置 TCP 接入域名,到控制台的实例基本信息中查看

properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());

Producer producer = ONSFactory.createProducer(properties);

// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可

producer.start();

Message msg = new Message(

// Message 所属的 Topic

this.getTopic(),

// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤

this.getTag(),

// Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,

// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式

(content.toString()).getBytes());

// 设置代表消息的业务关键属性,请尽可能全局唯一。

// 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发

// 注意:不设置也不会影响消息正常收发

msg.setKey("ORDERID_" + 1);

try

{

SendResult sendResult = producer.send(msg);

// 同步发送消息,只要不抛异常就是成功

if(sendResult != null)

{

System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));

}

}

catch(Exception e)

{

// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理

System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));

e.printStackTrace();

}

// 在应用退出前,销毁 Producer 对象

// 注意:如果不销毁也没有问题

producer.shutdown();

}

}

2.1.2 同步发送应用场景

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

2.1.3 异步发送消息

异步发送原理:

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要用户实现异步发送回调接口。

<dependency>
    <groupId>c***********ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>
           

# POC2专有云MQ配置

mq.accessKey=*************1qc

mq.secretKey=***************R1HaI

mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.**********-d01.mq.namesrv.cloud.poc2.com:9876

mq.normalTopic=pdsa_topic

mq.producerId=GID_pdsa_mq

mq.consumerId=CID_consumer

mq.sendMsgTimeoutMillis=3000

mq.tag=TagA

3.异步发送示例代码,针对性适配后面MQ性能压测场景代码,内容包含发送每条消息数据大小50Kb ,Topic和Tag消息更细粒化分类,消息发送失败进行重试处理。

JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计

package com.aliware.edas.com.aliware.edas.rocketmq;

import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;

import com.aliyun.openservices.ons.api.*;

import org.springframework.cloud.context.config.annotation.RefreshScope;

import org.springframework.stereotype.Component;

import java.util.Date;

import java.util.Properties;

/**

* @author liuhuihui

*/

@Component("asyncSimpleMQProduce")

@ RefreshScope

public class AsyncSimpleMQProduce extends ProducerEntry

{

StringBuilder content = new StringBuilder();

public void sendMsg()

{

for(int i = 0; i < 6400; i++)

{

content.append(String.valueOf("A"));

}

Properties properties = new Properties();

properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());

// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());

// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());

//设置发送超时时间,单位毫秒

properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());

// 设置 TCP 接入域名,到控制台的实例基本信息中查看

properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());

Producer producer = ONSFactory.createProducer(properties);

// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可

producer.start();

Message msg = new Message(

// Message 所属的 Topic

this.getTopic(),

// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤

this.getTag(),

// Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,

// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式

(content.toString()).getBytes());

// 设置代表消息的业务关键属性,请尽可能全局唯一。

// 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发

// 注意:不设置也不会影响消息正常收发

msg.setKey("ORDERID_" + 1);

while(true)

{

producer.sendAsync(msg, new SendCallback()

{

@Override

public void onSuccess(SendResult sendResult)

{

System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));

}

@Override

public void onException(OnExceptionContext context)

{

System.out.println("发送失败!");

}

});

}

// 在应用退出前,销毁 Producer 对象

// 注意:如果不销毁也没有问题

// producer.shutdown();

}

}

2.1.4 异步发送应用场景

异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

2.2 消费者订阅消息

本章对消费者订阅消息的两种模式进行代码的说明。

2.2.1 集群订阅

集群订阅原理:

同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

<dependency>
    <groupId>**************ices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>           

3.集群订阅示例代码,适配后面MQ性能压测场景代码。

JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计

@Component("simpleMQConsumer")

@ RefreshScope

public classSimpleMQConsumerextendsProducerEntry

{

public void receive()

{

Properties properties = new Properties();

//您在控制台创建的GroupID

properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());

//AccessKey阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());

//SecretKey阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());

//设置TCP接入域名,到控制台的实例基本信息中查看

properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());

//集群订阅方式(默认)

properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

Consumer consumer = ONSFactory.createConsumer(properties);

//订阅另外一个Topic

consumer.subscribe(this.getTopic(), "*", new MessageListener()

{ //订阅全部Tag

@Override

public Action consume(Message message, ConsumeContext context)

{

System.out.println("Receive:" + message);

return Action.CommitMessage;

}

});

consumer.start();

System.out.println("Consumer Started");

}

}

2.2.2 广播订阅

广播订阅原理:

同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

1.在pom.xml文件导入依赖包.

<dependency>
    <groupId>**************ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>           

2.广播消费示例代码。

JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计

@Component("simpleMQConsumer")

@ RefreshScope

public classSimpleMQConsumerextendsProducerEntry

{

public void receive()

{

Properties properties = new Properties();

//您在控制台创建的GroupID

properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());

//AccessKey阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());

//SecretKey阿里云身份验证,在阿里云服务器管理控制台创建

properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());

//设置TCP接入域名,到控制台的实例基本信息中查看

properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());

//广播订阅方式(默认)

properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

Consumer consumer = ONSFactory.createConsumer(properties);

consumer.subscribe(this.getTopic(), "*", new MessageListener()

{ //订阅全部Tag

@Override

public Action consume(Message message, ConsumeContext context)

{

System.out.println("Receive:" + message);

return Action.CommitMessage;

}

});

consumer.start();

System.out.println("Consumer Started");

}

}

我们是阿里云智能全球技术服务-SRE团队,我们致力成为一个以技术为基础、面向服务、保障业务系统高可用的工程师团队;提供专业、体系化的SRE服务,帮助广大客户更好地使用云、基于云构建更加稳定可靠的业务系统,提升业务稳定性。我们期望能够分享更多帮助企业客户上云、用好云,让客户云上业务运行更加稳定可靠的技术,您可用钉钉扫描下方二维码,加入阿里云SRE技术学院钉钉圈子,和更多云上人交流关于云平台的那些事。

JAVA应用开发MQ实战最佳实践——Series1:RocketMQ综述及代码设计