✨ RabbitMQ:釋出訂閱模式
- 1.訂閱模式基本介紹
- 2.交換機
- 3.釋出訂閱模式
- 3.1基本介紹
- 3.2生産者
- 3.3消費者
- 3.4測試
📃個人首頁:不斷前進的皮卡丘
🌞部落格描述:夢想也許遙不可及,但重要的是追夢的過程,用部落格記錄自己的成長,記錄自己一步一步向上攀登的印記
🔥個人專欄:消息中間件
1.訂閱模式基本介紹
- P:生産者,發送消息給交換機
- C:消費者,接收消息
- X:交換機,一方面接收生産者發送的消息,另一方面知道怎麼處理消息,是否應将其附加到特定隊列?是否應将其附加到多個隊列中?或者它應該被丢棄。其規則由交換類型定義。
- Queue:消息隊列,接收消息,緩存消息
- 每個消費者都監聽自己的隊列
- 生産者把消息發送給broker,然後交換機把消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都将接收到消息。
2.交換機
- RabbitMQ 中消息傳遞模型的核心思想是,生産者從不将任何消息直接發送到隊列。實際上,很多時候,生産者甚至不知道消息是否會傳遞到任何隊列。相反,生産者隻能将消息發送到_交換機_。交換機的工作是一件非常簡單的事情。一方面,它接收來自生産者的消息,另一方面則将它們推送到隊列。交換必須确切地知道如何處理它收到的消息。是否應将其附加到特定隊列?是否應将其附加到多個隊列中?或者它應該被丢棄。其規則由_交換類型_定義。
- 交換機隻負責轉發消息,并沒有存儲消息的能力,是以如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那麼消息會丢失!
交換機類型
- Fanout:廣播,将消息交給所有綁定到交換機的隊列
- Direct:定向,把消息交給符合指定routing key 的隊列
- Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
3.釋出訂閱模式
3.1基本介紹
要配置一個fanout類型的交換機,不需要指定對應的路由key,同時會把消息路由到每一個消息隊列中,每個消息隊列都可以對相同的消息進行存儲,然被由各自的消息隊列相關聯的消費者消費
3.2生産者
public class Producer {
public static String FANOUT_EXCHANGE = " fanout_exchange";
public static String FANOUT_QUEUE_1 = "fanout_queue_1";
public static String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
try {
Channel channel = ConnectUtil.getChannel();
//聲明交換機(交換機名稱,交換機類型)
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//聲明隊列
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
//把交換機和隊列進行綁定
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
//發送消息
for (int i = 1; i <=10 ; i++) {
String msg="你好,小兔子,釋出訂閱模式 : "+i;
channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.3消費者
消費者1
public class Consumer1 {
public static void main(String[] args) {
try {
Channel channel = ConnectUtil.getChannel();
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//把隊列和交換機綁定 隊列名稱,交換機名稱,路由key
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");
//接受消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消費回調函數,當收到消息以後,會自動執行這個方法
* @param consumerTag 消費者辨別
* @param envelope 消息包的内容(比如交換機,路由key,消息id等)
* @param properties 屬性資訊
* @param body 消息資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
}
};
//監聽消息(隊列名稱,是否自動确認消息,消費對象)
channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消費者2
public class Consumer2 {
public static void main(String[] args) {
try {
Channel channel = ConnectUtil.getChannel();
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//把隊列和交換機綁定 隊列名稱,交換機名稱,路由key
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");
//接受消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消費回調函數,當收到消息以後,會自動執行這個方法
* @param consumerTag 消費者辨別
* @param envelope 消息包的内容(比如交換機,路由key,消息id等)
* @param properties 屬性資訊
* @param body 消息資料
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
}
};
//監聽消息(隊列名稱,是否自動确認消息,消費對象)
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}