AMQP協定中的核心思想就是生産者和消費者隔離,生産者從不直接将消息發送給隊列。生産者通常不知道是否一個消息會被發送到隊列中,隻是将消息發送到一個交換機。先由Exchange來接收,然後Exchange按照特定的政策轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似于一個交換機,轉發各個消息分發到相應的隊列中。
RabbitMQ提供了四種Exchange模式:fanout、direct、topic、header 。 header模式在實際使用中較少。
Direct Exchange – 處理路由鍵。需要将一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全比對。這是一個完整的比對。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則隻有被标記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,隻會轉發dog。

- Channel channel = connection.createChannel();
- channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic
- channel.queueDeclare("queueName");
- channel.queueBind("queueName", "exchangeName", "routingKey");
- byte[] messageBodyBytes = "hello world".getBytes();
- //需要綁定路由鍵
- channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Fanout Exchange – 不處理路由鍵。你隻需要簡單的将隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。Fanout交換機轉發消息是最快的。
- channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic
- channel.queueDeclare("queueName1");
- channel.queueBind("queueName1", "exchangeName", "routingKey1");
- //路由鍵需要設定為空
- channel.basicPublish("exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Topic Exchange – 将路由鍵和某模式進行比對。此時隊列需要綁定要一個模式上。符号“#”比對一個或多個詞,符号“*”比對不多不少一個詞。是以“audit.#”能夠比對到“audit.irs.corporate”,但是“audit.*” 隻會比對到“audit.irs”。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:
- channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic
- channel.queueBind("queueName", "exchangeName", "routingKey.*");
- channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
header exchange:
Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來比對的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者比對的話,則對應的隊列就可以收到消息。比對有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼隻要滿足一個就可以了。fanout,direct,topic exchange的routingKey都需要要字元串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。
1.生産者Producer
- package cn.slimsmart.rabbitmq.demo.headers;
- import java.util.Date;
- import java.util.Hashtable;
- import java.util.Map;
- import org.springframework.amqp.core.ExchangeTypes;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.AMQP.BasicProperties.Builder;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class Producer {
- private final static String EXCHANGE_NAME = "header-exchange";
- @SuppressWarnings("deprecation")
- public static void main(String[] args) throws Exception {
- // 建立連接配接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.36.102");
- // 指定使用者 密碼
- factory.setUsername("admin");
- factory.setPassword("admin");
- // 指定端口
- factory.setPort(AMQP.PROTOCOL.PORT);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //聲明轉發器和類型headers
- channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);
- String message = new Date().toLocaleString() + " : log something";
- Map<String,Object> headers = new Hashtable<String, Object>();
- headers.put("aaa", "01234");
- Builder properties = new BasicProperties.Builder();
- properties.headers(headers);
- // 指定消息發送到的轉發器,綁定鍵值對headers鍵值對
- channel.basicPublish(EXCHANGE_NAME, "",properties.build(),message.getBytes());
- System.out.println("Sent message :'" + message + "'");
- channel.close();
- connection.close();
- }
- }
2.消費者Consumer.java
- import com.rabbitmq.client.QueueingConsumer;
- public class Consumer {
- private final static String QUEUE_NAME = "header-queue";
- channel.queueDeclare(QUEUE_NAME,false, false, true,null);
- Map<String, Object> headers = new Hashtable<String, Object>();
- headers.put("x-match", "any");//all any
- headers.put("bbb", "56789");
- // 為轉發器指定隊列,設定binding 綁定header鍵值對
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", headers);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二個參數為自動應答,無需手動應答
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(message);
- }