作者:俏巴
概述
初次使用AMQP的過程中,總是容易被AMQP支援的消息模型繞暈,這裡結合官方的教程,對AMQP的消息模型做一個簡要總結,供參考。目前官方給出了六種消息發送/接收
模型,這裡主要介紹前五種消息模型。
消息模型
1、Hello World
簡單模式就是生産者将消息發送到隊列、消費者從隊列中擷取消息。一條消息對應一個消費者。

示例代碼說明:
測試使用的是阿裡雲的AMQP消息隊列服務,具體的代碼配置過程可以參考阿裡雲官方 連結 。
工具類
import AMQP.AliyunCredentialsProvider;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
// 初始化參數設定
String AccessKey= "********";
String SecretKey = "********";
Long Uid = ********16617278L;
String VhostName = "********";
String host = "********16617278.mq-amqp.cn-hangzhou-a.aliyuncs.com";
// 定義連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設定服務位址
connectionFactory.setHost(host);
// 端口
connectionFactory.setPort(5672);
// 設定使用者名、密碼、vhost
connectionFactory.setCredentialsProvider(new AliyunCredentialsProvider(AccessKey,SecretKey,Uid));
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(5000);
connectionFactory.setVirtualHost(VhostName);
// 通過工廠擷取連接配接對象
Connection connection = connectionFactory.newConnection();
return connection;
}
}
發送端示例代碼
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// hello world 單個消費者和接收者
public class Send {
private final static String Queue_name = "helloDemo";
public static void main(String[] args) throws Exception {
// 擷取連接配接及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Queue_name,false,false,false,null);
//消息内容
String message = "Hello World!";
// 1、交換機,此處無 2、發送到那個隊列 3、屬性 4、消息内容
channel.basicPublish("",Queue_name,null,message.getBytes());
System.out.println("發送資料:" + message);
// 關閉連接配接
channel.close();
connection.close();
}
}
消費端示例代碼
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver {
private final static String Queue_name = "helloDemo";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 開始消費消息
channel.basicConsume(Queue_name, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進行業務邏輯處理
System.out.println("message receive: ");
System.out.println("Received: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Thread.sleep(100000);
channel.close();
connection.close();
}
}
2、Work Queues
一條消息可以被多個消費者嘗試接收,最終隻有一個消費者能夠擷取到消息。
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 1:N 消費者各自接收消息
public class Sender {
private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(queueName,false,false,false,null);
for (int i = 0; i < 100; i++) {
String message = "workqueues message " + i;
channel.basicPublish("",queueName,null,message.getBytes());
System.out.println("發送消息: " + message);
Thread.sleep(10);//休眠
}
// 關閉連接配接
channel.close();
connection.close();
}
}
消費端示例代碼1
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver1 {
private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(queueName,false,false,false,null);
channel.basicQos(1);//告訴伺服器,在沒有确認目前消息完成之前,不要給我發新的消息。
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進行業務邏輯處理
System.out.println("message receive1: ");
System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
}
};
channel.basicConsume(queueName,false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
}
}
消費端示例代碼2
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receiver2 {
private final static String queueName = "workQueue";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(queueName,false,false,false,null);
channel.basicQos(1);//告訴伺服器,在沒有确認目前消息完成之前,不要給我發新的消息。
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進行業務邏輯處理
System.out.println("message receive2: ");
System.out.println("Received2: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
}
};
channel.basicConsume(queueName,false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
}
}
3、Publish/Subscribe
一條消息可以被多個消費者同時擷取,生産者将消息發送給交換機,消費者将自己對應的隊列注冊到交換機,當發送消息後,所有注冊的隊列的消費者都可以收到消息。
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Sender {
private static String Exchange_Name = "ExchangeDemo";//聲明交換機
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange
// Producer 将消息發送到 Exchange ,由 Exchange 将消息路由到一個或多個 Queue 中(或者丢棄),Exchange 按照相應的 Binding 邏輯将消息路由到 Queue。
channel.exchangeDeclare(Exchange_Name,"fanout");
String message = "Exchange message demo";
// 消息發送端交換機,如果此時沒有隊列綁定,則消息會丢失,因為交換機沒有存儲消息的能力
channel.basicPublish(Exchange_Name,"",null,message.getBytes());
System.out.println("發送消息: " + message);
}
}
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub1 {
private static String Exchange_Name = "ExchangeDemo";//聲明交換機
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("testqueue1",false,false,false,null);
// 綁定到交換機
channel.queueBind("testqueue1",Exchange_Name,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{
System.out.println("sub1: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testqueue1",false,consumer);
}
}
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub2 {
private static String Exchange_Name = "ExchangeDemo";//聲明交換機
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("testqueue2",false,false,false,null);
// 綁定到交換機
channel.queueBind("testqueue2",Exchange_Name,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException{
System.out.println("sub2: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testqueue2",false,consumer);
}
}
4、Routing
生産者将消息發送到type為direct模式的交換機,消費者的隊列将自己綁定到路由的時候給自己綁定一個key,隻有生産者發送的消息key和綁定的key一緻時,消費者才能收到對應的消息。
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Sender {
private static final String ExchangeName = "Rout_Change";//路由消息交換機
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeName,"direct");
channel.basicPublish(ExchangeName,"key3",null,"route 消息".getBytes());
channel.close();
connection.close();
}
}
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub {
private static final String ExchangeName = "Rout_Change";//路由消息交換機
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("testroutequeue1",false,false,false,null);
// 綁定交換機
// 參數3 标記 綁定到交換機的時候會有一個标記,隻有和它一樣标記的消息才會别消費到
channel.queueBind("testroutequeue1",ExchangeName,"key1");
channel.queueBind("testroutequeue1",ExchangeName,"key2");//綁定多個标記
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進行業務邏輯處理
System.out.println("message route receive1: ");
System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
}
};
channel.basicConsume("testroutequeue1",false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
}
}
5、Topics
該類型與 Direct 類型相似,隻是規則沒有那麼嚴格,可以模糊比對和多條件比對,即該類型 Exchange 使用 Routing key 模式比對和字元串比較的方式将消息路由至綁定的 Queue。
示例:
Routing key 為 use.stock 的消息會轉發給綁定比對模式為 .stock, use.stock, . 和 #.use.stock.# 的 Queue; 表是比對一個任意詞組,# 表示比對 0 個或多個詞組。
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Sender {
private static String Exchange_Name = "Exchange_Topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange類型為Topic 也就是通配符模式
channel.exchangeDeclare(Exchange_Name,"topic");
channel.basicPublish(Exchange_Name,"abc.1.2",null,"Topic 模式消息".getBytes());
// 關閉通道和連接配接
channel.close();
connection.close();
}
}
接收端示例代碼
import AMQP.RabbitMQTutorials.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Sub {
private static String ExchangeName = "Exchange_Topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("topicqueue",false,false,false,null);
// 綁定交換機
// 參數3 标記 綁定到交換機的時候會有一個标記,隻有和它一樣标記的消息才會别消費到
channel.queueBind("topicqueue",ExchangeName,"key.*");
channel.queueBind("topicqueue",ExchangeName,"abc.#");//綁定多個标記
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進行業務邏輯處理
System.out.println("message route receive1: ");
System.out.println("Received1: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);// 參數2 false為确認收到消息, true為拒絕收到消息
}
};
channel.basicConsume("topicqueue",false,consumer);// 參數2 手動确認,代表我們收到消息後需要手動确認告訴伺服器我們收到消息了
}
}
參考連結
RabbitMQ Tutorials Rabbitmq企業級消息隊列視訊課程