天天看點

RabbitMQ入門(二)1.入門2.工作隊列3.釋出/訂閱模式在RabbitMQ中實作方式4.部分API使用說明(個人了解,僅供參考)5.補充說明

在上一篇部落格,筆者簡單的介紹了一些RabbitMQ相關的内容,在這一篇部落格會根據RabbitMQ官網的入門介紹,結合筆者自身的了解更深入的在代碼方面介紹RabbitMQ的入門使用,。同樣,這篇部落客要的目的也是整理記錄自己的學習筆記,加深自己對RabbitMQ的使用與了解。

RabbitMQ入門(一)

RabbitMQ入門(二)

RabbitMQ入門(三)

目錄

1.入門

建立消息生産者

建立消息消費者

2.工作隊列

建立新的消費者

測試

設定AutoAck為false的注意事項

3.釋出/訂閱模式在RabbitMQ中實作方式

4.部分API使用說明(個人了解,僅供參考)

channel.queueDeclare

channel.queueBind

channel.basicPublish

5.補充說明

1.入門

建立消息生産者

首先我們先來看RabbitMQ官網的一段消息生産者相關代碼:

public class Send {
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setVirtualHost("myVirualHost");
		try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) {
			channel.exchangeDeclare("helloExchange ", "direct", true);
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			channel.queueBind(QUEUE_NAME, " helloExchange ", "bindKey");
			
			String message = ”message from send“;
			channel.basicPublish("helloExchange", "bindKey", null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
	}
}
           

這裡可以看到首先 我們是建立了ConnectionFacotry 并設定了 Host和VirtualHost  因為端口 和使用者名 密碼我們使用預設的是以并沒有設定。然後 fatory.newConnection 建立Connection,之後 Connection.createChannel建立channel,而我們的操作基本都是基于 channel的。這裡使用了try-with-resources語句,在try語句中聲明channel等資源,這樣我們不用手動關閉資源。

channel.exchangeDeclare("helloExchange", "direct", true);

定義一個名為 helloExchange  類型是 direct的exchange. 如果RabbitMQ此VirtualHost中 已經有同名同類型的Exchange 那麼可能會報錯 這裡設定為true就能避免。(但是如果有類型不同的同名Exchange 則還是會報錯,那麼隻能删除此句或者去控制頁面删除Exchange)

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

定義名為 QUEUE_NAME的隊列 第二個參數是防止有同名Queue。具體函數的參數定義可以檢視後面的部分API說明。

channel.queueBind(QUEUE_NAME, " helloExchange ", "bindKey");

将名為QUEUE_NAME的隊列與helloExchange綁定在一起,并且Key為bindKey.這樣發送到helloExchange的Routing Key為bindKey的消息就會進入此隊列。

channel.basicPublish("helloExchange", "bindKey", null, message.getBytes());

釋出消息到helloExchange  Routing Key為bingKey。

從上面的代碼我們可以看出,就跟上一節我們說明的一樣,我們發送消息并沒有直接給Queue,而是發送給Exchange 并且設定Routing Key 之後RabbitMQ 内部将根據Exchange 類型和與Exchange綁定的Queue設定的Binding key 将資訊放到隊列或者舍棄。如果沒有設定Exchange 也就是傳遞空字元串 就會發送到預設的Exchange。

建立消息消費者

相關代碼如下:

public class Recv {
	private static final String QUEUE_NAME="hello";
	public static void main(String[] argv) throws Exception {
	    ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    factory.setVirtualHost("myVirualHost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    DeliverCallback deliverCallback = (consumerTag,delivery)->{
	    	String message = new String(delivery.getBody(), "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	    };
	    channel.basicConsume(QUEUE_NAME, true,deliverCallback,consumerTag->{});
	    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

	  }
}
           

可以看出來消息消費者  之前操作也類似 就是 建立ConnectionFacotry Connection Channel  然後通過Channel做操作。

DeliverCallback deliverCallback = (consumerTag,delivery)->{

      String message = new String(delivery.getBody(), "UTF-8");

      System.out.println(" [x] Received '" + message + "'");

};
           

這裡是定義收到消息後的回調函數,這裡面也就是我們實際處理消息的地方。

channel.basicConsume(QUEUE_NAME, true,deliverCallback,consumerTag->{});

建立 一個消費者,這個消費者從QUEUE_NAME隊列中取資訊 然後進行消費。這裡第二個參數是  消費者是否自動傳回ack。這裡暫時設定為true(這樣設定 會導緻 如果處理消息過程中,消費者出現了異常或者關閉了,消息會丢失,下節我們會做一些處理避免因為消費者消費失敗 導緻消息丢失)。

2.工作隊列

上節我們說到如果消費者已經取得消息 但是處理過程中發生了異常,會導緻消息丢失的問題,這節我們就處理一下。由于生産者沒有太大變化,我們這裡隻是将消息改為了從控制台獲得。即

String message = String.join(“ ”,args);

建立新的消費者

public class Worker {
	private static final String QUEUE_NAME = "hello";

	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setVirtualHost("myVirualHost");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
//		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println(" [x] Received '" + message + "'");
			try {
				doWork(message);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				System.out.println(" [x] Done");
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
		};
		boolean autoAck = false; // acknowledgment is covered below
		channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
		});
	}
	private static void doWork(String task) throws InterruptedException {
		for (char ch : task.toCharArray()) {
			if (ch == '.')
				Thread.sleep(5000);
		}
	}
}
           

