天天看點

[六]RabbitMQ-用戶端源碼之AMQCommand

AMQCommand是用來處理AMQ指令的,其包含了Method, Content Heaeder和Content Body.

下面是通過wireshark抓包的AMQP協定

[六]RabbitMQ-用戶端源碼之AMQCommand

上圖中的Basic.Publish指令就包含Method, Content header以及Content body。

AMQCommand不是直接包含Method等成員變量的,而是通過CommandAssembler又做了一次封裝。

接下來先看下CommandAssembler類。此類中有這些成員變量:

CAState state辨別這此Command目前的狀态,是準備處理Method(EXPECTING_METHOD),還是處理Content header(EXPECTING_CONTENT_HEADER),還是準備處理Content body(EXPECTING_CONTENT_BODY),還是以及完成了(COMPLETE)。

Method method代表type=Method的AMQP幀

AMQContentHeader contentHeader代表type=Content header的AMQP幀

final List<byte[]> bodyN代表type=Content body的AMQP幀,就是真正的消息體(Message body)。

bodyLength就是消息體大小

這個類中除了構造函數,getMethod, getContentHeader, getContentBody,isComplete這個幾個方法,最關鍵的方法就是:

這個方法主要是處理AQMP幀的,根據CAState state來處理相應狀态類型的幀,然後指派給相應的成員變量。

采用consumeMethodFrame(Frame f)方法舉個例子:

這個方法首先判斷目前幀是否是Method幀(AMQP.FRAME_METHOD),然後調用AMQPImp.readMethodFrom的方法。就那Connection.Start這個真來将,它會從socket的輸入流中讀取:

對應于下圖:

[六]RabbitMQ-用戶端源碼之AMQCommand

第一個rdr.readOctet()是指Version-Magor:0

第二個rdr.readOctet()是指Version-Minor:9

第三個rdr.readTable()是指Server-Properties

第四個rdr.readLongstr()是指Mechanisms

第五個rdr.readLongstr()是指Locales

而MethodArgumentReader.readOctet()就是:

寫到這裡,思路再跳回來,知道了底層其實是Socket的DataInputStream,其上隻是做了封裝再封裝

CommandAssembler 中的handleFrame這個方法隻在AMQCommand中的:

隻在這個方法中調用。CommandAssembler隻是對Method,Content-Header,Content-Body做了一下封裝。下面繼續回到AMQCommand這個類中來。

仔細閱讀源碼的同學會發現在handleFrame方法當遇到類似Basic.Publish時會有Method,Content-Header,Content-Body一起的封包,那麼handleFrame處理完Method之後就直接傳回了,沒有完全處理完,這該如何是好?

這個就又要聯系到AMQConnection中的MainLoop的内部類了。此類中的關鍵代碼如下:

可以看到這是一個一直輪詢讀取Frame并處理Frame的過程。在遇到類似Basic.Publish這種帶Method, Content-Header, Content-Body的類型的封包時,會循環處理,直到處理完成。注意這裡的Method, Content-Header以及Content-Body都是看成單個Frame的,也就是這個while循環要三次,而不是将Basic.Publish看成一個幀。

可以看到隻有當AMQCommand的handleFrame方法傳回true時,即執行完成之後才會繼續處理。

AMQCommand也有getMethod, getContentHeader, getContentBody等方法,這些都是間接調用CommandAssembler類中相應的方法的。

AMQCommand中也有個特别重要的方法:

這段主要通過傳輸AMQP幀的,通過AMQChannel擷取到通信鍊路connection,然後将AMQCommand對象自身的method成員變量(或者包括content-header以及content-body)傳送給broker。這段方法裡還有判斷payload大小是否超過broker端所設定的最大幀大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE這段代碼。當frameMax=0時則沒有大小限制,當frameMax不為0時則按照payload拆分成若幹的payload然後發送多個FRAME_BODY幀。

<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-用戶端源碼之總結</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-用戶端源碼之ConnectionFactory</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-用戶端源碼之AMQConnection</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-用戶端源碼之ChannelManager</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-用戶端源碼之Frame</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-用戶端源碼之AMQChannel</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-用戶端源碼之AMQCommand</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-用戶端源碼之AMQPImpl+Method</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-用戶端源碼之ChannelN</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-用戶端源碼之Consumer</a>