文章目錄
- 1. 引入依賴
- 2. 發送消息
-
- 2.1 發送同步消息
- 2.2 發送異步消息
- 2.3 發送單向消息
- 2.4 發送順序消息
- 3. Push方式消費消息
- 4. Pull方式消費消息
1. 引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
注意,這兒的jar包的版本号必須和rocket的版本号完全一緻,否則可能出現各種意向不到的問題,如果發現例子無法正常運作,請優先檢查版本号
2. 發送消息
2.1 發送同步消息
//使用組名來初始化一個生産者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 指定Server Name 位址.
producer.setNamesrvAddr("localhost:9876");
//啟動執行個體
producer.start();
for (int i = 0; i < 100; i++) {
//建立消息執行個體,指定 topic, tag 和 消息内容
byte[] msgBytes = ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("TopicTest","TagA" ,msgBytes);
// 發送消息并擷取發送結果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//生産者執行個體不在使用時進行關閉
producer.shutdown();
2.2 發送異步消息
吐槽官網,官網的例子基本無法運作,因為 在異步等待響應結果的時候,生産者執行個體被關閉了。。解決方案 注釋掉不關閉
// 使用組名執行個體化生産者。
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 指定NameServer 位址
producer.setNamesrvAddr("localhost:9876");
// 啟動執行個體
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//建立消息執行個體,指定 topic, tag 和 消息内容.
byte[] msgBytes = ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("TopicTest","TagA","OrderID188",msgBytes);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 生産者執行個體不在使用時進行關閉.
// producer.shutdown();
2.3 發送單向消息
// 使用組名執行個體化生産者 .
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 指定NameServer 位址.
producer.setNamesrvAddr("localhost:9876");
// 啟動執行個體.
producer.start();
for (int i = 0; i < 100; i++) {
// 建立消息執行個體,指定 topic, tag 和 消息内容.
byte[] msgBytes = ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("TopicTest","TagA",msgBytes );
// 發送消息.
producer.sendOneway(msg);
}
// 生産者執行個體不在使用時進行關閉.
producer.shutdown();
2.4 發送順序消息
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
byte[] msgBytes = ("Hello world " +i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,msgBytes);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
3. Push方式消費消息
// 用組名執行個體化消費者.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定NameServer 位址.
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個Topic.
consumer.subscribe("TopicTest", "*");
// 注冊回掉函數,從Broker中取到消息時會調用.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者執行個體.
consumer.start();
System.out.printf("Consumer Started.%n");
4. Pull方式消費消息
// 用組名執行個體化消費者.
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("sumanGroup");
// 指定NameServer 位址.
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest");
MessageQueue[] messageQueueArr = messageQueues.toArray(new MessageQueue[]{});
long offset = consumer.fetchConsumeOffset(messageQueueArr[0],true);
System.out.println(offset);
PullResult pull = consumer.pullBlockIfNotFound(messageQueueArr[0], "*", offset, 5);
pull.getMsgFoundList().forEach(item->System.out.println(new String(item.getBody())));
consumer.updateConsumeOffset(messageQueueArr[0],pull.getNextBeginOffset());
consumer.getOffsetStore().persist(messageQueueArr[0]);
/* 啟動消費者執行個體. */
System.out.printf("Consumer Started.%n");