一 RocketMQ介紹
rocketmq是阿裡巴巴開源的一款分布式的消息中間件,他源于jms規範但是不遵守jms規範。對于分布式隻一點,如果你了用過其他mq并且了解過rocketmq,就知道rocketmq天生就是分布式的,可以說是broker、provider、consumer等各種分布式。
二 RocketMQ優點:
1、 rmq去除對zk的依賴
2、 rmq支援異步和同步兩種方式刷磁盤
3、 rmq單機支援的隊列或者topic數量是5w
4、 rmq支援消息重試
5、 rmq支援嚴格按照一定的順序發送消息
6、 rmq支援定時發送消息
7、 rmq支援根據消息ID來進行查詢
8、 rmq支援根據某個時間點進行消息的回溯
9、 rmq支援對消息服務端的過濾
10、 rmq消費并行度:順序消費 取決于queue數量,亂序消費 取決于consumer數量
三 啟動RocketMQ
安裝ROcketMQ請參考《
RocketMQ在windows環境下的安裝與配置》 啟動namesrv
四 建立項目
(1)pom.xml檔案如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>spring-cloud</groupId>
<artifactId>sc-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sc-rocketmq</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
(2)消費者
package test;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setInstanceName("rmq-instance");
consumer.subscribe("log-topic", "user-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消費者消費資料:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
(3)提供者
package test;
import java.io.Serializable;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import com.alibaba.fastjson.JSON;
/**
* @Function 消息生産者
*/
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("test-group");
producer.setNamesrvAddr("localhost:9876");
producer.setInstanceName("rmq-instance");
producer.start();
try {
for (int i = 0; i < 100; i++) {
User user = new User();
user.setLoginName("abc" + i);
user.setPwd(String.valueOf(i));
Message message = new Message("log-topic", "user-tag", JSON.toJSONString(user).getBytes());
System.out.println("生産者發送消息:" + JSON.toJSONString(user));
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
/**
* 發送使用者消息
*/
static class User implements Serializable {
private String loginName;
private String pwd;
public String getLoginName() {
return loginName;
}
public void setLoginName(String loginName) {
this.loginName = loginName;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}
}
接下來先啟動消費者,然後再啟動生産者,看一下效果
生産者控制台發送消息:
消費者控制台消費消息
Spring Cloud 2.x系列之內建RocketMQ:
https://blog.csdn.net/qq_18603599/article/details/81172866
https://blog.csdn.net/wd2014610/article/details/81781109
本文作者: java樂園
本文來自雲栖社群合作夥伴“
JAVA樂園”,了解相關資訊可以關注“
”