前言
雖然現在是SpringBoot開發統領的時代,但還是要學習原始的Api使用,一方面有助于加深對RocketMq的了解,另一方面方便在出現問題時調試代碼。(Template誰不會用?對吧?)
Start
Dependency
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
複制代碼
同步消息
同步消息:生産者發送消息之後,必須等待broker傳回資訊之後才繼續業務邏輯,在broker傳回資訊之前,生産者保持阻塞等待狀态
生産者
/**
 * Synchronous send example: every {@code send} call blocks until the broker has
 * returned a {@link SendResult}, and only then does the loop move on.
 */
public class SyncProducer {

    /**
     * Sends 100 messages to {@code TopicTest} with tag {@code TagA}, printing each
     * payload and the corresponding send result on separate lines.
     *
     * @param args unused
     * @throws MQClientException            propagated from producer start-up or send
     * @throws UnsupportedEncodingException if the configured charset name is not supported
     * @throws MQBrokerException            propagated from send
     * @throws RemotingException            propagated from send
     * @throws InterruptedException         propagated from send
     */
    public static void main(String[] args)
            throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; ++i) {
                Message message = new Message("TopicTest", "TagA",
                        ("Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(message);
                // Decode with the same charset that was used to encode, not the platform default.
                System.out.printf("send %s%n", new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
                // Terminate the line so the next iteration's output does not run into this one.
                System.out.printf("get result %s%n", sendResult);
            }
        } finally {
            // Release the client even when a send fails part-way through the loop.
            producer.shutdown();
        }
    }
}
複制代碼
消費者
/**
 * Push-consumer example that subscribes to every tag of {@code TopicTest} and prints
 * each delivered batch together with the name of the thread that handled it.
 */
public class SyncConsumer {

    /**
     * Configures the consumer, registers a concurrent listener and starts it.
     *
     * @param args unused
     * @throws MQClientException propagated from subscribe or start
     */
    public static void main(String[] args) throws MQClientException {
        final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("consumer-group01");
        pushConsumer.setNamesrvAddr("192.168.23.128:9876");
        pushConsumer.subscribe("TopicTest", "*");

        // Typed local so the overloaded registerMessageListener resolves unambiguously.
        final MessageListenerConcurrently printingListener = (batch, ctx) -> {
            final String threadName = Thread.currentThread().getName();
            System.out.printf("%s Receive Messages : %s\n", threadName, batch);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.registerMessageListener(printingListener);

        pushConsumer.start();
        System.out.println("Consumer Started");
    }
}
複制代碼
異步消息
異步消息:生産者提供消息之後,不需要等待broker回報即可繼續執行業務邏輯,僅需要提供一個回調函數
/**
 * Asynchronous send example: {@code send} returns immediately and the outcome of each
 * message is reported later through a {@link SendCallback}.
 */
public class AsyncProducer {

    /**
     * Sends 100 messages asynchronously, then waits up to five seconds for all
     * callbacks to fire before shutting the producer down.
     *
     * @param args unused
     * @throws MQClientException    propagated from producer start-up or send
     * @throws InterruptedException if the thread is interrupted while sending or waiting
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group02");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        // Asynchronous sends use their own retry counter; setRetryTimesWhenSendFailed
        // only applies to synchronous sends and would have no effect here.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int count = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(count);
        try {
            for (int i = 0; i < count; ++i) {
                try {
                    Message message = new Message("TopicTest", "TagA", "key",
                            ("hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    final int index = i;
                    producer.send(message, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            countDownLatch.countDown();
                            System.out.printf("%s get success result for message : %d\n",
                                    Thread.currentThread().getName(), index);
                        }

                        @Override
                        public void onException(Throwable e) {
                            countDownLatch.countDown();
                            // Report the cause as well; silently dropping it hides why the send failed.
                            System.out.printf("%s get error result for message : %d, cause : %s\n",
                                    Thread.currentThread().getName(), index, e);
                        }
                    });
                } catch (UnsupportedEncodingException | RemotingException e) {
                    // InterruptedException is deliberately not caught here: it propagates
                    // through the declared throws clause instead of being wrapped.
                    throw new RuntimeException(e);
                }
            }
            // Each callback counts the latch down once; a false return means some never arrived in time.
            if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                System.out.printf("timed out waiting for callbacks, %d still pending%n", countDownLatch.getCount());
            }
        } finally {
            // Release the client on both the normal and the failure path.
            producer.shutdown();
        }
    }
}
複制代碼
單向消息
不需要等待broker回報,也不需要回調
/**
 * One-way send example: messages are handed to the client without waiting for a
 * broker response and without registering a callback.
 */
public class OneWayProducer {

