Rabbit MQ 用戶端發送消息
- 1. 連接配接 Rabbit MQ
- 2. 使用交換器和隊列
-
- 2.1 Channel 接口的 API 方法重載
-
- 2.1.1 關于 exchangeDeclare 方法
- 2.1.2 關于 queueDeclare 方法
- 2.1.3 關于 queueBind 方法
- 3. 發送消息 channel.basicPublish 方法
1. 連接配接 Rabbit MQ
直接上核心代碼:
@Component
public class RabbitMQConfig {
/**
* 注入配置檔案屬性
*/
@Value("${spring.rabbitmq.addresses}")
String addresses;//MQ位址
@Value("${spring.rabbitmq.username}")
String username;//MQ登入名
@Value("${spring.rabbitmq.password}")
String password;//MQ登入密碼
@Value("${spring.rabbitmq.virtual-host}")
String vHost;//MQ的虛拟主機名
@Value("${spring.rabbitmq.port}")
String port;//MQ的虛拟主機名
@Bean
public Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vHost);
factory.setHost(addresses);
factory.setPort(port);
Connection conn = factory.newConnection();
return conn;
}
}
也可以采用 URI 的方式來實作,執行個體代碼如下:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
Connection conn = factory.newConnection();
/** Connection 接口被用來建立一個 Channel , Channel 可以用來發送或接收消息*/
Channel channel = conn.createChannel();
注意要點:
Connection 可以用來建立多個 Channel 執行個體,但是 Channel 執行個體不能線上程間共享,應用程式應該為每個線程開辟一個 Channel 。某些情況下 Channel 的操作可以并發運作,但在其他情況下會導緻在網絡出現錯誤的通信幀交錯,同時也會影響發送方确認(publisherconfirm)機制的運作,是以多線程間共享 Channel 執行個體是非線程安全的。
2. 使用交換器和隊列
交換器和隊列是 AMQP 中 high-level 層面的構模組化塊,應用程式需確定在使用它們的時候就已經存在了,故在使用前需要先聲明(declare)它們。
// 聲明交換器 (非持久化的、非自動删除的、綁定類型為 direct 的交換器)
channel.exchangeDeclare(exchangeName, "direct", true);
// 隊列名 (建立了一個非持久化的、排他的、自動删除的隊列,該隊列的名稱是由 Rabbit MQ 自動生成的)
String queueName = channel.queueDeclare().getQueue();
// 使用路由鍵(routingKey)将隊列和交換器綁定起來
channel.queueBind(queueName, exchangeName, routingKey);
以上聲明的隊列有以下特性:隻對目前應用中同一個 Connection 層面可用,同一個 Connection 的不同 Channel 可共用,并且也會在應用連接配接斷開時自動删除。
如果要在應用中共享一個隊列,可以做以下聲明:
/** 聲明一個持久化、非排他的、非自動删除的隊列,此處的隊列名稱不是 Rabbit MQ 自動生成 */
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, ture, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
生産者和消費者都可以聲明一個交換器或者隊列。如果嘗試聲明一個已經存在的交換器或者隊列,隻需要聲明的參數完全比對現存的交換器或者隊列, Rabbit MQ 就可以啥都不做,并成功傳回。如果聲明的參數不比對則會抛出異常。
2.1 Channel 接口的 API 方法重載
Channel 的 API 方法都是可以重載的,例如:exchangeDeclare、queueDeclare 等。根據參數不同,可以有不同的重載形式,可根據具體業務需要,選擇性進行調用。
2.1.1 關于 exchangeDeclare 方法
exchangeDeclare 有多個重載方法,這些重載方法都是由下面這個方法中預設的某些參數構成。
/** 傳回值 Exchange.DeclareOk 是用來辨別成功聲明了一個交換器 */
Exchange.DeclareOk exchangeDeclare(Stirng exchange, String type, boolean durable,
boolean autoDelete, boolean internal, Map<String, Oject> arguments)
throws IOException;
上面方法中,各參數詳細說明如下所述:
- exchange:交換器的名稱。
- type:交換器的類型,常見的有 fanout、direct、topic.
- durable:設定是否持久化。durable 設定為 true 時表示持久化,否則非持久化。持久化可以将交換器存盤,在伺服器重新開機的時候不會丢失相關資訊。
- autoDelete:設定是否自動删除。 autoDelete 設定為 true 時表示自動删除。自動删除的前提是至少有一個隊列或者交換器與這個交換器綁定,之後所有與這個交換器綁定的隊列或者交換器都與此解綁。需要注意的是,不能錯誤的把這個參數了解為:“當與此交換器連接配接的用戶端都斷開時,Rabbit MQ 會自動删除本交換器”。
- internal:設定是否是内置的。如果設定為 true ,則表示為内置的交換器,用戶端程式無法直接發送消息到這個交換器中,隻能通過交換器路由到交換器這種方式。
- argument:其他一些結構化參數,比如 alternate-exchange 等。
exchangeDeclare 的其他幾個常用重載方法如下:(這裡僅為部分重載方法)
- Exchange.DeclareOk exchangeDeclare (String exchange, String type) throws IOException;
- Exchange.DeclareOk exchangeDeclare (String exchange, String type, boolean durable) throws IOException;
- Exchange.DeclareOk exchangeDeclare (String exchange, String type, boolean durable, boolean autoDelete, Map<Stirng, Object> arguments) throws IOException;
關于删除交換器的方法:
- Exchange.DeleteOk exchangeDelete (String exchange) throws IOException;
- Exchange.DeleteOk exchangeDelete (String exchange, boolean ifUnused) throws IOException;
以上方法參數說明:
- exchange:表示交換器名稱。
- ifUnused:用來設定是否在交換器沒有被使用的情況下删除。如果 ifUnused 設定為 true,則隻有在此交換器沒有被使用的情況下才會被删除;若為 false 則無論如何這個交換器都要被删除。
2.1.2 關于 queueDeclare 方法
queueDeclare 方法隻有兩個重載方法:
/**該不帶參數的方法,預設建立一個由Rabbit MQ命名的排他的、自動删除的、非持久化隊列(匿名隊列)*/
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments) throws IOException;
以下是 queueDeclare 方法中參數的詳細說明:
- queue:隊列的名稱。
- durable:設定是否持久化。設定為 true 時代表持久化,持久化的隊列會存盤,在伺服器重新開機的時候可以保證不丢失相關資訊。
- exclusive:設定是否排他。設為 true 時表示排他的,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接配接可見,并在連接配接斷開時自動删除。需要注意的是:排他隊列是基于連接配接(Connection)可見的,同一個連接配接的不同信道(Channel)是可以同時通路同一個連接配接建立的排他隊列;“首次”是指如果一個連接配接已經聲明了一個排他隊列,其他連接配接是不允許建立同名的排他隊列,這個與普通隊列不同;即使該隊列是持久化的,一旦連接配接關閉 或者用戶端退出,該排他隊列都會被自動删除,這種隊列适用于一個用戶端同時發送和讀取消息的應用場景。
- autoDelete:設定是否自動删除。設為 true 時表示隊列為自動删除。自動删除的前提是:至少有一個消費者連接配接到這個隊列,之後所有與這個隊列連接配接的消費者都斷開時,才會自動删除。注意:不要錯誤的了解為:“當連接配接到此隊列的所有用戶端斷開時,這個隊列自動删除”,因為生産者用戶端建立這個隊列,或沒有消費者用戶端與這個隊列連接配接時,都不會自動删除該隊列。
- arguments:設定隊列的其他一些參數。
要點:
生産者和消費者都能夠使用 queueDeclare 來聲明一個隊列,但是如果消費者在同一個信道上訂閱了另一個隊列,就無法再聲明隊列了。必須先取消訂閱,然後将信道置為“傳輸”模式後才能聲明隊列。
/** 該方法用來檢測相應的隊列是否存在,如果存在則正常傳回,
如果不存在則抛出異常:404 channel exception.同時,
Channel 也會被關閉。 */
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
關于删除隊列的方法:
Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue, boolean ifUnused,
boolean ifEmpty) throws IOException;
以上方法參數說明:
- queue:表示隊列名稱。
- ifUnused:用來設定是否在隊列沒有被使用的情況下删除。如果 ifUnused 設定為 true,則隻有在此隊列沒有被使用的情況下才會被删除;若為 false 則無論如何這個隊列都要被删除。
- ifEmpty:設定為 true 時表示隊列為空(隊列裡面沒有任何消息堆積)的情況下才能被删除。
/** 該方法用來清空隊列中的内容,但是不删除隊列本身。 */
Queue.PurgeOk queueOurge(String queue) throws IOException;
2.1.3 關于 queueBind 方法
queueBind 方法用于将交換器和隊列進行綁定。以下是可以重載的方法:
Queue.BindOk queueBind(String queue, String exchange,
String routingKey) throws IOException;
Queue.BindOk queueBind(Stirng queue, String exchange, String routingKey,
Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue, String exchange, Stirng routingKey,
Map<String, Object> arguments) throws IOException;
以下是 queueBind 方法中參數的詳細說明:
- queue:隊列名稱。
- exchange:交換器的名稱。
- routingKey:用來綁定隊列和交換器的路由鍵。
- arguments:定義綁定的一些參數。
将已經綁定的隊列和交換器進行解綁:
Queue.UnbindOk queueUnbind(String queue, String exchange,
String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(Stirng queue, String exchange,
String routingKey, Map<String, Object> arguments) throws IOException;
3. 發送消息 channel.basicPublish 方法
Rabbit MQ 發送消息使用 channel.basicPublish 方法,其有以下幾個重載方法:
- void basicPublish (String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
- void basicPublish (String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
- void basicPublish (String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
以上方法具體參數說明如下:
- exchange:交換器的名稱,指明消息需要發送到哪個交換器中。如果設定為空字元串,則消息會被發送到 Rabbit MQ 預設的交換器中。
- routingKey:路由鍵,交換器根據路由鍵将消息存儲到相應的隊列中。
- props:消息的基本屬性集。其包含14個屬性成員:contentType、contentEncoding、headers(Map<String, Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId.
- byte[] body:消息體(payload),真正需要發送的消息。
- mandatory:該參數設定為 true 時,當交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列時,Rabbit MQ 會調用 Basic.Return 指令将消息傳回給生産者。當設定為 false 時,出現上述情形時,消息直接被丢棄。
- immediate:該參數設定為 true 時,如果交換器在将消息路由到隊列時,發現隊列上并不存在任何消費者,那麼這條消息将不會存入隊列中。當與路由鍵比對的所有隊列都沒有消費者時,該消息會通過 Basic.Return 傳回給生産者。Rabbit MQ 3.0 版本開始去掉了對 immediate 參數的支援。
示例1:
/** 發送一條簡單的消息: Hello, World !!! */
byte[] messageBodyBytes = "Hello, World !!!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
示例2:
/** 發送一條簡單的消息: Hello, World !!! */
byte[] messageBodyBytes = "Hello, World !!!".getBytes();
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder())
.contentType("text/plain")
.deliveryMode(2) //消息的投遞模式設定為2,即消息會被持久化(存入磁盤)在伺服器中
.priority(1) //該消息的優先級設定為 1
.userId("hidden")
.build()),
messageBodyBytes);
示例3:
/** 發送一條簡單的消息: "Hello, World !!!",帶有過期時間(expiration) */
byte[] messageBodyBytes = "Hello, World !!!".getBytes();
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder())
.expiration("60000")
.build()),
messageBodyBytes);
示例4:
/** 發送一條簡單的消息: "Hello, World !!!",帶有 headers */
byte[] messageBodyBytes = "Hello, World !!!".getBytes();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("localtion", "here");
headers.put("time", "today");
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder())
.headers(headers)
.build()),
messageBodyBytes);