目錄
- 推薦公衆号
- 引言
- 單機版部署流程
-
- 代碼
- 基本概念
-
- 1.消息模型(Message Model)
- 2.消息生産者(Producer)
- 3.消息消費者(Consumer)
- 4.主題(Topic)
- 5.代理伺服器(Broker Server)
- 6.名字服務(Name Server)
- 7.拉取式消費(Pull Consumer)
- 8 推動式消費(Push Consuemr)
- 9.生産者組(Producer Group)
- 10.消費者組(Consumer Group)
- 11.叢集消費(Clustering)
- 12.廣播消費(Broadcasting)
- 13 普通順序消息(Normal Ordered Message)
- 14 嚴格順序消息(Strictly Order Message)
- 15 消息(Message)
- 16 标簽(Tag)
- 總結
推薦公衆号
有彩蛋哦!!!(或者公衆号内點選網賺擷取彩蛋)
引言
為什麼使用消息隊列呢?消峰,解耦,異步這些都是使用消息隊列的好處;但是項目中引入任務一門中間件時都需要考慮其利弊(維護成本是否大,性能是否穩定,社群是否活躍…)。嘿嘿,說這些都是扯淡,在我公司中RocketMQ起着解耦的作用,由于項目中資料服務覆寫放放面面,有許多公司每天都要從我們項目中拿到相關資料。
單機版部署流程
下載下傳zip
編輯着兩個cmd,主要時修改啟動堆大小參數
runserver.cmd
set "JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn250m
-XX:MetaspaceSize=50m -XX:MaxMetaspaceSize=60m"
在其中還可以看到垃圾回收器等資訊
runboker.cmd
set "JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn250m"
啟動mqnameser.cmd
啟動 mqbroker.cmd -n localhost:9876
代碼
main版本生産者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.Scanner;
public class RocketProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("my-group");
producer.setNamesrvAddr("localhost:9876");
producer.setInstanceName("rmq-instance");
producer.start();
try {
Message message = new Message("demo-topic", "demo-tag", "這是一條測試消息".getBytes());
producer.send(message);
while (true) {
String text = new Scanner(System.in).next();
Message msg = new Message("demo-topic",// topic
"demo-tag",// tag
text.getBytes() // body
);
SendResult sendResult = producer.send(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
main版本消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setInstanceName("rmq-instance");
consumer.subscribe("demo-topic", "demo-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
生産
消費
基本概念
過完手瘾之後,還是從頭開始,RocketMQ中有許多概念需要了解一下
非常友好的中文官方文檔
1.消息模型(Message Model)
RocketMQ主要由Producer Broker Consumer三部分組成,其中Producer負責生産消息,Consumer負責消費消息,
Broker負責存儲消息。Broker在實際部署過程中對應一台伺服器,每個Broker可以存儲多個Topic的消息,每個
Topic的消息也可以分片存儲于不同的Broker。Message Queue用于存儲消息的實體位址,每個Topic中的消息位址
存儲于多個Message Queue中。ConsumerGroup由多個Consumer執行個體構成。
2.消息生産者(Producer)
負責生産消息,一般由業務系統負責生産消息。一個消息生産者會把業務應用系統裡産生的消息發送到Broker伺服器。
RockerMQ提供多種發送方式,同步發送,異步發送,順序發送,單向發送。同步和異步方式均需要Broker傳回取人資訊,
單向發送不需要。
3.消息消費者(Consumer)
負責消費消息,一般都是背景系統負責異步消費。一個消息消費者會從Broker伺服器拉取消息,并将其提供給應用程式。
從使用者應用的角度而言提供了兩種消費形式:拉取式消費,推動式消費。
4.主題(Topic)
表示一類消息的集合,每個主題包含若幹條消息,每個消息隻能屬于一個主題,是RockerMQ進行消息訂閱的基本機關。
5.代理伺服器(Broker Server)
消息中轉角色,負責存儲消息,轉發消息。代理伺服器在RockerMQ系統中負責接收從生産者發送來的消息并存儲,
同時為消費者的拉去請求做準備。代理伺服器也存儲消息相關的中繼資料,包括消費者組,消費進度偏移,主題,隊列消息等
6.名字服務(Name Server)
名稱服務充當路由消息的提供者。生産者或消費者能夠通過名字服務查找各主題對應的Broker IP清單。多個Namesrv
執行個體組成繼承,但互相獨立,沒有資訊交換。
7.拉取式消費(Pull Consumer)
Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker伺服器拉消息,主動權由應用控制。
一旦擷取了批量消息,應用就會啟動消費過程。
8 推動式消費(Push Consuemr)
Consumer消費的一種類型,該模式下Broker收到資料後會主動推送給消費者端,該消費模式一般實時向比較高
9.生産者組(Producer Group)
同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一緻。如果發送的是事務消息且原始生産者
在發送之後崩潰,則Broker伺服器會聯系同一生産着組的其他生産者執行個體以送出或回溯消費。
10.消費者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一緻。消費者組使得在消息消費方面,
實作負載均衡和容錯的目标變得非常容易。要注意的是,消費者組的消費者執行個體必須訂閱完全相同的Topic。
RockerMQ支援兩種消費模式:叢集消費(Clustering)和廣播消費(Broadcasting)。
11.叢集消費(Clustering)
叢集消費模式下,相同Consumer Group的每個Consumer執行個體平均分攤消息。
12.廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group的每個Consumer執行個體都接收全量的消息
13 普通順序消息(Normal Ordered Message)
普通順序消費模式下,消費者通過同意給消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
14 嚴格順序消息(Strictly Order Message)
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
15 消息(Message)
消息系統所傳輸資訊的實體載體,生産和消費資料的最小機關,每條消息必須屬于一個主題。RocketMQ中每個消息
擁有唯一的Message ID,且可以攜帶具有業務辨別的key。系統提供了通過Message ID和Key查詢消息的功能。
16 标簽(Tag)
為消息設定的标志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,
可以根據不同業務目的在同一主題下設定不同标簽。标簽能夠有效地保持代碼的清晰度和連貫性,
并優化RocketMQ提供的查詢系統。消費者可以根據Tag實作對不同子主題的不同消費邏輯,實作更好的擴充性
總結
RocketMQ是阿裡巴巴貢獻給apache項目,已經成為頂級項目,經受過雙十一考驗,底層更是以java編寫,值得學習。其實單機版(main)隻是入門入門級上手,俗話說不高叢集的消息隊列就是在搞流氓