天天看點

rocketmq簡介概念:特性:專業術語:叢集組成入門執行個體:

概念:

       rcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件,号稱消息中間件中的最強者,支援高并發,億級的消息堆積能力,在高并發的電商,金融等業務場景中多有使用。

特性:

       1、支援釋出/訂閱(Pub/Sub)和點對點(P2P)消息模型 

       2、在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞 

       3、支援拉(pull)和推(push)兩種消息模式 

       4、單一隊列百萬消息的堆積能力 

       5、支援多種消息協定,如 JMS、MQTT 等 

       6、分布式高可用的部署架構,滿足至少一次消息傳遞語義 

       7、提供 docker 鏡像用于隔離測試和雲叢集部署 

      8、提供配置、名額和監控等功能豐富的 Dashboard 

專業術語:

Producer:

       消息生産者,生産者的作用就是将消息發送到 MQ,生産者本身既可以産生消息,如讀取文本資訊等。也可以對外提供接口,由外部應用來調用接口,再由生産者将收到的消息發送到 MQ。

Producer Group:

       生産者組,簡單來說就是多個發送同一類消息的生産者稱之為一個生産者組。在這裡可以不用關心,隻要知道有這麼一個概念即可。

Consumer:

       消息消費者,簡單來說,消費 MQ 上的消息的應用程式就是消費者,至于消息是否進行邏輯處理,還是直接存儲到資料庫等取決于業務需要。

Consumer Group:

       消費者組,和生産者類似,消費同一類消息的多個 consumer 執行個體組成一個消費者組。

Topic:

       一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那麼就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。

Message:

       Message 是消息的載體。一個 Message 必須指定 topic,相當于寄信的位址。Message 還有一個可選的 tag 設定,以便消費端可以基于 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,友善在開發過程中診斷問題。

Tag:

       标簽可以被認為是對 Topic 進一步細化。一般在相同業務子產品中通過引入标簽來标記不同用途的消息。

Broker: 

       Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生産者的消息,儲存以及為消費者拉取消息的請求做好準備。

Name Server:

       Name Server 為 producer 和 consumer 提供路由資訊。

叢集組成

       由下面這張圖可以看到有四個叢集,分别是 NameServer 叢集、Broker 叢集、Producer 叢集和 Consumer 叢集:

rocketmq簡介概念:特性:專業術語:叢集組成入門執行個體:

NameServer:

       提供輕量級的服務發現和路由。 每個 NameServer 記錄完整的路由資訊,提供等效的讀寫服務,并支援快速存儲擴充。 

Broker:

       通過提供輕量級的 Topic 和 Queue 機制來處理消息存儲,同時支援推(push)和拉(pull)模式以及主從結構的容錯機制。 

Producer:

       生産者,産生消息的執行個體,擁有相同 Producer Group 的 Producer 組成一個叢集。 

Consumer:

       消費者,接收消息進行消費的執行個體,擁有相同 Consumer Group 的 Consumer 組成一個叢集。 

簡述:

       簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行資料同步,即 Date Sync。同時每個 Broker 與 NameServer 叢集中的所有節點建立長連接配接,定時注冊 Topic 資訊到所有 NameServer 中。

       Producer 與 NameServer 叢集中的其中一個節點(随機選擇)建立長連接配接,定期從 NameServer 擷取 Topic 路由資訊,并向提供 Topic 服務的 Broker Master 建立長連接配接,且定時向 Broker 發送心跳。Producer 隻能将消息發送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave 建立長連接配接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。

RocketMQ 叢集部署模式

1、單 master 模式 

       隻有一個 master 節點,稱不上是叢集,一旦這個 master 節點當機,那麼整個服務就不可用,适合個人學習使用。

2、多 master 模式

       多個 master 節點組成叢集,單個 master 節點當機或者重新開機對應用沒有影響。

       優點:所有模式中性能最高。

       缺點:單個 master 節點當機期間,未被消費的消息在節點恢複之前不可用,消息的實時性就受到影響。 

       注意:使用同步刷盤可以保證消息不丢失,同時 Topic 相對應的 queue 應該分布在叢集中各個節點,而不是隻在某各節點上,否則,該節點當機會對訂閱該 topic 的應用造成影響。

3、多 master 多 slave 異步複制模式

       在多 master 模式的基礎上,每個 master 節點都有至少一個對應的 slave。master 節點可讀可寫,但是 slave 隻能讀不能寫,類似于 mysql 的主備模式。 

        優點: 在 master 當機時,消費者可以從 slave 讀取消息,消息的實時性不會受影響,性能幾乎和多 master 一樣。 

        缺點:使用異步複制的同步方式有可能會有消息丢失的問題。

