✨ RabbitMQ:Topics主題/通配符模式
- 1.基本介紹
- 2.生産者
- 3.消費者
- 4.測試
📃個人首頁:不斷前進的皮卡丘
🌞部落格描述:夢想也許遙不可及,但重要的是追夢的過程,用部落格記錄自己的成長,記錄自己一步一步向上攀登的印記
🔥個人專欄:消息中間件
1.基本介紹
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CN2ATM4IWOllzNmZGNhBDNzYzX3ITNxYDM4AzLcBTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
- Topic類型與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。隻不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符
- Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
- 通配符規則:
- #:比對0個或者多個詞
- *:剛好可以比對一個詞
2.生産者
public class Producer {
public static String TOPIC_EXCHANGE = "topic_exchange";
public static String TOPIC_QUEUE_1 = "topic_queue_1";
public static String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
try {
Channel channel = ConnectUtil.getChannel();
//聲明交換機(交換機名稱,交換機類型)
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
//聲明隊列
channel.queueDeclare(TOPIC_QUEUE_1,true,false,false,null);
channel.queueDeclare(TOPIC_QUEUE_2,true,false,false,null);
//把交換機和隊列1進行綁定
channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHANGE,"#.error");
//把交換機和隊列2進行綁定
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHANGE,"order.*");
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHANGE,"*.orange.*");
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHANGE,"*.*");
//發送消息
String msg="日志資訊:調用了xxx方法,日志級别是error";
channel.basicPublish(TOPIC_EXCHANGE,"error",null,msg.getBytes());
System.out.println("消息發送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.消費者
消費者1
public class Consumer1 {
public static void main(String[] args) {
try {
//擷取信道對象
Channel channel = ConnectUtil.getChannel();
//消費消息
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1接收到消息:"+new String(body,"UTF-8"));
System.out.println("消費者1把日志資訊儲存到資料庫");
}
};
channel.basicConsume(Producer.TOPIC_QUEUE_1,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消費者2
public class Consumer2 {
public static void main(String[] args) {
try {
//擷取信道對象
Channel channel = ConnectUtil.getChannel();
//消費消息
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2接收到消息:"+new String(body,"UTF-8"));
System.out.println("消費者2把日志資訊儲存到資料庫");
}
};
channel.basicConsume(Producer.TOPIC_QUEUE_2,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}