了使用RabbitMQ可能會遇到的一個問題,即生産者不知道消息是否真正到達broker代理伺服器,随後通過AMQP協定層面為我們提供的事務機制解決了這個問題,但是采用事務機制實作會降低RabbitMQ的消息吞吐量,那麼有沒有更加高效的解決方式呢?RabbitMQ團隊為我們拿出了更好的方案,即采用發送方确認模式;
生産者确認模式實作原理:
生産者将信道設定成confirm模式,一旦信道進入confirm模式,所有在該信道上面釋出的消息都将會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有比對的隊列之後,broker就會發送一個确認給生産者(包含消息的唯一ID),這就使得生産者知道消息已經正确到達目的隊列了,如果消息和隊列是可持久化的,那麼确認消息會在将消息寫入磁盤之後發出,broker回傳給生産者的确認消息中delivery-tag域包含了确認消息的序列号,此外broker也可以設定basic.ack的multiple域,表示到這個序列号之前的所有消息都已經得到了處理;
confirm模式最大的好處在于他是異步的,一旦釋出一條消息,生産者應用程式就可以在等信道傳回确認的同時繼續發送下一條消息,當消息最終得到确認之後,生産者應用便可以通過回調方法來處理該确認消息,如果RabbitMQ因為自身内部錯誤導緻消息丢失,就會發送一條nack消息,生産者應用程式同樣可以在回調方法中處理該nack消息;
開啟confirm模式的方法:
(注意一點,已經在transaction事務模式的channel是不能再設定成confirm模式的,即這兩種模式是不能共存的),如果沒有設定no-wait标志的話,broker會傳回confirm.select-ok表示同意發送者将目前channel信道設定為confirm模式(從目前RabbitMQ最新版本3.6來看,如果調用了channel.confirmSelect方法,預設情況下是直接将no-wait設定成false的,也就是預設情況下broker是必須回傳confirm.select-ok的,而且我也沒找到我們自己能夠設定no-wait标志的方法);
生産者實作confiem模式有三種程式設計方式:
(1):普通confirm模式,每發送一條消息,調用waitForConfirms()方法等待服務端confirm,這實際上是一種串行的confirm,每publish一條消息之後就等待服務端confirm,如果服務端傳回false或者逾時時間内未傳回,用戶端進行消息重傳;
(2):批量confirm模式,每發送一批消息之後,調用waitForConfirms()方法,等待服務端confirm,這種批量确認的模式極大的提高了confirm效率,但是如果一旦出現confirm傳回false或者逾時的情況,用戶端需要将這一批次的消息全部重發,這會帶來明顯的重複消息,如果這種情況頻繁發生的話,效率也會不升反降;
講完了基本的原理之後,代碼級别我們該怎麼設定channel信道為confirm模式呢?以及我們該怎麼擷取broker傳回給我們的确認消息呢?
測試1:普通confirm模式
首先從最簡單的開始,僅僅将channel設定成confirm模式,并且生産者每發送一條消息就等待broker回應确認消息,至于确認消息是什麼我們不去做任何處理,為了測試友善,此處生産者隻發送了5條消息,實作代碼如下:
[java]
view plain
copy
1. public class ProducerTest {
2. public static void main(String[] args) {
3. "confirmExchange";
4. "confirmQueue";
5. "confirmRoutingKey";
6. "confirmRoutingKey";
7. int count = 5;
8.
9. new ConnectionFactory();
10. "172.16.151.74");
11. "test");
12. "test");
13. 5672);
14.
15. //建立生産者
16. new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
17. producer.run();
18. }
19. }
20.
21. class Sender
22. {
23. private ConnectionFactory factory;
24. private int count;
25. private String exchangeName;
26. private String queueName;
27. private String routingKey;
28. private String bindingKey;
29.
30. public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
31. this.factory = factory;
32. this.count = count;
33. this.exchangeName = exchangeName;
34. this.queueName = queueName;
35. this.routingKey = routingKey;
36. this.bindingKey = bindingKey;
37. }
38.
39. public void run() {
40. null;
41. try {
42. Connection connection = factory.newConnection();
43. channel = connection.createChannel();
44. //建立exchange
45. "direct", true, false, null);
46. //建立隊列
47. true, false, false, null);
48. //綁定exchange和queue
49. channel.queueBind(queueName, exchangeName, bindingKey);
50. channel.confirmSelect();
51. //發送持久化消息
52. for(int i = 0;i < count;i++)
53. {
54. //第一個參數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
55. //是以如果不建立exchange的話我們可以直接将該參數設定成"",如果建立了exchange的話
56. //我們需要将該參數設定成建立的exchange的名字),第二個參數是路由鍵
57. "第"+(i+1)+"條消息").getBytes());
58. if(channel.waitForConfirms())
59. {
60. "發送成功");
61. }
62. }
63. final long start = System.currentTimeMillis();
64. "執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
65. catch (Exception e) {
66. e.printStackTrace();
67. }
68. }
69. }
在第50行調用Channel信道的confirmSelect方法将目前信道設定成了confirm模式,第57行通過for循環調用Channel的basicPublish方法發送了5條消息到消息隊列中,第58行調用waitForConfirms方法等待broker服務端傳回ack或者nack消息,這種模式每發送一條消息就會等待broker代理伺服器傳回消息,這點我們可以從抓包的角度觀察結果:

可以看到上面生産者通過Confirm.Select将目前Channel信道設定成confirm模式,broker代理伺服器收到之後回傳Confirm.Select-Ok同一将目前Channel設定成confirm模式,此外看到傳回5條Basic.Ack消息;
測試2:批量confirm模式
這種模式生産者不是每發送一條就等待broker确認,而是發送一批,實作代碼見下:
[java]
view plain
copy
1. public class ProducerTest {
2. public static void main(String[] args) {
3. "confirmExchange";
4. "confirmQueue";
5. "confirmRoutingKey";
6. "confirmRoutingKey";
7. int count = 100;
8.
9. new ConnectionFactory();
10. "172.16.151.74");
11. "test");
12. "test");
13. 5672);
14.
15. //建立生産者
16. new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
17. producer.run();
18. }
19. }
20.
21. class Sender
22. {
23. private ConnectionFactory factory;
24. private int count;
25. private String exchangeName;
26. private String queueName;
27. private String routingKey;
28. private String bindingKey;
29.
30. public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
31. this.factory = factory;
32. this.count = count;
33. this.exchangeName = exchangeName;
34. this.queueName = queueName;
35. this.routingKey = routingKey;
36. this.bindingKey = bindingKey;
37. }
38.
39. public void run() {
40. null;
41. try {
42. Connection connection = factory.newConnection();
43. channel = connection.createChannel();
44. //建立exchange
45. "direct", true, false, null);
46. //建立隊列
47. true, false, false, null);
48. //綁定exchange和queue
49. channel.queueBind(queueName, exchangeName, bindingKey);
50. channel.confirmSelect();
51. //發送持久化消息
52. for(int i = 0;i < count;i++)
53. {
54. //第一個參數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
55. //是以如果不建立exchange的話我們可以直接将該參數設定成"",如果建立了exchange的話
56. //我們需要将該參數設定成建立的exchange的名字),第二個參數是路由鍵
57. "第"+(i+1)+"條消息").getBytes());
58. }
59. long start = System.currentTimeMillis();
60. channel.waitForConfirmsOrDie();
61. "執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
62. catch (Exception e) {
63. e.printStackTrace();
64. }
65. }
66. }
第50行調用channel.confirmSelect将目前channel信道設定成confirm模式,接着在第57行通過for循環發送了100條消息,第60行調用了channel的waitForConfirmsOrDie,從waitForConfirmsOrDie方法的注釋上可以看出,該方法會等到最後一條消息得到确認或者得到nack才會結束,也就是說在waitForConfirmsOrDie處會造成目前程式的阻塞,以測試1程式發送100條消息為例,阻塞時間是135ms,我們再來看看對測試1的抓包情況:
從紅色箭頭的标号1出可以看到:首先是24向74發送了Confirm.Select消息表示請求将目前信道設定為confirm模式,接着74向24回送了Confirm.Select-Ok消息表示同意将信道設定成confirm模式,從紅色标号2處NoWait字段的值為false也印證了我們如果直接調用Channel信道的confirmSelect()方法的話,實際上預設是開啟broker回傳Confirm.Select-Ok确認消息的;
接下來我們看看broker回傳給用戶端的确認消息資料包是什麼樣子的呢?同樣通過抓包看看結果:
你會發現,在上面測試1中我們通過for循環發送了100條消息,但是在抓包的時候我們僅僅看到有兩個Basic.Ack确認消息回傳回來,原因在于上面截圖的标号3處,你會發現Multiple域的值是True的,之前我們已經講過broker可以設定Multiple域表示broker已經收到目前确認消息的Delivery-Tag域之前标号的消息,以上面截圖為例的話表示broker告訴發送者編号4之前的消息已經全部收到了,從這點我們看出broker端預設情況下是進行批量回複的,并不是針對每條消息都發送一條ack消息;
測試2:
測試1我們僅僅是測試發送者能夠收到broker的确認消息以及知道了broker對消息預設是采用批量回複方式的,那麼在程式中我們該怎麼擷取到broker回傳回來的确認消息呢,假如我們有時候需要在收到确認消息之後做一些提示性操作該怎麼辦呢?測試1中,我們采用的是Channel信道的waitForConfirmsOrDie等待broker端回傳回ack确認消息的,但我們沒法拿到這個ack消息進行後期操作,要想拿到ack消息的話,我們可以給目前Channel信道綁定監聽器,具體來說就是調用Channel信道的addConfirmListener方法進行設定,Channel信道在收到broker的ack消息之後會回調設定在該信道監聽器上的handleAck方法,在收到nack消息之後會回調設定在該信道監聽器上的handleNack方法。
實作代碼:
[java]
view plain
copy
1. public class ProducerTest {
2. public static void main(String[] args) {
3. "confirmExchange";
4. "confirmQueue";
5. "confirmRoutingKey";
6. "confirmRoutingKey";
7. int count = 100;
8.
9. new ConnectionFactory();
10. "172.16.151.74");
11. "test");
12. "test");
13. 5672);
14.
15. //建立生産者
16. new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
17. producer.run();
18. }
19. }
20.
21. class Sender
22. {
23. private ConnectionFactory factory;
24. private int count;
25. private String exchangeName;
26. private String queueName;
27. private String routingKey;
28. private String bindingKey;
29.
30. public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
31. this.factory = factory;
32. this.count = count;
33. this.exchangeName = exchangeName;
34. this.queueName = queueName;
35. this.routingKey = routingKey;
36. this.bindingKey = bindingKey;
37. }
38.
39. public void run() {
40. null;
41. try {
42. Connection connection = factory.newConnection();
43. channel = connection.createChannel();
44. //建立exchange
45. "direct", true, false, null);
46. //建立隊列
47. true, false, false, null);
48. //綁定exchange和queue
49. channel.queueBind(queueName, exchangeName, bindingKey);
50. channel.confirmSelect();
51. //發送持久化消息
52. for(int i = 0;i < count;i++)
53. {
54. //第一個參數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
55. //是以如果不建立exchange的話我們可以直接将該參數設定成"",如果建立了exchange的話
56. //我們需要将該參數設定成建立的exchange的名字),第二個參數是路由鍵
57. "第"+(i+1)+"條消息").getBytes());
58. }
59. long start = System.currentTimeMillis();
60. new ConfirmListener() {
61.
62. @Override
63. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
64. "nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
65. }
66.
67. @Override
68. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
69. "ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
70. }
71. });
72. "執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
73. catch (Exception e) {
74. e.printStackTrace();
75. }
76. }
77. }
第60行我們調用了Channel信道的addConfirmListener設定了監聽器,并且在監聽器的handleAck和handleNack方法中列印了資訊,運作程式檢視輸出:
可以看到,雖然我們還是發送了100條消息,同樣我們并沒有收到100個ack消息 ,隻收到兩個ack消息,并且這兩個ack消息的multiple域都為true,這點和測試1是相同的,你多次運作程式會發現每次發送回來的ack消息中的deliveryTag域的值并不是一樣的,說明broker端批量回傳給發送者的ack消息并不是以固定的批量大小回傳的;
也就是我們通過信道Channel的waitForConfirmsOrDie方法或者為信道設定監聽器都可以保證發送者收到broker回傳的ack或者nack消息,那麼這兩種方式有什麼差別呢?從測試一的第61行代碼以及測試2的第72行代碼處你就能找到答案啦,測試1中調用waitForConfirmsOrDie方法發送100條消息并且全部收到确認需要135ms,測試2中通過監聽器的方式僅僅需要1ms,說明調用waitForConfirmsOrDie會造成程式的阻塞,通過監聽器并不會造成程式的阻塞,下一篇部落格我會試着從RabbitMQ的源碼層面來分析這兩種方式造成這種差別的原因啦啦;
參考資料: