天天看點

RabbitMQ基礎知識及使用

RabbitMQ簡介

RabbitMQ是目前非常熱門的一款消息中間件,不管是網際網路行業還是傳統行業都在大量地使用。RabbitMQ憑借其高可靠、易擴充、高可用及豐富的功能特性受到越來越多企業的青睐。RabbitMQ伺服器是用Erlang語言編寫的,而叢集和故障轉移是建構在開放電信平台架構上的。所有主要的程式設計語言均有與代理接口通訊的用戶端庫。RabbitMQ整體上是一個生産者與消費者模型,主要負責接收、存儲和轉發消息。

RabbitMQ安裝

官網下載下傳位址:https://www.rabbitmq.com/install-windows.html

RabbitMQ基礎知識及使用

不過安裝RabbitMQ之前需要先安裝對應支援版本的Erlang。

https://www.rabbitmq.com/which-erlang.html#supported-version-policy

RabbitMQ基礎知識及使用

Erlang下載下傳位址:http://www.erlang.org/downloads

RabbitMQ基礎知識及使用

RabbitMQ安裝好後再環境變量中添加系統變量ERLANG_HOMT,變量值為安裝Erlang的路徑(路徑中不要包含bin目錄)。

RabbitMQ基礎知識及使用

在系統變量path中添加%ERLANG_HOME%\bin

RabbitMQ基礎知識及使用

在浏覽器中輸入http://localhost:15672,預設使用者名和密碼都是guest,進入首頁面後就是這樣

RabbitMQ基礎知識及使用

生産者與消費者

Producer:生産者,就是投遞消息的一方。

生産者建立消息,然後釋出到RabbitMQ中。消息一般可以包含2個部分:消息體和标簽。消息體也可以稱之為payload,在實際應用中,消息體一般是一個帶有業務邏輯結構的資料,比如一個JSON字元串。當然可以進一步對這個消息體進行序列化操作。消息的标簽用來表述這條消息,比如一個交換器的名稱和一個路由鍵。生産者把消息交由RabbitMQ,RabbitMQ之後會根據标簽把消息發送給感興趣的消費者.

Consumer:消費者,就是接受消息的一方。

消費者連接配接到RabbitMQ伺服器,并訂閱到隊列上。當消費者消費一條消息時,隻是消費消息的消息體。在消息路由的過程中,消息的标簽會丢棄,存入到隊列中的消息隻有消息體,消費者也隻會消費到消息體,也就是不知道消息的生産者是誰,當然消費者也不需要知道。

隊列、交換器、路由鍵、綁定

Queue:隊列,是RabbitMQ的内部對象,用于存儲消息。

RabbitMQ中消息都隻能存儲在隊列中。多個消費者可以訂約同一個隊列,這時隊列中的消息會被平均分攤又稱輪詢分發。

Exchange:交換器。生産者将消息發送到Exchange,由交換器将消息路由到一個或者多個隊列中。如果路由不到,或許會傳回給生産者,或許直接丢棄。RabbitMQ中的交換器可以看作一個簡單的實體。

RoutingKey:路由鍵。生産者将消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則,而這個Routing Key需要與交換器類型和綁定聯合使用才能最終生效。

Binding:綁定。RabbitMQ中通過綁定将交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵BindingKey,這樣RabbitMQ就知道如何正确地将消息路由到隊列了。RoutingKey相當于郵寄包裹的位址,BindingKey相當于包裹的目的地。在某些情況下RoutingKey和BindingKey可以看作同一個東西。

交換器類型

RabbitMQ常用的交換器類型有fanout、direct、topic、headers這四種。

fanout

它會把發送到該交換器的消息路由到所有與該交換器綁定的隊列中。

direct

direct類型的交換器路由規則也很簡單,它會把消息路由到BindingKey和RoutingKey完全比對的隊列中,如下圖發送消息的時候設定路由鍵為"warning",消息就是同時路由到Queue1和Queue2,如果發送消息的時候設定路由鍵為"info"或者"debug",消息就隻會路由到Queue2。如果發送消息的時候設定路由鍵為其它的,那麼消息就不會發送到這兩個隊列中。

RabbitMQ基礎知識及使用

topic

topic交換器與direct相似也是将消息路由到BindingKey和RountingKey相比對的隊列中,但這裡的比對規則有些不同,它有以下一些約定:

  • RoutingKey為一個點号"·“分隔的字元串(被點号”·“分隔開的每一段獨立的字元串稱為一個單詞,)如“com.rabbitmq.client”。
  • BindingKey和RoutingKey一樣也是"·"分隔的字元串。
  • BindingKey中可以存在兩種特殊的字元串"*“和”#",用于做模糊比對,其中"**“用于比對一個單詞,”#"用于比對多規格單詞。

在下圖中路由鍵為“com.rabbitmq.client"會同時路由到Queue1和Queue2;“com.hidden.demo"會路由到Queue2中;“java.rabbitmq.demo"會路由到Queue1中。

RabbitMQ基礎知識及使用

headers

