PS:近期在南甯出差,工作比較忙,是以更新會比較慢。
說到消息通信,可能我們首先會想到的是郵箱,QQ,微信,短信等等這些通信方式,這些通信方式都有發送者,接收者,還有一個中間存儲離線消息的容器。但是這些通信方式和我們要講的 RabbitMQ 的通信模型是不一樣的,比如和郵件的通信方式相比,郵件伺服器基于 POP3/SMTP 協定,通信雙方需要明确指定,并且發送的郵件内容有固定的結構。而 RabbitMQ 伺服器基于 AMQP 協定,這個協定是不需要明确指定發送方和接收方的,而且發送的消息也沒有固定的結構,甚至可以直接存儲二進制資料,并且和郵件伺服器一樣,也能存儲離線消息,最關鍵的是 RabbitMQ 既能夠以一對一的方式進行路由,還能夠以一對多的方式進行廣播。
下面這張圖是大緻展示了 RabbitMQ 消息通信的過程:

1、生産者和消費者
在 RabbitMQ 的通信過程中,有兩個主要的角色:生産者和消費者。類比于郵件通信的發送方和接收方。
這裡首先我們要明确 RabbtiMQ 伺服器是不能夠産生資料的,正如同其名字——消息中間件,是一個用來傳遞消息的中間商。生産者産生建立消息,然後釋出到代理伺服器(RabbitMQ),而消費者則從代理伺服器擷取消息(不是直接找生産者要消息),而且在實際應用中,生産者和消費者也是可以角色互相轉換的,是以當我們應用程式連接配接到 RabbitMQ 伺服器時,必須要明确我是生産者呢還是消費者。
2、消息
生産者建立消息,然後釋出到 RabbitMQ 伺服器中,那麼什麼是消息?
這裡的消息分為兩部分:有效内容和内容标簽。
①、有效内容:可以是任何内容,一個數組,一個集合,甚至二進制資料都可以。RabbitMQ 不會在意你發什麼資料,盡管發就行了。
②、内容标簽:描述有效内容,是 RabbitMQ 用來決定誰将獲得消息。前面說的郵件通信,必須明确指定發送方位址和收件方位址,而基于 AMQP 協定的 RabbitMQ 則是通過生産者發送消息附帶的内容标簽将消息發送個感興趣的消費者。
後面我們會詳細解析标簽是什麼,這裡隻需要知道生産者會建立消息并設定标簽。注意最上面的大圖,一般來說生産者建立消息會設定标簽,但是傳輸到消費者那裡就沒有标簽了,除非你在有效内容中說明誰是生産者,一般消費者是不知道誰産生的消息的。
3、信道
生産者産生了消息,然後釋出到 RabbitMQ 伺服器,釋出之前肯定要先連接配接上伺服器,也就是要在應用程式和rabbitmq 伺服器之間建立一條 TCP 連接配接,一旦連接配接建立,應用程式就可以建立一條 AMQP 信道。
信道是建立在“真實的”TCP 連接配接内的虛拟連接配接,AMQP 指令都是通過信道發送出去的,每條信道都會被指派一個唯一的ID(AMQP庫會幫你記住ID的),不論是釋出消息、訂閱隊列或者接收消息,這些動作都是通過信道來完成的。
可能有人會問,為什麼不直接通過 TCP 連接配接來發送AMQP指令呢?
這裡原因是效率問題,因為對于作業系統來說,每次建立和銷毀 TCP 會話是非常昂貴的開銷,而實際系統中,比如電商雙十一,每秒鐘高峰期成千上萬條連接配接,一般來說作業系統建立TCP連接配接是有數量限制的,那麼這就會遇到瓶頸。
引入信道的概念,我們可以在一條 TCP 連接配接上建立 N多個信道,這樣既能發送指令,也能夠保證每條信道的私密性,我們可以将其想象為光纖電纜。
4、交換器和隊列
截取上面的一部分圖:
交換器和隊列都是 RabbitMQ 伺服器的一部分,我們知道生産者會将消息發送到 RabbitMQ 伺服器,而進入該伺服器後,首先進入交換機部分,然後由交換器根據消息附帶的内容标簽,将消息綁定到相應的隊列。我們首先來看什麼是隊列:
①、容納消息的場所,生産者發送到RabbitMQ伺服器的消息會在隊列中等待消費者消費。
②、隊列是 RabbitMQ 伺服器中最後的終點(除非消息進入了黑洞,黑洞的概念下面會介紹)。
③、隊列可以實作負載均衡,我們可以增加一堆消費者,然後讓 RabbitMQ 以循環的方式來均勻的配置設定消息。
搞清楚了隊列是什麼了,那麼消息是如何到達隊列的呢?沒錯,就是通過交換器。
消息進入RabbitMQ 伺服器時,會首先将消息發送到交換器,然後交換器會根據特定的路由算法以及消息的内容标簽将消息綁定到相應的隊列。在 AMQP 協定中有四種交換器:direct、fanout、topic和 headers,每種交換器都實作了不同的路由算法,這也對應 RabbitMQ 工作的幾種不同方式,這是重點,後面部落格會進行詳細介紹。
5、虛拟主機
最上面那張大圖,我畫了虛拟主機A以及虛拟主機B,說明在 RabbitMQ 伺服器中存在着多個虛拟主機,那麼虛拟主機到底是什麼?
首先我們抛出這樣一個問題,一個 RabbitMQ 肯定不是隻服務一個應用程式,那麼多個應用程式同時使用 RabbitMQ 伺服器,如何保證彼此之間不會沖突?
答案就是使用虛拟主機,虛拟主機其實就是一個迷你版的RabbitMQ 伺服器,它擁有自己的交換器和隊列,更重要的是虛拟主機擁有自己的權限機制,一個伺服器能夠建立多個虛拟主機。那麼我們在使用RabbitMQ伺服器的時候,隻需要将一個應用程式對應一個虛拟主機,這種各個執行個體間邏輯上的分離就能夠保證不同的應用程式安全的傳遞消息。
預設的虛拟主機是“/”。
6、簡單執行個體
介紹完RabbitMQ 消息通信過程中的一些基本概念後,下面我們通過一個代碼執行個體來實際感受一下。
這是一個Maven工程,首先我們看 pom.xml 檔案:導入 amqp-client 依賴即可
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ys.rabbitmq</groupId>
<artifactId>RabbitMQTest</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<name>RabbitMQTest Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
</dependencies>
</project>
生産者:
1 package com.ys.simple;
2
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import com.ys.utils.ConnectionUtil;
6
7 /**
8 * Create by hadoop
9 */
10 public class Send {
11 private final static String QUEUE_NAME = "hello";
12
13 public static void main(String[] args) throws Exception{
14 //1、擷取連接配接
15 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
16 //2、聲明通道
17 Channel channel = connection.createChannel();
18 //3、聲明(建立)隊列
19 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
20 //4、定義消息内容
21 String message = "hello rabbitmq ";
22 //5、釋出消息
23 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
24 System.out.println("[x] Sent'"+message+"'");
25 //6、關閉通道和連接配接
26 channel.close();
27 connection.close();
28 }
29 }
消費者:
1 package com.ys.simple;
2
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.QueueingConsumer;
6 import com.ys.utils.ConnectionUtil;
7
8
9 /**
10 * Create by hadoop
11 */
12 public class Recv {
13
14 private final static String QUEUE_NAME = "hello";
15
16 public static void main(String[] args) throws Exception{
17 //1、擷取連接配接
18 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
19 //2、聲明通道
20 Channel channel = connection.createChannel();
21 //3、聲明隊列
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 //4、定義隊列的消費者
24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
25 //5、監聽隊列
26 channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
27 //6、擷取消息
28 while (true){
29 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
30 String message = new String(delivery.getBody());
31 System.out.println(" [x] Received '" + message + "'");
32 }
33 }
34
35 }
工具類:ConnectionUtil
1 package com.ys.utils;
2
3 import com.rabbitmq.client.Connection;
4 import com.rabbitmq.client.ConnectionFactory;
5
6 /**
7 * Create by hadoop
8 */
9 public class ConnectionUtil {
10
11 public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{
12 //1、定義連接配接工廠
13 ConnectionFactory factory = new ConnectionFactory();
14 //2、設定伺服器位址
15 factory.setHost(host);
16 //3、設定端口
17 factory.setPort(port);
18 //4、設定虛拟主機、使用者名、密碼
19 factory.setVirtualHost(vHost);
20 factory.setUsername(userName);
21 factory.setPassword(passWord);
22 //5、通過連接配接工廠擷取連接配接
23 Connection connection = factory.newConnection();
24 return connection;
25 }
26 }
作者:
YSOcean出處:
http://www.cnblogs.com/ysocean/本文版權歸作者所有,歡迎轉載,但未經作者同意不能轉載,否則保留追究法律責任的權利。