4、多 master 多 slave 同步雙寫模式 

       同多 master 多 slave 異步複制模式類似,差別在于 master 和 slave 之間的資料同步方式。 

       優點:同步雙寫的同步模式能保證資料不丢失。 

       缺點:發送單個消息 RT 會略長,性能相比異步複制低10%左右。 

       刷盤政策:同步刷盤和異步刷盤(指的是節點自身資料是同步還是異步存儲) 

       同步方式:同步雙寫和異步複制(指的一組 master 和 slave 之間資料的同步) 

       注意:要保證資料可靠,需采用同步刷盤和同步雙寫的方式,但性能會較其他方式低。

入門執行個體:

       1、引入maven依賴

<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.5.8</version>
</dependency>
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.5.8</version>
    <type>pom</type>
</dependency>
           

       2、編寫生産者Producer代碼

public class Producer {

	public static void main(String[] args) throws MQClientException, InterruptedException {
	    //需要一個producer group名字作為構造方法的參數,這裡為producer1
	    DefaultMQProducer producer = new DefaultMQProducer("producer1");

	    //設定NameServer位址,此處應改為實際NameServer位址,多個位址之間用;分隔
	    //NameServer的位址必須有,但是也可以通過環境變量的方式設定,不一定非得寫死在代碼裡
	    producer.setNamesrvAddr("127.0.0.1:9876");
	    producer.setVipChannelEnabled(false);

	    //為避免程式啟動的時候報錯,添加此代碼,可以讓rocketMq自動建立topickey
	    producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
	    producer.start();
	    for(int i=0;i<10;i++){
	        try {
	            Message message = new Message("TopicTest", "Tag1", 
	                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

	            SendResult sendResult = producer.send(message);

	            System.out.println("發送的消息ID:" + sendResult.getMsgId() +"--- 發送消息的狀态:" + sendResult.getSendStatus());
	        } catch (Exception e) {
	             e.printStackTrace();
	             Thread.sleep(1000);
	        }
	    }
	    producer.shutdown();
	}
}
           

       3、編寫消費者Customer代碼

public class Consumer {
	private static final String ADDR = "127.0.0.1:9876";
	public static void main(String[] args) throws MQClientException {
	    //設定消費者組
	    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");
	    consumer.setVipChannelEnabled(false);
	    consumer.setNamesrvAddr(ADDR);
	    //設定消費者端消息拉取政策,表示從哪裡開始消費
	    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
	    //設定消費者拉取消息的政策,*表示消費該topic下的所有消息,也可以指定tag進行消息過濾
	    consumer.subscribe("TopicTest", "*");
	    //消費者端啟動消息監聽,一旦生産者發送消息被監聽到,就列印消息,和rabbitmq中的handlerDelivery類似
	    consumer.registerMessageListener(new MessageListenerConcurrently() {
	        @Override
	        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
	            for (MessageExt messageExt : msgs) {
	                String topic = messageExt.getTopic();
	                String tag = messageExt.getTags();
	                String msg = new String(messageExt.getBody());
	                System.out.println("*********************************");
	                System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ",  msgBody : " + msg + ", tag:" + tag + ", topic:" + topic);
	                System.out.println("*********************************");
	            }
	            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	        }
	    });
	    //調用start()方法啟動consumer
	    consumer.start();
	    System.out.println("Consumer Started....");
	}

}
           

         4、首先啟動消費者,然後啟動生産者,如果啟動消費者沒有報錯,但是啟動消費者報錯了,如下所示:

Caused by: com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.90 CQ:  0.90 INDEX:  0.90, maybe your broker machine memory too small.
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
	... 35 more
           

       解決辦法: 打開rocketmq安裝目錄,進入bin目錄下,修改runbroker.sh,在裡面增加一句話即可: JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98" 我這裡把磁盤保護的百分比設定成98%,隻有磁盤空間使用率達到98%時才拒絕接收producer消息。 原因如下圖所示,rocketmq預設的磁盤空間警戒水位是90%

rocketmq簡介概念:特性:專業術語:叢集組成入門執行個體:

       5、下面是正常的生産者生産的資料,生産了10條資料

rocketmq簡介概念:特性:專業術語:叢集組成入門執行個體:

       6、下面的列印就是消費者接收到的資料,可以看到消費者端也成功收到了這10條消息,每條消息都會存在一個msgId,是以實際業務中,可以據此回溯查到每一條消息,保證消息的較低的丢失率; 

rocketmq簡介概念:特性:專業術語:叢集組成入門執行個體:

參考部落格:https://blog.csdn.net/zhangcongyi420/article/details/82593982

繼續閱讀