RabbitMQ簡介
RabbitMQ是目前非常熱門的一款消息中間件,不管是網際網路行業還是傳統行業都在大量地使用。RabbitMQ憑借其高可靠、易擴充、高可用及豐富的功能特性受到越來越多企業的青睐。RabbitMQ伺服器是用Erlang語言編寫的,而叢集和故障轉移是建構在開放電信平台架構上的。所有主要的程式設計語言均有與代理接口通訊的用戶端庫。RabbitMQ整體上是一個生産者與消費者模型,主要負責接收、存儲和轉發消息。
RabbitMQ安裝
官網下載下傳位址:https://www.rabbitmq.com/install-windows.html

不過安裝RabbitMQ之前需要先安裝對應支援版本的Erlang。
https://www.rabbitmq.com/which-erlang.html#supported-version-policy
Erlang下載下傳位址:http://www.erlang.org/downloads
RabbitMQ安裝好後再環境變量中添加系統變量ERLANG_HOMT,變量值為安裝Erlang的路徑(路徑中不要包含bin目錄)。
在系統變量path中添加%ERLANG_HOME%\bin
在浏覽器中輸入http://localhost:15672,預設使用者名和密碼都是guest,進入首頁面後就是這樣
生産者與消費者
Producer:生産者,就是投遞消息的一方。
生産者建立消息,然後釋出到RabbitMQ中。消息一般可以包含2個部分:消息體和标簽。消息體也可以稱之為payload,在實際應用中,消息體一般是一個帶有業務邏輯結構的資料,比如一個JSON字元串。當然可以進一步對這個消息體進行序列化操作。消息的标簽用來表述這條消息,比如一個交換器的名稱和一個路由鍵。生産者把消息交由RabbitMQ,RabbitMQ之後會根據标簽把消息發送給感興趣的消費者.
Consumer:消費者,就是接受消息的一方。
消費者連接配接到RabbitMQ伺服器,并訂閱到隊列上。當消費者消費一條消息時,隻是消費消息的消息體。在消息路由的過程中,消息的标簽會丢棄,存入到隊列中的消息隻有消息體,消費者也隻會消費到消息體,也就是不知道消息的生産者是誰,當然消費者也不需要知道。
隊列、交換器、路由鍵、綁定
Queue:隊列,是RabbitMQ的内部對象,用于存儲消息。
RabbitMQ中消息都隻能存儲在隊列中。多個消費者可以訂約同一個隊列,這時隊列中的消息會被平均分攤又稱輪詢分發。
Exchange:交換器。生産者将消息發送到Exchange,由交換器将消息路由到一個或者多個隊列中。如果路由不到,或許會傳回給生産者,或許直接丢棄。RabbitMQ中的交換器可以看作一個簡單的實體。
RoutingKey:路由鍵。生産者将消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則,而這個Routing Key需要與交換器類型和綁定聯合使用才能最終生效。
Binding:綁定。RabbitMQ中通過綁定将交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵BindingKey,這樣RabbitMQ就知道如何正确地将消息路由到隊列了。RoutingKey相當于郵寄包裹的位址,BindingKey相當于包裹的目的地。在某些情況下RoutingKey和BindingKey可以看作同一個東西。
交換器類型
RabbitMQ常用的交換器類型有fanout、direct、topic、headers這四種。
fanout
它會把發送到該交換器的消息路由到所有與該交換器綁定的隊列中。
direct
direct類型的交換器路由規則也很簡單,它會把消息路由到BindingKey和RoutingKey完全比對的隊列中,如下圖發送消息的時候設定路由鍵為"warning",消息就是同時路由到Queue1和Queue2,如果發送消息的時候設定路由鍵為"info"或者"debug",消息就隻會路由到Queue2。如果發送消息的時候設定路由鍵為其它的,那麼消息就不會發送到這兩個隊列中。
topic
topic交換器與direct相似也是将消息路由到BindingKey和RountingKey相比對的隊列中,但這裡的比對規則有些不同,它有以下一些約定:
- RoutingKey為一個點号"·“分隔的字元串(被點号”·“分隔開的每一段獨立的字元串稱為一個單詞,)如“com.rabbitmq.client”。
- BindingKey和RoutingKey一樣也是"·"分隔的字元串。
- BindingKey中可以存在兩種特殊的字元串"*“和”#",用于做模糊比對,其中"**“用于比對一個單詞,”#"用于比對多規格單詞。
在下圖中路由鍵為“com.rabbitmq.client"會同時路由到Queue1和Queue2;“com.hidden.demo"會路由到Queue2中;“java.rabbitmq.demo"會路由到Queue1中。
headers
headers類型的交換器不依賴路由鍵的比對規則來路由消息,而是根據發送消息内容中的headers屬性進行比對。在綁定隊列和交換器時指定一組鍵值對,當發送消息到交換器時,RabbitMQ會擷取到該消息的headers,對比其中的鍵值對是否完全比對隊列和交換器綁定時指定的鍵值對,如果完全比對則消息會路由到該隊列。
RabbitMq運作流程
生産者生産消息
- 連接配接到RabbitMq Broker,建立一個連接配接,開啟一個Channel
- 生産者聲明一個交換器
- 生産者聲明一個隊列并設定相關屬性
- 生産者通過路由鍵将交換器和隊列綁定起來
- 生産者發送消息至RabbitMq Broker
- 相應的交換器根據接收到的路由鍵查找相比對的隊列
- 如果找到則将生産者發送過來的消息存入應用中
- 如果沒有則丢棄或者傳回給生産者
- 關閉Channel
- 關閉連接配接
消費者接收消息
- 連接配接到RabbitMq Broker,建立一個連接配接,開啟一個Channel
- 消費者向RabbitMq Broker請求消費隊列中的消息
- 等待RabbitMq Broker回應并投遞相應隊列中的消息,消費者接收消息
- 消費者确認消息
- RabbitMq從隊列中删除相應已被确認的消息
- 關閉Channel
- 關閉連接配接
代碼事例
- 擷取MQ連接配接
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException{
//定義一個連接配接工廠
ConnectionFactory factory =new ConnectionFactory();
//設定服務位址
factory.setHost("127.0.0.1");
//AMQP 5672
factory.setPort(5672);
//vhost
factory.setVirtualHost("vhost");
//使用者名
factory.setUsername("admin");
//密碼
factory.setPassword("123456");
return factory.newConnection();
}
}
這裡的vhost可以在Virtual Hosts中通過Add virtual hosts自己添加
- 生産者,采用的交換機模式為direct
public class Send {
private static final String EXCHANGE_NAME="test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String msg="hello direct!";
String routingKey="info";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("send "+msg);
channel.close();
connection.close();
}
}
- 消費者1
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.basicQos(1);
//定義一個消費者
Consumer consumer=new DefaultConsumer(channel){
//消息到達 觸發這個方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[1] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("[1] done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck=false;//自動應答 false
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
}
}
- 消費者2
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
channel.basicQos(1);
//定義一個消費者
Consumer consumer=new DefaultConsumer(channel){
//消息到達 觸發這個方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[2] Recv msg:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("[2] done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck=false;//自動應答 false
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
}
}
queueDeclare在RabbitMq源碼如下:
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException , Object> arguments) throws IOException ;
它有五個參數:
- queue:隊列名稱
- durable:設定是否持久化
- exclusive:設定是否排他
- autoDelete:設定是否自動删除
- arguments:設定隊列一些其他參數
queueBind在RabbitMq源碼如下:
public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException ;
參數解析:
- queue:隊列名稱
- exchange:交換機名稱
- routingKey:用來綁定隊列和交換器的路由鍵
在上述代碼中交換機是direct類型,消費者1和消費者2的routingKey:
這是生産者的routingKey(bindingKey):
依次運作消費者和生産者,在消費者2的控制台會列印輸出而消費者1的控制台不會列印:
同時RabbitMq用戶端會自動生成Queue1和Queue2
如果之前沒有建立或生成消費者綁定的交換機,則會報錯:
此時要先建立交換機或者先運作生産者一次讓其自動生成交換機。