概念:
rcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件,号稱消息中間件中的最強者,支援高并發,億級的消息堆積能力,在高并發的電商,金融等業務場景中多有使用。
特性:
1、支援釋出/訂閱(Pub/Sub)和點對點(P2P)消息模型
2、在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
3、支援拉(pull)和推(push)兩種消息模式
4、單一隊列百萬消息的堆積能力
5、支援多種消息協定,如 JMS、MQTT 等
6、分布式高可用的部署架構,滿足至少一次消息傳遞語義
7、提供 docker 鏡像用于隔離測試和雲叢集部署
8、提供配置、名額和監控等功能豐富的 Dashboard
專業術語:
Producer:
消息生産者,生産者的作用就是将消息發送到 MQ,生産者本身既可以産生消息,如讀取文本資訊等。也可以對外提供接口,由外部應用來調用接口,再由生産者将收到的消息發送到 MQ。
Producer Group:
生産者組,簡單來說就是多個發送同一類消息的生産者稱之為一個生産者組。在這裡可以不用關心,隻要知道有這麼一個概念即可。
Consumer:
消息消費者,簡單來說,消費 MQ 上的消息的應用程式就是消費者,至于消息是否進行邏輯處理,還是直接存儲到資料庫等取決于業務需要。
Consumer Group:
消費者組,和生産者類似,消費同一類消息的多個 consumer 執行個體組成一個消費者組。
Topic:
一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那麼就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。
Message:
Message 是消息的載體。一個 Message 必須指定 topic,相當于寄信的位址。Message 還有一個可選的 tag 設定,以便消費端可以基于 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,友善在開發過程中診斷問題。
Tag:
标簽可以被認為是對 Topic 進一步細化。一般在相同業務子產品中通過引入标簽來标記不同用途的消息。
Broker:
Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生産者的消息,儲存以及為消費者拉取消息的請求做好準備。
Name Server:
Name Server 為 producer 和 consumer 提供路由資訊。
叢集組成
由下面這張圖可以看到有四個叢集,分别是 NameServer 叢集、Broker 叢集、Producer 叢集和 Consumer 叢集:
NameServer:
提供輕量級的服務發現和路由。 每個 NameServer 記錄完整的路由資訊,提供等效的讀寫服務,并支援快速存儲擴充。
Broker:
通過提供輕量級的 Topic 和 Queue 機制來處理消息存儲,同時支援推(push)和拉(pull)模式以及主從結構的容錯機制。
Producer:
生産者,産生消息的執行個體,擁有相同 Producer Group 的 Producer 組成一個叢集。
Consumer:
消費者,接收消息進行消費的執行個體,擁有相同 Consumer Group 的 Consumer 組成一個叢集。
簡述:
簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行資料同步,即 Date Sync。同時每個 Broker 與 NameServer 叢集中的所有節點建立長連接配接,定時注冊 Topic 資訊到所有 NameServer 中。
Producer 與 NameServer 叢集中的其中一個節點(随機選擇)建立長連接配接,定期從 NameServer 擷取 Topic 路由資訊,并向提供 Topic 服務的 Broker Master 建立長連接配接,且定時向 Broker 發送心跳。Producer 隻能将消息發送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave 建立長連接配接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。
RocketMQ 叢集部署模式
1、單 master 模式
隻有一個 master 節點,稱不上是叢集,一旦這個 master 節點當機,那麼整個服務就不可用,适合個人學習使用。
2、多 master 模式
多個 master 節點組成叢集,單個 master 節點當機或者重新開機對應用沒有影響。
優點:所有模式中性能最高。
缺點:單個 master 節點當機期間,未被消費的消息在節點恢複之前不可用,消息的實時性就受到影響。
注意:使用同步刷盤可以保證消息不丢失,同時 Topic 相對應的 queue 應該分布在叢集中各個節點,而不是隻在某各節點上,否則,該節點當機會對訂閱該 topic 的應用造成影響。
3、多 master 多 slave 異步複制模式
在多 master 模式的基礎上,每個 master 節點都有至少一個對應的 slave。master 節點可讀可寫,但是 slave 隻能讀不能寫,類似于 mysql 的主備模式。
優點: 在 master 當機時,消費者可以從 slave 讀取消息,消息的實時性不會受影響,性能幾乎和多 master 一樣。
缺點:使用異步複制的同步方式有可能會有消息丢失的問題。
4、多 master 多 slave 同步雙寫模式
同多 master 多 slave 異步複制模式類似,差別在于 master 和 slave 之間的資料同步方式。
優點:同步雙寫的同步模式能保證資料不丢失。
缺點:發送單個消息 RT 會略長,性能相比異步複制低10%左右。
刷盤政策:同步刷盤和異步刷盤(指的是節點自身資料是同步還是異步存儲)
同步方式:同步雙寫和異步複制(指的一組 master 和 slave 之間資料的同步)
注意:要保證資料可靠,需采用同步刷盤和同步雙寫的方式,但性能會較其他方式低。
入門執行個體:
1、引入maven依賴
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.8</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.5.8</version>
<type>pom</type>
</dependency>
2、編寫生産者Producer代碼
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//需要一個producer group名字作為構造方法的參數,這裡為producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//設定NameServer位址,此處應改為實際NameServer位址,多個位址之間用;分隔
//NameServer的位址必須有,但是也可以通過環境變量的方式設定,不一定非得寫死在代碼裡
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setVipChannelEnabled(false);
//為避免程式啟動的時候報錯,添加此代碼,可以讓rocketMq自動建立topickey
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
producer.start();
for(int i=0;i<10;i++){
try {
Message message = new Message("TopicTest", "Tag1",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println("發送的消息ID:" + sendResult.getMsgId() +"--- 發送消息的狀态:" + sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
3、編寫消費者Customer代碼
public class Consumer {
private static final String ADDR = "127.0.0.1:9876";
public static void main(String[] args) throws MQClientException {
//設定消費者組
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(ADDR);
//設定消費者端消息拉取政策,表示從哪裡開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設定消費者拉取消息的政策,*表示消費該topic下的所有消息,也可以指定tag進行消息過濾
consumer.subscribe("TopicTest", "*");
//消費者端啟動消息監聽,一旦生産者發送消息被監聽到,就列印消息,和rabbitmq中的handlerDelivery類似
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
String topic = messageExt.getTopic();
String tag = messageExt.getTags();
String msg = new String(messageExt.getBody());
System.out.println("*********************************");
System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ", msgBody : " + msg + ", tag:" + tag + ", topic:" + topic);
System.out.println("*********************************");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//調用start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started....");
}
}
4、首先啟動消費者,然後啟動生産者,如果啟動消費者沒有報錯,但是啟動消費者報錯了,如下所示:
Caused by: com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.90 CQ: 0.90 INDEX: 0.90, maybe your broker machine memory too small.
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
... 35 more
解決辦法: 打開rocketmq安裝目錄,進入bin目錄下,修改runbroker.sh,在裡面增加一句話即可: JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98" 我這裡把磁盤保護的百分比設定成98%,隻有磁盤空間使用率達到98%時才拒絕接收producer消息。 原因如下圖所示,rocketmq預設的磁盤空間警戒水位是90%
5、下面是正常的生産者生産的資料,生産了10條資料
6、下面的列印就是消費者接收到的資料,可以看到消費者端也成功收到了這10條消息,每條消息都會存在一個msgId,是以實際業務中,可以據此回溯查到每一條消息,保證消息的較低的丢失率;
參考部落格:https://blog.csdn.net/zhangcongyi420/article/details/82593982