天天看点

[二]RabbitMQ-客户端源码之AMQConnection

首先来看看start()方法的源码,这个方法有点长,这里拆开来一一分析,首先是注释:

首先来看看方法上的注释说了什么:

方法的作用是启动连接(start up the connection), 包括启动MainLoop线程,这个MainLoop线程主要是和broker进行通信交互处理通信帧(Frame)的一个线程(非常的重要!!!)。

这个方法会在建立连接的初始化阶段(negotiation)会进行Connection.Start/.StartOk, Connection.Tune/.TuneOk, 调用Connection.Open之后再等待Conenction.OpenOk(这里的都是指AMQP协议层面的),这个可以参考本文中第一张使用wireshark抓包的网络截图,一一对应的关系。

通过broker回复的Connection.Tune帧(帧中包含Channel-Max, Frame-Max, Heartbeat三个参数)设置channelMax, frameMax以及Heartbeat的参数值。

一些异常情况。

首先是初始化工作线程池(initializeConsumerWorkService)和初始化心跳线程(initializeHeartbeatSender)并设置运行状态为true(this._isrunning=true,这个值会在MainLoop线程中有用,控制MainLoop线程是否继续运行)。

“AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();”这句代码,从命名上来说像是rpc, 其实这么理解也没错。RabbitMQ-Client这个版本(3.5.3)的客户端与broker端的通信是采用java原生socket.当然后面也改成了NIO,这个自然是后话。RabbitMQ-Client程序中会对各种帧进行处理,处理的方式也不是单一化的,这里举Connection.Start这个类型的报文做分析。当broker发送Connection.Start至client端,client收到之后进行处理(MainLoop线程中),然后将此报文存入SimpleBlockingRpcContinuation中,照着SimpleBlockingRpcContinuation深究下去,其就是一个容量为1的BlockingQueue,也就是当MainLoop主导的线程将收到的Connection.Start存入其中,然后AMQConnction类的start()线程在等待(start()方法下面的代码):

然后继续处理。这看上去也算是个rpc,等待别的线程(这个线程同样在等待broker的返回)处理完毕。

AMQCommand(这个之后会讲到), 下面的“_channel0.enqueueRpc(connStartBlocker)”将这个rpc任务放入Channel中,如果深入代码看的话,channel中当前至多只能enqueue一个rpc,如果当前的rpc没有处理完再enqueue的话会被阻塞(wait())直到处理完当前的rpc才能enqueue下一个rpc。

接下来“_frameHandler.sendHeader()”主要是发送Protocol-Header 0-9-1帧(可参考下图),这个客户端与broker建立连接的AMQP协议的第一帧,帧中的内容包括AMQP的版本号。这里发_frameHandler就是前面Connection提到的SocketFrameHandler对象,我们来看看sendHeader()做了什么:

上面这段对照着下图一目了然:

[二]RabbitMQ-客户端源码之AMQConnection

接着回顾MainLoop, 在start()方法中关于MainLoop的代码主要有:

这段代码主要是初始化MainLoop线程对象,然后让其运行。没有什么特别之处,而特别之处在于MainLoop本身。

MainLoop类是AMQConnection类的私有内部类:

MainLoop线程当读取到通信帧之后,判断是否是心跳帧,如果是则忽略继续监听。如果是其他帧,则判断其frame.channel值是否为0,frame.channel值为0代表的是特殊帧,这些特殊帧是和Connection有关的,而不是和Channel有关的(上面代码里的frame.channel就是Channel里的channel number, 一般Connection类型的帧的channel number为0,而其余Channel类别帧的channel number大于0。)

这里就分channel_number=0和channel_number !=0分别进行处理。

当channel_number=0即frame.channel=0则直接调用_channel0的handleFrame方法。

这个_channel0是在AMQConnection类中创建的私有变量:

进而调用AMQChannel的handleCompleteInboundCommand(command)方法:

进而调用AMQChannel的processAsync方法。这个方法在AMQChannel类中是一个抽象方法,而观察AMQConnection中的AMQChannel _channel0私有变量其正好实现了这个方法:

进而调用了AMQConnection的processControlCommand方法:

这个方法是用来处理AMQP控制命令的:Connection.Close/CloseOk, Connection.Blocked/.Unblocked。正常情况下(比如Connection.Start/.StartOk)直接返回false。

我们假设有一个可靠的面向流的网络传输层(TCP/IP或相当)。在单个套接字连接中,可以存在多个独立控制线程,这些称之为通道。每个帧都使用通道编号来编号。通过交织他们的帧,不同的通道共享连接。对于给定的通道,帧运行在一个严格的序列,这样可以用来驱动一个协议解析器(通常是一个状态机)。

当channel_number!=0则需要从ChannelManager中根据channel number找出相应的AMQChannel再调用handleFrame方法处理。

这里的ChannelManager从何而来?

这里就还是要到AMQChannel的start()方法来看,有这么一句:_channelManager = instantiateChannelManager(channelMax, threadFactory);

ChannelManager构造方法中的ConsumerWorkService参数就是AMQConnection中start()方法第一行代码初始化的ConsumerWorkService对象。

当channel number等于0的时候是调用AMQChannel,也可以说是AQMConnection的内部成员变量AMQChannel _channel0来处理。

<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-客户端源码之总结</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-客户端源码之ConnectionFactory</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-客户端源码之AMQConnection</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-客户端源码之ChannelManager</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-客户端源码之Frame</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-客户端源码之AMQChannel</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-客户端源码之AMQCommand</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-客户端源码之AMQPImpl+Method</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-客户端源码之ChannelN</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-客户端源码之Consumer</a>