前言
虽然现在是SpringBoot开发统领的时代,但还是要学习原始API的使用,一方面有助于加深对RocketMQ的理解,另一方面方便在出现问题时调试代码。(Template谁不会用?对吧?)
Start
Dependency
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
复制代码
同步消息
同步消息:生产者发送消息之后,必须等待broker返回信息之后才继续业务逻辑,在broker返回信息之前,生产者保持阻塞等待状态
生产者
/**
 * Synchronous producer example: sends 100 messages to {@code TopicTest} and, for each one,
 * blocks on {@code send} until a {@link SendResult} is returned, then prints it.
 */
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; ++i) {
                Message message = new Message("TopicTest", "TagA", ("Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(message);
                // Decode with the charset the body was encoded with, not the platform default.
                System.out.printf("send %s%n", new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
                // Terminate the line so the next iteration's output does not run into this one.
                System.out.printf("get result %s%n", sendResult);
            }
        } finally {
            // Shut the producer down even when a send throws, so the client is always released.
            producer.shutdown();
        }
    }
}
复制代码
消费者
/**
 * Push-consumer example: subscribes to every tag of {@code TopicTest} and prints each
 * delivered batch from a concurrent listener.
 */
public class SyncConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("consumer-group01");
        pushConsumer.setNamesrvAddr("192.168.23.128:9876");
        pushConsumer.subscribe("TopicTest", "*");
        pushConsumer.registerMessageListener(new PrintingListener());
        pushConsumer.start();
        System.out.println("Consumer Started");
    }

    /** Prints the consuming thread's name with the received batch, then reports success. */
    private static final class PrintingListener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            String threadName = Thread.currentThread().getName();
            System.out.printf("%s Receive Messages : %s\n", threadName, msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}
复制代码
异步消息
异步消息:生产者发送消息之后,不需要等待broker反馈即可继续执行业务逻辑,仅需要提供一个回调函数来处理broker返回的结果
/**
 * Asynchronous producer example: sends 100 messages to {@code TopicTest} and reports each
 * outcome from a {@link SendCallback}. A latch lets {@code main} wait up to 5 seconds for the
 * callbacks before the producer is shut down.
 */
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group02");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        // This example only sends asynchronously, so it is the async retry count that must be
        // disabled; setRetryTimesWhenSendFailed configures synchronous sends instead.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int count = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; ++i) {
                try {
                    Message message = new Message("TopicTest", "TagA", "key", ("hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    final int index = i;
                    producer.send(message, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            countDownLatch.countDown();
                            System.out.printf("%s get success result for message : %d\n", Thread.currentThread().getName(), index);
                        }

                        @Override
                        public void onException(Throwable e) {
                            countDownLatch.countDown();
                            // Include the cause; without it a failed send cannot be diagnosed.
                            System.out.printf("%s get error result for message : %d, cause : %s\n", Thread.currentThread().getName(), index, e);
                        }
                    });
                } catch (UnsupportedEncodingException | RemotingException e) {
                    // InterruptedException is deliberately not caught here: main declares it,
                    // so it propagates unchanged instead of being hidden in a RuntimeException.
                    throw new RuntimeException(e);
                }
            }
            // await returns false on timeout; report how many callbacks never arrived.
            if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                System.out.printf("timed out with %d callbacks still pending%n", countDownLatch.getCount());
            }
        } finally {
            // Shut the producer down on every path, including a failed send.
            producer.shutdown();
        }
    }
}
复制代码
单向消息
不需要等待broker反馈,也不需要回调
/**
 * One-way producer example: sends 100 messages to {@code TopicTest} with
 * {@code sendOneway}, which returns no result and takes no callback.
 */
public class OneWayProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; ++i) {
                Message message = new Message("TopicTest", "TagA", ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(message);
            }
            // Pause before shutdown — presumably so in-flight one-way sends can leave the
            // client first; there is no result or callback to wait on instead.
            Thread.sleep(5000);
        } finally {
            // Shut the producer down even when a send throws or the sleep is interrupted.
            producer.shutdown();
        }
    }
}
复制代码
顺序消息
局部顺序消息
:指消费者消费某个topic中的某个队列时是顺序消费。消费端使用MessageListenerOrderly实现;生产端需要通过MessageQueueSelector把需要保证顺序的一组消息发送到同一个队列。
/**
 * Orderly consumer example: subscribes to every tag of {@code TopicTest}, starts from the
 * first offset, and prints each message body from a {@link MessageListenerOrderly}.
 */
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                    System.out.println("receive message : " + new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start");
    }
}
复制代码
全局顺序消息
消费者消费全部消息都是顺序的,只能通过某topic只有一个队列实现,使用场景少,性能差。
乱序消费
使用MessageListenerConcurrently实现,我们在上面同步消息已经演示过
广播消息
消费者
/**
 * Broadcast consumer example: subscribes to every tag of {@code TopicTest} with the
 * message model set to {@code BROADCASTING} and prints each message body.
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                    System.out.println("receive message : " + new String(message.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
复制代码
生产者
/**
 * Producer for the broadcast example: synchronously sends 100 messages to
 * {@code TopicTest} with tag {@code TagA} and key {@code OrderId}.
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                Message message = new Message("TopicTest", "TagA", "OrderId", ("hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(message);
            }
        } finally {
            // Shut the producer down even when a send throws, so the client is always released.
            producer.shutdown();
        }
    }
}
复制代码
延迟消息
生产者
/**
 * Delayed-message producer example: sends 100 messages to {@code TopicTest}, each with
 * delay level 3.
 */
