天天看點

RabbitMQ的Java用戶端API指南

目前RabbitMQ官方的Java用戶端版本已升至5.0.0,5.x系列的版本需要JDK 8支援,4.x系列的版本支援JDK 6。

1、綜述 RabbitMQ Java用戶端使用com.rabbitmq.client作為頂層包。有4個關鍵類和接口:Channel、Connection、ConnectionFactory、Consumer。其中,Channel提供了各種AMQP 0-9-1的協定操作;Connection用于建立信道、注冊連接配接的生命周期事件處理器、關閉連接配接;ConnectionFactory用于擷取Connection,并提供連接配接配置功能,如配置連接配接的虛拟主機、使用者名等。

2、連接配接到消息代理 [1] 建立ConnectionFactory [2] 設定ConnectionFactory,如主機、端口、使用者名、密碼、虛拟主機等 [3] 擷取Connection [4] 建立信道 [5] 利用信道進行各種協定操作... [6] 關閉信道 [7] 關閉連接配接 說明:在設定ConnectionFactory時,可調用方法或采用URI形式進行設定。另外,當連接配接關閉時,會自動關閉信道,是以在關閉連接配接前關閉信道不是嚴格必須的,但通常是一種好的實踐。

3、使用交換器和隊列 在使用交換器和隊列前必須進行聲明,聲明是為了確定在使用前這些交換器或隊列存在,如果不存在的話會建立它們。 [m] exchangeDeclare [m] queueDeclare [m] queueBind 上述三種方法都提供了若幹不同形參數目的重載版本,這種設計在整個用戶端API中會經常遇到。

4、釋出消息 [m] basicPublish [m] AMQP.BasicProperties#builder 說明:如果消息代理發生“資源不足告警”(記憶體使用過大或磁盤空間不足),那麼Channel#basicPublish會阻塞。

5、信道和并發 根據經驗,不應該線上程間共享Channel執行個體,應該一個線程一個Channel。

多個線程在同一Channel上并發釋出消息時,可能産生不正确的幀交錯,由此産生連接配接層協定異常并導緻連接配接關閉。線程間共用一個Channel也會幹擾生産者确認。是以應該避免多個線程在同一Channel上并發釋出消息。但如果共享同一信道的兩個線程,一個是生産者、一個是消費者,則是安全的。

消息伺服器以push方式投遞消息是并發進行的,并提供每個信道上消息接收順序保證(和消息發送順序相同)。消息伺服器的并發投遞是通過給每個連接配接配置一個ExecutorService實作的,也可以通過設定ConnectionFactory來自定義執行器,自定義的執行器會在從該連接配接工廠擷取的連接配接間共享。

當消費者手動确認時,要注意是哪個線程進行确認。如果不是接收到消息的線程進行确認,則批量确認可能産生重複确認,由此産生信道層協定異常并導緻信道關閉。一次隻确認一個消息的則是安全的。

說明:RabbitMQ具有記憶體和磁盤告警功能,設計告警的目的是為了阻塞生産者、讓消費者繼續消費。然而,AMQP允許生成者和消費者在同一個信道上操作、也允許在同一連接配接的不同信道上操作,這種設計是不完美的。在實際中,對大部分應用而言不會造成大問題,因為告警調節隻是表現為延遲。不過,其他設計也是允許的,建議對生産者和消費者使用不同的連接配接。

6、訂閱消息(push API) 消費者訂閱消息時,用消費者标簽(tag)來區分不同的訂閱,是以Consumer接口中的方法都必須傳入消費者标簽字元串。消費者标簽可以由用戶端産生,也可以由服務端提供。如果希望使用服務端産生的消費者标簽(單個節點内具有唯一性),使用不帶消費者标簽參數的Channel#basicConsume方法或傳入一個空字元串作為消費者标簽參數,從Channel#basicConsume方法的傳回值中就能擷取到(服務端産生的)消費者标簽。可以用該标簽來取消消費者。

不同的消費者執行個體必須使用不同的消費者标簽。如果在一個連接配接上使用重複的消費者标簽,可能導緻自動連接配接恢複有問題并且在監視消費者時不易區分。

子類化(用起來很便利的類)DefaultConsumer即可實作一個Consumer,将該匿名内部類執行個體傳入Channel#basicConsume方法即可建立一個訂閱。Consumer中的方法都是回調方法,即當某些事件發生時就會被自動調用,包括:handleDelivery、handleShowdonwSingle、handleConsumeOk、handleCacelOk、handleCancle等。可以通過Channel#basicCancel取消某個消費者。

在Consumer的回調方法中,可以執行Connection或Channel的阻塞方法,因為對Consumer的回調是線上程池中執行的,不是在執行個體化Channel的線程中執行的。每一個Channel都有自己的分發線程。一般情況下一個信道上一個消費者,這樣的話就不會影響其他消費者。如果一個信道上有多個消費者,當其中某個消費者耗時較長的話,會影響對其他消費者上回調(事件處理任務)的分發。