可以從上面代碼看出來,我們的主要差別之處主要在于 回調函數中 我們增加了模拟處理消息的方法,并且在處理完成之後 調用

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

這代表着我們告訴RabbitMQ,此條資訊已經處理完成。

并且我們在聲明消費者時候 使用如下代碼:

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

設定了自動傳回ack為false。

這樣我們就可以避免 消費者取到資訊 處理資訊過程中出現異常 導緻資訊丢失了。

測試

執行消息生産者 并分别傳入 參數1.1.1.1 Message 和 2.2.2.2 Message

啟動消費者  并在顯示

RabbitMQ入門(二)1.入門2.工作隊列3.釋出/訂閱模式在RabbitMQ中實作方式4.部分API使用說明(個人了解,僅供參考)5.補充說明

而未顯示[x] Done之前  關閉此消費者。

啟動第二個消費者,發現 這個消費者最後列印

RabbitMQ入門(二)1.入門2.工作隊列3.釋出/訂閱模式在RabbitMQ中實作方式4.部分API使用說明(個人了解,僅供參考)5.補充說明

說明 我們的消息 1.1.1.1 Message并沒有因為第一個消費者消費失敗 而丢失。這是因為如果消費者沒有将成功處理消息的辨別傳回給RabbitMQ ,那麼RabbitMQ會在此消費者 關閉之後将消息重新入棧。

我們将

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

改為上一節的

channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

之後重複操作 會發現資訊發生了丢失。進而說明如果設定預設傳回Ack的話,會有消息處理出現異常進而導緻消息丢失的風險。

設定AutoAck為false的注意事項

之前我們已經設定了AutoAck為false,這樣的話 我們要注意一定要在處理完消息之後使用發送過來這條消息的channel(使用其他channel會報錯!)調用channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 的代碼 将消息被消費的資訊 告訴RabbitMQ 否則的話就會出現 消息被消費了 但是還是在隊列中,也就一直在記憶體中儲存此消息,更嚴重的是當這個消費此消息的消費者關閉之後,這些消息 由于是unacked狀态的,會再次被其他消費者消費。

3.釋出/訂閱模式在RabbitMQ中實作方式

在RabbitMQ中 如果我們想實作 類似之前ActiveMQ 那種 topic 模式的消息處理,也就是一個釋出者釋出消息 能讓多個訂閱者收到,并且 也是訂閱者隻收到自己訂閱後的消息,那麼我們可以 建立一個exchange 其類型是 fanout,也就是不管routingkey 此exchange收到的消息 将發送到所有它綁定的隊列中,那麼我們訂閱者發送消息給fanout的exchange時候不用指定routingkey(指定也沒作用)channel.basicPublish("fanoutExchange", "",null, msgByte);

而消費者端 會定義一個臨時隊列,建立的臨時隊列

String queueName = channel.queueDeclare().getName();

将此臨時隊列 與 fanout類型的EXCHANGE綁定

channel.queueBind(queueName, EXCHANGE_NAME, "");

然後channel.basicConsume(queueName, true, deliverCallback, consumerTak->{});

建立消費者,這樣每次啟動消費者 都會建立一個臨時隊列 并且與之前的fanoutExchange相綁定,進而接收到發送到fanoutExchange的消息,并且關閉消費者 就會自動銷毀此臨時隊列。

4.部分API使用說明(個人了解,僅供參考)

channel.queueDeclare

channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)

queue 隊列名稱(queue如果設定為空字元串,那麼會自動生成隊列名稱,使用queueDeclare().getName()就能在建立隊列之後 擷取自動生成的隊列名稱)
durable  是否持久化
exclusive 是否隻有一個channel和這個queue連接配接,如果是,那麼當這個channel斷開删除該隊列
autoDelete 是否自動删除
arguments 是其他參數 可以設定隊列長度限制 最大優先級數等等。

下面表格是對于arguments的說明。

