路由模式:
1 、每個消費者監聽自己的隊列,并且設定routingkey。
2 、生産者将消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。
聲明exchange_routing_inform交換機。
聲明兩個隊列并且綁定到此交換機,綁定時需要指定routingkey
發送消息時需要指定routingkey
生産者:
package cn.pika.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生産者Routing
*/
public class ProducerRouting {
//隊列的名稱
private static final String QUEUE_INFORM_EMAIL="queue_inform_email"; //郵箱隊列名
private static final String QUEUE_INFORM_SMS="queue_inform_sms"; //短信隊列名
//交換機名稱
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
public static void main(String[] args) {
Connection connection=null;
Channel channel=null;
try {
//1,建立連結的工廠對象
ConnectionFactory factory = new ConnectionFactory();
//1.1 設定連結的參數
factory.setHost("localhost"); //本地連結位址
factory.setPort(5672); //端口号
factory.setUsername("guest"); //使用者名
factory.setPassword("guest"); //密碼
factory.setVirtualHost("/"); //rabbitmq預設虛拟機名稱為“/”,虛拟機相當于一個獨立的mq伺服器
//1.2 建立連結對象
connection =factory.newConnection();
//2,建立與Exchange(交換機)的通道,每個連接配接可以建立多個通道,每個通道代表一個會話任務
channel =connection.createChannel();
//3,聲明交換機及其類型
/**
* 參數1:交換機的名稱
* 參數2:交換機的類型:fanout:不處理路由鍵,隊列在綁定交換機的時候,不需要指定路由鍵,寫空字元串即可,交換機在接受到資訊以後會轉發
* 到綁定了該交換機的所有隊列中
* topic:将路由鍵和某模式進行比對,不需要綁定交換機,可以使用通配符
* direct:處理路由鍵,隊列在綁定到交換機上的時候,需要指定路由鍵,交換機在接受到資訊以後隻會轉發到和該路由鍵比對的隊列中
* headers:不處理路由鍵。而是根據發送的消息内容中的headers屬性進行比對
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//4,聲明一個隊列,如果Rabbit中沒有此隊列将自動建立
/**
* 參數介紹:
* * param1:隊列名稱
* * param2:是否持久化,若持久化,mq重新開機後該隊列仍然存在
* * param3:隊列是否獨占此連接配接,隊列隻允許在該連接配接中通路,如果連接配接關閉,則隊列自動删除
* * param4:隊列不再使用時是否自動删除此隊列
* * param5:擴充參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //聲明一個郵件隊列
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //聲明一個消息隊列
//5,綁定隊列
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由鍵(路由key)
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); //将email隊列綁定到交換機上
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); //将sms隊列綁定到交換機上
//6,發送消息
for(int i=0;i<10;i++){
//6.1定義發送的消息内容
String message="郵箱----- inform to 本次消費99元"+i;
//6.2 發送消息
/**
* * param1:Exchange(交換機)的名稱,如果沒有指定,則使用Default Exchange 注意:不可以寫null
* * param2:routingKey,消息的路由Key,是用于Exchange(交換機)将消息轉發到指定的消息隊列
* * param3:消息包含的屬性,沒有可以寫null
* * param4:消息體,要發送的消息,位元組數組 byte【】類型
*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL,null,message.getBytes());
System.out.println("發送成功,消息内容是:"+message);
}
for(int i=0;i<10;i++){
//6.1定義發送的消息内容
String message="sms----- inform to 本次消費99元"+i;
//6.2 發送消息
/**
* * param1:Exchange(交換機)的名稱,如果沒有指定,則使用Default Exchange 注意:不可以寫null
* * param2:routingKey,消息的路由Key,是用于Exchange(交換機)将消息轉發到指定的消息隊列
* * param3:消息包含的屬性,沒有可以寫null
* * param4:消息體,要發送的消息,位元組數組 byte【】類型
*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS,null,message.getBytes());
System.out.println("發送成功,消息内容是:"+message);
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
//5,關閉連接配接對象,倒着關!
if(channel != null){
channel.close();
}
if(connection != null){
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
消費者email:
package cn.pika.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消費者--email
*/
public class ConsumerEmailRouting {
//隊列的名稱
private static final String QUEUE_INFORM_EMAIL="queue_inform_email"; //郵箱隊列名
//交換機名稱
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//1,建立連結的工廠對象
ConnectionFactory factory = new ConnectionFactory();
//1.1 設定連結的參數
factory.setHost("localhost"); //本地連結位址
factory.setPort(5672); //端口号
factory.setUsername("guest"); //使用者名
factory.setPassword("guest"); //密碼
factory.setVirtualHost("/"); //rabbitmq預設虛拟機名稱為“/”,虛拟機相當于一個獨立的mq伺服器
//1.2 建立連結對象
connection =factory.newConnection();
//2,建立與Exchange(交換機)的通道,每個連接配接可以建立多個通道,每個通道代表一個會話任務
channel =connection.createChannel();
//3,聲明交換機及其類型
/**
* 參數1:交換機的名稱
* 參數2:交換機的類型:fanout:不處理路由鍵,隊列在綁定交換機的時候,不需要指定路由鍵,寫空字元串即可,交換機在接受到資訊以後會轉發
* 到綁定了該交換機的所有隊列中
* topic:将路由鍵和某模式進行比對,不需要綁定交換機,可以使用通配符
* direct:處理路由鍵,隊列在綁定到交換機上的時候,需要指定路由鍵,交換機在接受到資訊以後隻會轉發到和該路由鍵比對的隊列中
* headers:不處理路由鍵。而是根據發送的消息内容中的headers屬性進行比對
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//4,聲明一個隊列,如果Rabbit中沒有此隊列将自動建立
/**
* 參數介紹:
* * param1:隊列名稱
* * param2:是否持久化,若持久化,mq重新開機後該隊列仍然存在
* * param3:隊列是否獨占此連接配接,隊列隻允許在該連接配接中通路,如果連接配接關閉,則隊列自動删除
* * param4:隊列不再使用時是否自動删除此隊列
* * param5:擴充參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
//5,綁定隊列
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由鍵(路由key)
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); //将email隊列綁定到交換機上
//6,定義消費方法,監聽到隊列中有消息以後要做什麼
DefaultConsumer consumer = new DefaultConsumer(channel){
//4.1重寫handleDelivery()方法,消費着接受消息以後會自動調用此方法
/**
*
* @param consumerTag 消費者的标簽,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可從中擷取消息id,消息routingkey,交換機,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties
* @param body 消息體(接受到的消息内容)
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//4.2 擷取接收到的消息
String message=new String(body);
System.out.println("接收到的消息是:"+message);
}
};
//5,監聽隊列
/**
* 參數1:String queue:隊列名稱
* 參數2:boolean autoAck:是否自動回複,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動回複
* 參數3:Consumer callback:消費方法,消費者接收到消息後調用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
消費者sms:
package cn.pika.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消費者--email
*/
public class ConsumerSmsRouting {
//隊列的名稱
private static final String QUEUE_INFORM_SMS="queue_inform_sms"; //短信隊列名
//交換機名稱
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//1,建立連結的工廠對象
ConnectionFactory factory = new ConnectionFactory();
//1.1 設定連結的參數
factory.setHost("localhost"); //本地連結位址
factory.setPort(5672); //端口号
factory.setUsername("guest"); //使用者名
factory.setPassword("guest"); //密碼
factory.setVirtualHost("/"); //rabbitmq預設虛拟機名稱為“/”,虛拟機相當于一個獨立的mq伺服器
//1.2 建立連結對象
connection =factory.newConnection();
//2,建立與Exchange(交換機)的通道,每個連接配接可以建立多個通道,每個通道代表一個會話任務
channel =connection.createChannel();
//3,聲明交換機及其類型
/**
* 參數1:交換機的名稱
* 參數2:交換機的類型:fanout:不處理路由鍵,隊列在綁定交換機的時候,不需要指定路由鍵,寫空字元串即可,交換機在接受到資訊以後會轉發
* 到綁定了該交換機的所有隊列中
* topic:将路由鍵和某模式進行比對,不需要綁定交換機,可以使用通配符
* direct:處理路由鍵,隊列在綁定到交換機上的時候,需要指定路由鍵,交換機在接受到資訊以後隻會轉發到和該路由鍵比對的隊列中
* headers:不處理路由鍵。而是根據發送的消息内容中的headers屬性進行比對
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//4,聲明一個隊列,如果Rabbit中沒有此隊列将自動建立
/**
* 參數介紹:
* * param1:隊列名稱
* * param2:是否持久化,若持久化,mq重新開機後該隊列仍然存在
* * param3:隊列是否獨占此連接配接,隊列隻允許在該連接配接中通路,如果連接配接關閉,則隊列自動删除
* * param4:隊列不再使用時是否自動删除此隊列
* * param5:擴充參數
*/
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//5,綁定隊列
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由鍵(路由key)
*/
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); //将email隊列綁定到交換機上
//6,定義消費方法,監聽到隊列中有消息以後要做什麼
DefaultConsumer consumer = new DefaultConsumer(channel){
//4.1重寫handleDelivery()方法,消費着接受消息以後會自動調用此方法
/**
*
* @param consumerTag 消費者的标簽,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可從中擷取消息id,消息routingkey,交換機,消息和重傳标志(收到消息失敗後是否需要重新發送)
* @param properties
* @param body 消息體(接受到的消息内容)
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//4.2 擷取接收到的消息
String message=new String(body);
System.out.println("接收到的消息是:"+message);
}
};
//5,監聽隊列
/**
* 參數1:String queue:隊列名稱
* 參數2:boolean autoAck:是否自動回複,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動回複
* 參數3:Consumer callback:消費方法,消費者接收到消息後調用此方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}