    /**
     * Sends 100 one-way messages to {@code TopicTest}, pauses for five seconds and
     * then shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException            propagated from producer start-up or send
     * @throws UnsupportedEncodingException if the configured charset name is not supported
     * @throws RemotingException            propagated from send
     * @throws InterruptedException         if the thread is interrupted while sending or sleeping
     */
    public static void main(String[] args)
            throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; ++i) {
                Message message = new Message("TopicTest", "TagA",
                        ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(message);
            }
            // NOTE(review): presumably gives the fire-and-forget sends time to leave the
            // client before shutdown; there is no acknowledgement to wait on instead.
            Thread.sleep(5000);
        } finally {
            // Release the client even if a send or the sleep is aborted by an exception.
            producer.shutdown();
        }
    }
}
複制代碼
順序消息
局部順序消息
:指消費者消費某個topic中的某個隊列時是順序消費。使用MessageListenerOrderly實作.
/**
 * Ordered-consumption example built on {@link MessageListenerOrderly}: subscribes to
 * every tag of {@code TopicTest} and prints each message body.
 */
public class OrderConsumer {

    /**
     * Configures the consumer to start from the first offset, registers an orderly
     * listener and starts it.
     *
     * @param args unused
     * @throws MQClientException propagated from subscribe or start
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Decode explicitly as UTF-8 (the charset the example producers encode with)
                    // rather than relying on the platform default charset.
                    String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("receive message : " + body);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start");
    }
}
複制代碼
全局順序消息
消費者消費全部消息都是順序的,隻能通過某topic隻有一個隊列實作,使用場景少,性能差。
亂序消費
使用MessageListenerConcurrently實作,我們在上面同步消息已經示範過
廣播消息
消費者
/**
 * Broadcast-consumption example: the consumer is switched to
 * {@code MessageModel.BROADCASTING} and prints the body of every message it receives.
 */
public class BroadcastConsumer {

    /**
     * Configures the consumer for broadcasting, subscribes to every tag of
     * {@code TopicTest}, registers a concurrent listener and starts it.
     *
     * @param args unused
     * @throws MQClientException propagated from subscribe or start
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // Decode explicitly as UTF-8 (the charset the example producers encode with)
                    // rather than relying on the platform default charset.
                    String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("receive message : " + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
複制代碼
生産者
/**
 * Producer used with the broadcast example: synchronously sends 100 messages to
 * {@code TopicTest} with tag {@code TagA} and key {@code OrderId}.
 */
public class BroadcastProducer {

    /**
     * Starts the producer, sends the messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException            propagated from producer start-up or send
     * @throws UnsupportedEncodingException if the configured charset name is not supported
     * @throws MQBrokerException            propagated from send
     * @throws RemotingException            propagated from send
     * @throws InterruptedException         propagated from send
     */
    public static void main(String[] args)
            throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                Message message = new Message("TopicTest", "TagA", "OrderId",
                        ("hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(message);
            }
        } finally {
            // Release the client even when a send fails part-way through the loop.
            producer.shutdown();
        }
    }
}
複制代碼
延遲消息
生産者
/**
 * Delayed-message example: every message is tagged with a delay level before it is sent.
 */
public class ScheduledProducer {

    /**
     * Sends 100 messages to {@code TopicTest}, each with delay level 3.
     *
     * @param args unused
     * @throws MQClientException    propagated from producer start-up or send
     * @throws MQBrokerException    propagated from send
     * @throws RemotingException    propagated from send
     * @throws InterruptedException propagated from send
     */
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        // Same name server as the other examples; without an address the client
        // has no way to locate a broker for the topic.
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; ++i) {
                Message message = new Message("TopicTest", ("hello " + i).getBytes(StandardCharsets.UTF_8));
                // Level 3 is the third entry of the delay-level table listed in this article (10s).
                message.setDelayTimeLevel(3);
                producer.send(message);
            }
        } finally {
            // Release the client even when a send fails part-way through the loop.
            producer.shutdown();
        }
    }
}
複制代碼
消費者
/**
 * Consumer side of the delayed-message example: subscribes to every tag of
 * {@code TopicTest} and acknowledges each delivered batch.
 */
public class ScheduledConsumer {

    /**
     * Configures the consumer, registers a concurrent listener and starts it.
     *
     * @param args unused
     * @throws MQClientException propagated from subscribe or start
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group01");
        // Same name server as the other examples; without an address the client
        // has no way to locate a broker for the topic.
        consumer.setNamesrvAddr("192.168.23.128:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // TODO: consume the message
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
複制代碼
RocketMQ設計了18個延遲等級
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
批量消息
發送一個Message集合
/**
 * Batch send example: several messages for the same topic are collected in a list
 * and handed to the producer in a single {@code send} call.
 */
public class BatchProducer {