x-max-length 限定隊列消息的最大值長度,超過指定長度會把最早的幾條删除
x-max-length-bytes 限定隊列的最大占用空間大小
x-dead-letter-exchange 将因為長度不夠或者過期的消息 從隊列删除 并推送到指定交換機中而不是丢棄
x-dead-letter-routing-key 上面删除的消息 推送到指定交換機的指定路由鍵的隊列中
x-max-priority 優先級隊列,聲明隊列時先定義最大優先級值(定義最大值一般不要太大),在釋出消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費
x-queue-mode 為lazy 則先将消息儲存到磁盤上,不放在記憶體中,當消費者開始消費的時候才加載到記憶體中
x-expires 當隊列在指定的時間沒有被通路(consume, basicGet, queueDeclare…)就會被删除
x-message-ttl 設定隊列中的所有消息的生存周期(統一為整個隊列的所有消息設定生命周期), 也可以在釋出消息的時候單獨為某個消息指定剩餘生存時間,機關毫秒, 類似于redis中的ttl,生存時間到了,消息會被從隊裡中删除,注意是消息被删除,而不是隊列被删除

當設定exclusive 以及autoDelete 都為true,也就是這個queue隻能被一個channel使用 而且channel斷開後這個queue直接被删除(queue如果設定為空字元串 那麼就會直接删除)

 在RabbitMQ中 使用channel.queueDeclare  第二個參數是 是否持久化 。如果釋出者和消費者都調用了queueDeclare (并且設定的QueueName是一樣的) 那麼這個參數要一緻  否則會報錯。ExchangeDeclare同樣可以設定是否持久化。如果想RabbitMQ被異常關閉之後 消息也不會丢失,那麼我們可以将queue設定為持久化,然後在發送消息的時候 将第三個參數 也就是設定消息是持久化的文本形式!!

channel.basicPublish("leiTopicExchange", "leiTopicMsg",MessageProperties.PERSISTENT_TEXT_PLAIN, msgByte);

這樣 RabbitMQ關閉之後 重新開機 原來在消息隊列的消息也不會丢失。

channel.queueBind

channel.queueBind(queue, exchange, routingKey, arguments)

主要用來綁定隊列和exchange

queue 要綁定的隊列名
exchange 綁定的exchange名
routingKey 綁定exchange和隊列的Bindkey (如果是fanout或者headers其實無所謂 可以傳遞空字元串)
arguments 綁定傳遞的額外參數,如exchange是headers時候 添加的鍵值對,進而讓消息根據發送消息設定的headers與這裡的headers對比,符合條件才發送到這個queue

channel.basicPublish

channel.basicPublish(exchange, routingKey, props, body)

exchange 是發送消息到哪個exchange
routingKey exchange為topic或者direct時候  決定routingKey和綁定在此exchange的queue的bindingKey 是否比對的值
props 消息的其他資訊 具體可以檢視BasicProperties 這個類,裡面可以設定資訊過期等
body 就是傳遞的消息體

其中的props我們可以從BasicProperties這個類中得到一些資訊:

public BasicProperties(
            String contentType,
            String contentEncoding,
            Map<String,Object> headers,
            Integer deliveryMode,
            Integer priority,
            String correlationId,
            String replyTo,
            String expiration,
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)
        {
            this.contentType = contentType;
            this.contentEncoding = contentEncoding;
            this.headers = headers==null ? null : Collections.unmodifiableMap(new HashMap<String,Object>(headers));
            this.deliveryMode = deliveryMode;
            this.priority = priority;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.expiration = expiration;
            this.messageId = messageId;
            this.timestamp = timestamp;
            this.type = type;
            this.userId = userId;
            this.appId = appId;
            this.clusterId = clusterId;
        }
           

如contentType就是消息的類型如text/plain就代表是普通文本,contentEncoding就是消息的編碼,而headers就決定在消息進入Type是headers的Exchange的時候,消息該發往哪個隊列,priority代表消息的優先級,後續的一些在之後用到的時候會再進行講解。

5.補充說明

之前我們設定中 會發現 兩個Worker,有時候會出現一個Worker一直執行,而另外一個Worker好像幾乎不工作 這是因為RabbitMQ隻是在消息進入隊列的時候發送消息,而不考慮消費者未确認的數量,為了避免這種情況,我們可以在消費者使用channel.basicQos(prefetchcount)  prefetchcount是整數,也就是預存計數,這就告訴rabbitMQ  一次最多向消費者發送prefetchcount條資訊。(也可以認為是這個消費者最多能持有prefetchcount條資訊,比如設定為3 ,如果消息夠多 那麼每次處理并發送ack給rabbitMQ,rabbitMQ會再給一條資訊,也就是保持消費者有3條資訊)如果設定為1,那麼就是在worker處理并确認一條消息之前,不向這個worker發送新消息 相反他會發送給下一個不忙的勞工。

RabbitMQ中消息的狀态,1.準備傳遞(未給消費者),2.已傳遞但是消費者尚未确認(沒發送ack)。