headers類型的交換器不依賴路由鍵的比對規則來路由消息,而是根據發送消息内容中的headers屬性進行比對。在綁定隊列和交換器時指定一組鍵值對,當發送消息到交換器時,RabbitMQ會擷取到該消息的headers,對比其中的鍵值對是否完全比對隊列和交換器綁定時指定的鍵值對,如果完全比對則消息會路由到該隊列。

RabbitMq運作流程

生産者生産消息

  • 連接配接到RabbitMq Broker,建立一個連接配接,開啟一個Channel
  • 生産者聲明一個交換器
  • 生産者聲明一個隊列并設定相關屬性
  • 生産者通過路由鍵将交換器和隊列綁定起來
  • 生産者發送消息至RabbitMq Broker
  • 相應的交換器根據接收到的路由鍵查找相比對的隊列
  • 如果找到則将生産者發送過來的消息存入應用中
  • 如果沒有則丢棄或者傳回給生産者
  • 關閉Channel
  • 關閉連接配接

消費者接收消息

  • 連接配接到RabbitMq Broker,建立一個連接配接,開啟一個Channel
  • 消費者向RabbitMq Broker請求消費隊列中的消息
  • 等待RabbitMq Broker回應并投遞相應隊列中的消息,消費者接收消息
  • 消費者确認消息
  • RabbitMq從隊列中删除相應已被确認的消息
  • 關閉Channel
  • 關閉連接配接

代碼事例

  • 擷取MQ連接配接
public class ConnectionUtils {
	public static Connection  getConnection() throws IOException, TimeoutException{
		//定義一個連接配接工廠
		ConnectionFactory factory =new ConnectionFactory();		
		//設定服務位址
		factory.setHost("127.0.0.1");
		//AMQP 5672
		factory.setPort(5672);		
		//vhost
		factory.setVirtualHost("vhost");
		//使用者名 
		factory.setUsername("admin");		
		//密碼
		factory.setPassword("123456");
		return factory.newConnection();
	}
}
           

這裡的vhost可以在Virtual Hosts中通過Add virtual hosts自己添加

RabbitMQ基礎知識及使用
  • 生産者,采用的交換機模式為direct
public class Send {
	private static final String EXCHANGE_NAME="test_exchange_direct";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtils.getConnection();
		Channel channel = connection.createChannel();
		//exchange
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		String  msg="hello direct!";
		String routingKey="info";
		channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
		System.out.println("send "+msg);
		channel.close();
		connection.close();
	}
}
           
  • 消費者1
public class Recv1 {
	private static final String EXCHANGE_NAME = "test_exchange_direct";
	private static final String QUEUE_NAME = "test_queue_direct_1";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtils.getConnection();
		final Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		channel.basicQos(1);
		//定義一個消費者
		Consumer consumer=new DefaultConsumer(channel){
			//消息到達 觸發這個方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("[1] Recv msg:"+msg);
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally{
					System.out.println("[1] done ");
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		boolean autoAck=false;//自動應答 false
		channel.basicConsume(QUEUE_NAME,autoAck , consumer);
	}
}
           
  • 消費者2
public class Recv2 {
	private static final String EXCHANGE_NAME = "test_exchange_direct";
	private static final String QUEUE_NAME = "test_queue_direct_2";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtils.getConnection();
		final Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
		channel.basicQos(1);
		//定義一個消費者
		Consumer consumer=new DefaultConsumer(channel){
			//消息到達 觸發這個方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
			 
				String msg=new String(body,"utf-8");
				System.out.println("[2] Recv msg:"+msg);
				
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally{
					System.out.println("[2] done ");
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		boolean autoAck=false;//自動應答 false
		channel.basicConsume(QUEUE_NAME,autoAck , consumer);
	}
}
           

queueDeclare在RabbitMq源碼如下:

RabbitMQ基礎知識及使用

public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException , Object> arguments) throws IOException ;

它有五個參數:

  • queue:隊列名稱
  • durable:設定是否持久化
  • exclusive:設定是否排他
  • autoDelete:設定是否自動删除
  • arguments:設定隊列一些其他參數

queueBind在RabbitMq源碼如下:

RabbitMQ基礎知識及使用

public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException ;

參數解析:

  • queue:隊列名稱
  • exchange:交換機名稱
  • routingKey:用來綁定隊列和交換器的路由鍵

在上述代碼中交換機是direct類型,消費者1和消費者2的routingKey:

RabbitMQ基礎知識及使用
RabbitMQ基礎知識及使用

這是生産者的routingKey(bindingKey):

RabbitMQ基礎知識及使用

依次運作消費者和生産者,在消費者2的控制台會列印輸出而消費者1的控制台不會列印:

RabbitMQ基礎知識及使用
RabbitMQ基礎知識及使用

同時RabbitMq用戶端會自動生成Queue1和Queue2

RabbitMQ基礎知識及使用

如果之前沒有建立或生成消費者綁定的交換機,則會報錯:

RabbitMQ基礎知識及使用

此時要先建立交換機或者先運作生産者一次讓其自動生成交換機。

RabbitMQ基礎知識及使用