    /**
     * Builds four messages for {@code TopicTest} and sends them as one batch.
     *
     * @param args unused
     * @throws MQClientException    propagated from producer start-up or send
     * @throws MQBrokerException    propagated from send
     * @throws RemotingException    propagated from send
     * @throws InterruptedException propagated from send
     */
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group01");
        // Same name server as the other examples; without an address the client
        // has no way to locate a broker for the topic.
        producer.setNamesrvAddr("192.168.23.128:9876");
        producer.start();
        try {
            String topic = "TopicTest";
            List<Message> messages = new LinkedList<>();
            messages.add(new Message(topic, "TagA", "Order01", "order01".getBytes(StandardCharsets.UTF_8)));
            messages.add(new Message(topic, "TagA", "Order02", "order02".getBytes(StandardCharsets.UTF_8)));
            messages.add(new Message(topic, "TagA", "Order03", "order03".getBytes(StandardCharsets.UTF_8)));
            messages.add(new Message(topic, "TagA", "Order04", "order04".getBytes(StandardCharsets.UTF_8)));
            producer.send(messages);
        } finally {
            // Release the client even when the batch send fails.
            producer.shutdown();
        }
    }
}
複制代碼
但是,我們之前在部署broker時,會注意到一個參數,maxMessageSize=4194304,這是批量消息的最大大小。如果超過這個值,應采用分批發送.
事務消息
: 事務消息確保本地事務的執行和消息的發送可以原子地進行。 事務消息三種狀态:
- TransactionStatus.CommitTransaction:事務送出,允許消費
- TransactionStatus.RollbackTransaction:事務復原,消息删除
- TransactionStatus.Unknown:需要MQ回查确定狀态
事務消息流程
生産者
/**
 * Transactional-message example. The {@link TransactionListener} decides the outcome
 * purely from the message tag so that every state can be observed:
 * <ul>
 *   <li>local transaction: {@code TagA} commits, {@code TagB} rolls back, anything else is UNKNOW;</li>
 *   <li>check-back: {@code TagC} commits, {@code TagD} rolls back, anything else stays UNKNOW.</li>
 * </ul>
 */
public class TransactionProducer {

    /**
     * Starts a transactional producer, sends ten messages cycling through five tags,
     * keeps the producer alive for check-backs and then shuts it down.
     *
     * @param args unused
     * @throws MQClientException propagated from producer start-up or send
     */
    public static void main(String[] args) throws MQClientException {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagA")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagB")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagC")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagD")) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }
        };
        TransactionMQProducer producer = new TransactionMQProducer("producer-group01");
        // This value is the name server address; setNamespace would instead set a
        // namespace and leave the producer without any name server configured.
        producer.setNamesrvAddr("192.168.23.128:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        try {
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                // Encode explicitly as UTF-8 instead of the platform default charset.
                Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i,
                        ("hello " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
                producer.sendMessageInTransaction(message, null);
            }
            // Messages left in UNKNOW are resolved only when the broker calls back
            // checkLocalTransaction, so the producer must outlive the send loop.
            // NOTE(review): two minutes is a demo value; tune it to the broker's check interval.
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and fall through to shutdown.
            Thread.currentThread().interrupt();
        } finally {
            producer.shutdown();
        }
    }
}
複制代碼
注意事項
- 事務消息不支持排程(延遲)消息和批量消息
- 為避免單條消息被檢查次數過多,導緻半消息堆積,我們預設将單條消息的檢查次數限制為15次,可以通過Broker配置中的transactionCheckMax參數更改。一旦超過,broker會丢棄該消息,并給出錯誤日志,可以通過重寫AbstractTransactionalMessageCheckListener來定制這一行為
- 事務消息在一定時間後檢查,由transactionTimeout确定,也可以在發送事務消息時設定CHECK_IMMUNITY_TIME_IN_SECONDS的使用者屬性來改變。
- 一個事務消息可能會被檢查或者消費不止一次
- 送出給用戶目标主題的消息reput可能會失敗。目前,它取決于日志記錄。高可用是由 RocketMQ 本身的高可用機制來保證的。如果要保證事務消息不丢失,保證事務完整性,推薦使用同步雙寫機制。
- 事務性消息的生産者 ID 不能與其他類型消息的生産者 ID 共享。與其他類型的消息不同,事務性消息允許向後查詢(即回查):MQ 伺服器通過其生産者 ID 查詢客戶端。