天天看點

[八]RabbitMQ-用戶端源碼之ChannelN

ChannelN是整個RabbitMQ用戶端最核心的一個類了,其包含的功能點甚多,這裡需要分類闡述。

首先來看看ChannelN的成員變量:

源代碼中有關ChannelN的呈現順序有所不同,這裡部落客為了區分開來,重新排了序。

在AMQChannel這個抽象類中唯一的抽象方法即為此方法,這個方法主要用來針對接受到broker的AMQCommand進行進一步的處理,至于怎麼接受Socket,怎麼封裝成幀,怎麼确定一個AMQComand已經封裝完畢,都已在調用此方法前完成。此方法可以處理:Channel.Close, Basic.Deliver, Basic.Return, Channel.Flow, Basic.Ack, Basic.Nack, Basic.RecoverOk, Basic.Cancel, Channel.CloseOk等這些從broker端回傳的AMQComand.

這個方法也比較長,下面也會涉及到這個方法内的内容。

和Confirm機制有關的成員變量有:

在使用Confirm機制的時候,首先要置Channel為Confirm模式,即向broker端發送Confirm.Select。

業務代碼(DEMO執行個體):

在建立完Channel之後調用channel.confirmSelect()方法即可,confirmSelect()代碼如下:

這裡的成員變量nextPublishSeqNo是用來為Confirm機制服務的,當Channel開啟Confirm模式的時候,nextPublishSeqNo=1,标記第一條publish的序号,當Publish時:

client端向broker端Basic.Pubish發送消息并将目前的序号加入到unconfirmedSet中,并自加nextPublishSeqNo++等待下一個消息的發送。

之後等待broker的确認回複(Basic.Ack/.Nack):channel.waitForConfirms()

可以看到waitForConfirms其實本質上是在等待unconfirmedSet變成empty,否則就線程wait()。

當接收到broker端的ACK/NACK回複時,一步步的經過處理到達processAsync(Command command)方法,然後進而處理Basic.Ack/.Nack幀。

首先是将相應的Method做一下轉換,之後callConfirmListeners(),這個方法是調用成員變量confirmListeners這個list裡的所有的ConfirmListener:

這個ConfirmListener的list就需要在channel.basicPushlish()調用之前先:

在調用完ConfirmListener之後繼續調用handleAckNack方法:

這個方法本意上是對收到某條消息的ACK或者NACK的處理,發送消息時Basic.Publish的nextPublishNo對應于相應的ACK/NACK的deliveryTag,将其從unconfirmedSet中删除即可,如果有NACK幀,則将其相應的辨別onlyAcksReceived設定為false,判斷此時unconfirmedSet是否為空,如果條件成立則notifyAll(),将waitForConfirm喚起,傳回onlyAcksReceived的狀态。

如果channel.waitForConfirm()傳回為false,則說明broker沒有接受client發送的消息,此時需要在業務代碼中做進一步處理,比如重發。

消費者在開啟ACK的情況下,對接受到的消息可以根據業務的需要異步對消息進行确認。

然而在實際使用過程中,由于消費者自身處理能力有限,從RabbitMQ擷取一定數量的消息好厚,希望rabbitmq不再将隊列中的消息推送過來,當對消息處理完後(即對消息進行了ack,并且有能力處理更多的消息)再接受來自隊列的消息。在這種場景下,我們可以設定Basic.Qos中的prefetch_count來達到這個效果。

與消費有關的成員變量:

源碼如下:

這個方法最精簡的隻要兩個參數,即String queue和Consumer callback:public String basicConsume(String queue, Consumer callback)。

當發送Basic.Consume幀之後,由broker傳回的是Basic.ConsumeOk幀+Basic.Deliver幀,Basic.ConsumerOk幀由上面方法處理,Basic.Deliver幀由processAsync處理。

dispatcher.handleConsumeOk(callback, actualConsumerTag);這段代碼實際上就是:callback.handleConsumeOk(actualConsumerTag),這個還是調用到Consumer的方法處理。

上面的Basic.Consume是基于push模式的,而Basic.Get是基于pull模式的。相關的代碼如下:

基本上就是用戶端發送Basic.Get至Broker,Broker傳回Basic.GetOK并攜帶資料。注意方法最後傳回GetResponse對象,這個對象就是包裝了一下資料。

和事務有關的代碼:

ChannelN還有:

關于Exchange,Queue的申明建立,删除,綁定,解綁

關閉處理

Basic.Return

Basic.Flow

Basic.Recover

Basic.Cancel

Basic.Ack/.Nack/.Reject

這些就不做詳細介紹了。有興趣的同學可以繼續翻閱源碼,這些都比較簡單。

<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-用戶端源碼之總結</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-用戶端源碼之ConnectionFactory</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-用戶端源碼之AMQConnection</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-用戶端源碼之ChannelManager</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-用戶端源碼之Frame</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-用戶端源碼之AMQChannel</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-用戶端源碼之AMQCommand</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-用戶端源碼之AMQPImpl+Method</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-用戶端源碼之ChannelN</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-用戶端源碼之Consumer</a>

繼續閱讀