✨ RabbitMQ:路由模式
- 1.基本介紹
- 2.生産者
- 3.消費者
- 4.測試
📃個人首頁:不斷前進的皮卡丘
🌞部落格描述:夢想也許遙不可及,但重要的是追夢的過程,用部落格記錄自己的成長,記錄自己一步一步向上攀登的印記
🔥個人專欄:消息中間件
1.基本介紹
在路由工作模式中,我們需要配置一個類型為direct的交換機,并且需要指定不同的路由鍵(routing key),把對應的消息從交換機路由到不同的消息隊列進行存儲,由消費者進行消費。
- P:生産者,向交換機發送消息的時候,會指定一個routing key
- X:Exchange(交換機),接收生産者的消息,然後把消息傳遞給和routing key完全比對的隊列
- C1:消費者,它所在隊列指定了需要routing key為error的資訊
- C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
路由模式的特點
隊列和交換機的綁定是需要指定routing key的,不可以随意綁定
消息的發送方向交換機發送消息的時候,也需要指定消息的routing key
交換機不再把消息交給每一個綁定的隊列,而是根據消息的routing key來進行判斷,隻有隊列的routing key和消息的routing key完全一樣才會接收到消息。
2.生産者
public class Producer {
public static String DIRECT_EXCHANGE = " direct_exchange";
public static String DIRECT_QUEUE_1 = "direct_queue_1";
public static String DIRECT_QUEUE_2 = "direct_queue_2";
public static void main(String[] args) {
try {
Channel channel = ConnectUtil.getChannel();
//聲明交換機(交換機名稱,交換機類型)
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明隊列
channel.queueDeclare(DIRECT_QUEUE_1,true,false,false,null);
channel.queueDeclare(DIRECT_QUEUE_2,true,false,false,null);
//把交換機和隊列1進行綁定
channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,"error");
//把交換機和隊列2進行綁定
channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"info");
channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"error");
channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"warning");
//發送消息
String msg="日志資訊:調用了xxx方法,日志級别是info";
channel.basicPublish(DIRECT_EXCHANGE,"info",null,msg.getBytes());
System.out.println("消息發送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.消費者
消費者1
public class Consumer1 {
public static void main(String[] args) {
try {
//擷取信道對象
Channel channel = ConnectUtil.getChannel();
//消費消息
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1接收到消息:"+new String(body,"UTF-8"));
System.out.println("消費者1把日志資訊儲存到資料庫");
}
};
channel.basicConsume(Producer.DIRECT_QUEUE_1,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消費者2
public class Consumer2 {
public static void main(String[] args) {
try {
//擷取信道對象
Channel channel = ConnectUtil.getChannel();
//消費消息
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2接收到消息:"+new String(body,"UTF-8"));
System.out.println("消費者2把日志資訊輸出到控制台");
}
};
channel.basicConsume(Producer.DIRECT_QUEUE_2,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
4.測試
第一次測試,發送日志級别為info的資訊
第二次測試,發送日志級别為error的資訊