7、擷取單個消息(pull API) Channel#basicGet,支援自動确認和手動确認。

8、處理不可路由的消息 如果釋出消息時設定了 mandatory辨別(flag),但是不能被路由,消息代理會将其退回給生産者。為了能夠得到退回通知,生産者需要在信道上添加一個ReturnListener對象(Channel#addReturnListener),不然該消息就會被無聲地丢掉。

9、關閉協定 在AMQP 0-9-1協定中,連接配接和信道采用相同的方式來處理網絡失敗、内部失敗以及明确的請求關閉。它們都具有3種生命周期狀态:open、closing、closed。無論是應用請求關閉、用戶端庫錯誤、網絡請求關閉還是網絡錯誤等,這些對象最終都會處于closed狀态。

在AMQP連接配接和信道對象中,具有如下與關閉相關的方法: [m] addShutdownListener/removeShutdownListener [m] getCloseReason [m] isOpen [m] close

ShutdownSignalException對象提供了分析關閉原因的方法。該對象可以通過getCloseReason擷取,或者通過ShutdownListener#shutdownCompleted的參數進行通路。

在生産環境代碼中應該忽略isOpen方法,因為容易發生競态條件(race conditions),使用者代碼無法實作原子性。

10、進階連接配接選項 在ConnectionFactory和Connection類中提供了一些方法用于自定義連接配接行為,滿足進階定制需求。 * 定制消費者線程池 * 使用主機清單 * 支援服務發現 * 設定心跳檢測逾時 * Java NIO支援

11、網絡異常時自動恢複 當RabbitMQ節點和用戶端之間的網絡連接配接發生異常時,用戶端可以自動恢複連接配接和拓撲(隊列、交換器、綁定和消費者)。自動恢複過程如下: [1] 重連 [2] 恢複連接配接上的監聽器 [3] 重新打開信道 [4] 恢複信道上的監聽器 [5] 恢複信道的basic.qos設定,釋出者确認和事務設定 拓撲恢複(對每個信道進行)過程如下: [1] 重新聲明交換器 [2] 重新聲明隊列 [3] 恢複所有綁定 [4] 恢複所有消費者

如果自動恢複失敗(如RabbitMQ節點仍未正常工作),用戶端就會每隔一段時間重試,該時間間隔固定,預設為5秒。如果建立連接配接時使用了主機清單,那麼會将清單打亂并挨個重試。如果打開了自動連接配接恢複功能,可以在連接配接和信道上注冊恢複監聽器來處理恢複事件。

當連接配接斷開時,使用Channel#basicPublish釋出的消息會丢失。在連接配接恢複後,用戶端并不會将它們重新入隊進行再次釋出。要想確定釋出的消息到達RabbitMQ消息代理,需要使用釋出者确認并對連接配接失敗做相應處理。

當連接配接斷開時,需要過一段時間才能偵測到。是以存在一個時間視窗,Java用戶端和應用程式都沒有意識到連接配接斷開。在這個時間視窗中,釋出的消息仍然會被序列化并寫入TCP套接字。它們要想投遞成隻能靠生成者确認(屬于協定擴充)來保證,因為在AMQP 0-9-1中将消息釋出完全設計為異步方式。

當開啟了自動恢複功能的連接配接檢測到套接字或I/O操作錯誤,就會在一段時間後(預設5秒)開始恢複。之是以這樣設計,是考慮到雖然很多網絡失敗是瞬态的且一般隻存在很短的時間,但并不會馬上消失。自動連接配接恢複會以固定的時間間隔進行重試直到建立新連接配接。 當連接配接處于正在恢複狀态時,在該連接配接的信道上進行消息釋出會抛出異常。此時用戶端不會對這些消息進行任何緩存,應該由應用開發者來記錄這些消息并在恢複成功後重新釋出。如果生産者不能承受消息丢失的風險,就應該使用生産者确認來確定消息釋出成功。 如果因為信道級異常造成信道關閉,不會觸發自動連接配接恢複。這樣的異常一般說明是應用級有問題,Java用戶端庫無法做出一個知情的決定。

當使用消費者(手動)确認時,在消息發送到消費者後、确認完成前,從消費者到RabbitMQ節點的網絡連接配接可能會斷開。在自動連接配接恢複後,RabbitMQ會重置所有信道上的投遞标簽,此時使用老的投遞标簽進行basic.ack、basic.nack和basic.reject會産生信道異常。為了避免異常發生,RabbitMQ會記錄和更新投遞标簽確定在恢複前後它們是單調增加的。Channel#basicAck、Channel#basicNack和Chanel#basicReject會将調整後的投遞标簽轉換為RabbitMQ使用的投遞标簽。使用老投遞标簽的确認不會被發送。使用消費者确認和自動恢複的消費者必須能夠處理二次投遞。

12、未處理的異常 關于連接配接、信道、恢複和消費者生命周期的未處理異常都交由異常處理器完成,使用ConnectionFactory#setExceptionHandler設定。