public class ScheduledProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        // Point the client at the same name server as the other examples; the original
        // snippet omitted this call.
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; ++i) {
                Message message = new Message("TopicTest", ("hello " + i).getBytes(StandardCharsets.UTF_8));
                // Level 3 is the third entry of the delay table "1s 5s 10s 30s 1m ...", i.e. 10s.
                message.setDelayTimeLevel(3);
                producer.send(message);
            }
        } finally {
            // Shut the producer down even when a send throws, so the client is always released.
            producer.shutdown();
        }
    }
}
复制代码
消费者
/**
 * Consumer for the delayed-message example: subscribes to every tag of {@code TopicTest}.
 * The listener body is a placeholder that acknowledges each batch without processing it.
 */
public class ScheduledConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        // Point the client at the same name server as the other examples; the original
        // snippet omitted this call.
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // TODO: consume the message
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
复制代码
RocketMQ设计了18个延迟等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
批量消息
发送一个Message集合
/**
 * Batch producer example: builds four messages for {@code TopicTest} and sends them to
 * the broker in a single {@code send} call.
 */
public class BatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        // Point the client at the same name server as the other examples; the original
        // snippet omitted this call.
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            String topic = "TopicTest";
            List<Message> messages = new LinkedList<>();
            messages.add(new Message(topic, "TagA", "Order01", "order01".getBytes(StandardCharsets.UTF_8)));
            messages.add(new Message(topic, "TagA", "Order02", "order02".getBytes(StandardCharsets.UTF_8)));
            messages.add(new Message(topic, "TagA", "Order03", "order03".getBytes(StandardCharsets.UTF_8)));
            messages.add(new Message(topic, "TagA", "Order04", "order04".getBytes(StandardCharsets.UTF_8)));
            producer.send(messages);
        } finally {
            // Shut the producer down even when the send throws, so the client is always released.
            producer.shutdown();
        }
    }
}
复制代码
但是,我们之前在部署broker时,会注意到一个参数,maxMessageSize=4194304,这是批量消息的最大大小。如果超过这个值,应采用分批发送.
事务消息
: 事务消息确保本地事务的执行和消息的发送可以原子地进行。 事务消息三种状态:
- TransactionStatus.CommitTransaction:事务提交,允许消费
- TransactionStatus.RollbackTransaction:事务回滚,消息删除
- TransactionStatus.Unknown:需要MQ回查确定状态
事务消息流程
生产者
/**
 * Transactional producer example: sends 10 half messages whose tags cycle through
 * TagA..TagE, so that every outcome of the {@link TransactionListener} is exercised.
 *
 * <ul>
 *   <li>TagA is committed and TagB rolled back directly by {@code executeLocalTransaction}.</li>
 *   <li>TagC, TagD and TagE return {@code UNKNOW} there and are decided (or left undecided,
 *       for TagE) by {@code checkLocalTransaction}.</li>
 * </ul>
 */
public class TransactionProducer {
    /**
     * How long {@code main} keeps the producer alive after sending so that
     * {@code checkLocalTransaction} can be invoked for the UNKNOW messages.
     * NOTE(review): sized for a broker check interval on the order of one minute — confirm
     * against the broker's transaction check settings.
     */
    private static final long CHECK_BACK_WAIT_MILLIS = 120_000L;

    public static void main(String[] args) throws MQClientException {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagA")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagB")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagC")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagD")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }
        };
        // Use a group of its own: the article's notes state that a transactional producer
        // group must not be shared with producers of other message types, and the other
        // examples already use "producer-group01".
        TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group01");
        // The name server address belongs in setNamesrvAddr; setNamespace sets an unrelated
        // property and leaves the client without a name server to connect to.
        producer.setNamesrvAddr("192.168.23.128:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        try {
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("hello " + i).getBytes(StandardCharsets.UTF_8));
                producer.sendMessageInTransaction(message, null);
            }
            // Keep the producer running: shutting down right after the sends would mean the
            // check-backs for the UNKNOW messages could never reach checkLocalTransaction.
            try {
                Thread.sleep(CHECK_BACK_WAIT_MILLIS);
            } catch (InterruptedException e) {
                // main does not declare InterruptedException; restore the flag and fall
                // through to shutdown.
                Thread.currentThread().interrupt();
            }
        } finally {
            // Shut the producer down on every path, including a failed send.
            producer.shutdown();
        }
    }
}
复制代码
注意事项
- 事务消息不支持延迟(调度)消息和批量消息
- 为避免单条消息被检查次数过多,导致半消息堆积,我们默认将单条消息的检查次数限制为15次,可以通过transactionCheckMax更改。一旦超过,broker会丢弃该消息,并给出错误日志,可以通过重写AbstractTransactionCheckListener来定制
- 事务消息在一定时间后检查,由transactionTimeout确定,也可以在发送事务消息时设置CHECK_IMMUNITY_TIME_IN_SECONDS的用户属性来改变。
- 一个事务消息可能会被检查或者消费不止一次
- 提交给用户目标主题的消息reput可能会失败。目前,它取决于日志记录。高可用是由RocketMQ本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使用同步双写机制。
- 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器通过其生产者ID查询客户端。