前言
我們建構了一個簡單的日志記錄系統。我們能夠向許多接收者廣播日志消息。在本節我們将向其中添加一些特别的功能-比方說我們隻讓某個消費者訂閱釋出的部分消息。例如我們隻把嚴重錯誤消息定向存儲到日志檔案(以節省磁盤空間),同時仍然能夠在控制台上列印所有日志消息。
綁定是交換機和隊列之間的橋梁關系。也可以這麼了解:
隊列隻對它綁定的交換機的消息感興趣。綁定用參數:routingKey來表示也可稱該參數為binding key,建立綁定我們用代碼:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");綁定之後的意義由其交換類型決定。
Direct exchange介紹
我們希望将日志消息寫入磁盤的程式僅接收嚴重錯誤(errros),而不存儲哪些警告(warning)或資訊(info)日志消息避免浪費磁盤空間。Fanout這種交換類型并不能給我們帶來很大的靈活性-它隻能進行無意識的廣播,在這裡我們将使用direct這種類型來進行替換,這種類型的工作方式是,消息隻去到它綁定的routingKey隊列中去。

在上面這張圖中,我們可以看到X綁定了兩個隊列,綁定類型是direct。隊列Q1綁定鍵為orange,隊列Q2綁定鍵有兩個:一個綁定鍵為black,另一個綁定鍵為green.
在這種綁定情況下,生産者釋出消息到exchange上,綁定鍵為orange的消息會被釋出到隊列
Q1。綁定鍵為blackgreen和的消息會被釋出到隊列Q2,其他消息類型的消息将被丢棄。
多重綁定
當然如果exchange的綁定類型是direct,但是它綁定的多個隊列的key如果都相同,在這種情況下雖然綁定類型是direct但是它表現的就和fanout有點類似了,就跟廣播差不多,如下圖所示。
實戰代碼
工具類:
public class untils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.231.132");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
消費者1:
public class ReceiveLogsDirect01 {
private static final String EXCHANG_NAME="direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
/**
* 聲明交換機
*/
channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT);
String queueName="disk";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANG_NAME,"error");
System.out.println("正在準備接收消息....");
DeliverCallback deliverCallback=(Consumer,delivert)->
{
String s = new String(delivert.getBody());
System.out.println("錯誤消息--->:"+s);
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} );
}
}
消費者2:
public class ReceiveLogsDirect02 {
private static final String EXCHANG_NAME="direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
/**
* 聲明交換機
*/
channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.DIRECT);
String queueName="console";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANG_NAME,"info");
channel.queueBind(queueName,EXCHANG_NAME,"warning");
System.out.println("正在準備接收消息....");
DeliverCallback deliverCallback=(Consumer, delivert)->
{
String s = new String(delivert.getBody());
System.out.println("info warning消息--->:"+s);
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{} );
}
}
生産者:
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args)throws Exception {
Channel channel = untils.getChannel();
//建立多個bindingKeyMap
Map<String,String> bindingKeyMap=new HashMap<>();
bindingKeyMap.put("info","普通info消息");
bindingKeyMap.put("warning","警告warning消息");
bindingKeyMap.put("error","錯誤error消息");
//debug沒有的消費者接收這個消息,是以就失去
bindingKeyMap.put("debug","調試debug消息");
for (Map.Entry<String,String> bindingKeyEntry:bindingKeyMap.entrySet())
{
String bindingKey=bindingKeyEntry.getKey();
String message=bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生産者發出消息"+message);
}
}
}
結果: