AMQCommand是用来处理AMQ命令的,其包含了Method, Content Heaeder和Content Body.
下面是通过wireshark抓包的AMQP协议

上图中的Basic.Publish命令就包含Method, Content header以及Content body。
AMQCommand不是直接包含Method等成员变量的,而是通过CommandAssembler又做了一次封装。
接下来先看下CommandAssembler类。此类中有这些成员变量:
CAState state标识这此Command目前的状态,是准备处理Method(EXPECTING_METHOD),还是处理Content header(EXPECTING_CONTENT_HEADER),还是准备处理Content body(EXPECTING_CONTENT_BODY),还是以及完成了(COMPLETE)。
Method method代表type=Method的AMQP帧
AMQContentHeader contentHeader代表type=Content header的AMQP帧
final List<byte[]> bodyN代表type=Content body的AMQP帧,就是真正的消息体(Message body)。
bodyLength就是消息体大小
这个类中除了构造函数,getMethod, getContentHeader, getContentBody,isComplete这个几个方法,最关键的方法就是:
这个方法主要是处理AQMP帧的,根据CAState state来处理相应状态类型的帧,然后赋值给相应的成员变量。
采用consumeMethodFrame(Frame f)方法举个例子:
这个方法首先判断当前帧是否是Method帧(AMQP.FRAME_METHOD),然后调用AMQPImp.readMethodFrom的方法。就那Connection.Start这个真来将,它会从socket的输入流中读取:
对应于下图:
第一个rdr.readOctet()是指Version-Magor:0
第二个rdr.readOctet()是指Version-Minor:9
第三个rdr.readTable()是指Server-Properties
第四个rdr.readLongstr()是指Mechanisms
第五个rdr.readLongstr()是指Locales
而MethodArgumentReader.readOctet()就是:
写到这里,思路再跳回来,知道了底层其实是Socket的DataInputStream,其上只是做了封装再封装
CommandAssembler 中的handleFrame这个方法只在AMQCommand中的:
只在这个方法中调用。CommandAssembler只是对Method,Content-Header,Content-Body做了一下封装。下面继续回到AMQCommand这个类中来。
仔细阅读源码的同学会发现在handleFrame方法当遇到类似Basic.Publish时会有Method,Content-Header,Content-Body一起的报文,那么handleFrame处理完Method之后就直接返回了,没有完全处理完,这该如何是好?
这个就又要联系到AMQConnection中的MainLoop的内部类了。此类中的关键代码如下:
可以看到这是一个一直轮询读取Frame并处理Frame的过程。在遇到类似Basic.Publish这种带Method, Content-Header, Content-Body的类型的报文时,会循环处理,直到处理完成。注意这里的Method, Content-Header以及Content-Body都是看成单个Frame的,也就是这个while循环要三次,而不是将Basic.Publish看成一个帧。
可以看到只有当AMQCommand的handleFrame方法返回true时,即执行完成之后才会继续处理。
AMQCommand也有getMethod, getContentHeader, getContentBody等方法,这些都是间接调用CommandAssembler类中相应的方法的。
AMQCommand中也有个特别重要的方法:
这段主要通过传输AMQP帧的,通过AMQChannel获取到通信链路connection,然后将AMQCommand对象自身的method成员变量(或者包括content-header以及content-body)传送给broker。这段方法里还有判断payload大小是否超过broker端所设置的最大帧大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE这段代码。当frameMax=0时则没有大小限制,当frameMax不为0时则按照payload拆分成若干的payload然后发送多个FRAME_BODY帧。
<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-客户端源码之总结</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-客户端源码之ConnectionFactory</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-客户端源码之AMQConnection</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-客户端源码之ChannelManager</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-客户端源码之Frame</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-客户端源码之AMQChannel</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-客户端源码之AMQCommand</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-客户端源码之AMQPImpl+Method</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-客户端源码之ChannelN</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-客户端源码之Consumer</a>