ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。
首先来看看ChannelN的成员变量:
源代码中有关ChannelN的呈现顺序有所不同,这里博主为了区分开来,重新排了序。
在AMQChannel这个抽象类中唯一的抽象方法即为此方法,这个方法主要用来针对接受到broker的AMQCommand进行进一步的处理,至于怎么接受Socket,怎么封装成帧,怎么确定一个AMQComand已经封装完毕,都已在调用此方法前完成。此方法可以处理:Channel.Close, Basic.Deliver, Basic.Return, Channel.Flow, Basic.Ack, Basic.Nack, Basic.RecoverOk, Basic.Cancel, Channel.CloseOk等这些从broker端回传的AMQComand.
这个方法也比较长,下面也会涉及到这个方法内的内容。
和Confirm机制有关的成员变量有:
在使用Confirm机制的时候,首先要置Channel为Confirm模式,即向broker端发送Confirm.Select。
业务代码(DEMO实例):
在创建完Channel之后调用channel.confirmSelect()方法即可,confirmSelect()代码如下:
这里的成员变量nextPublishSeqNo是用来为Confirm机制服务的,当Channel开启Confirm模式的时候,nextPublishSeqNo=1,标记第一条publish的序号,当Publish时:
client端向broker端Basic.Pubish发送消息并将当前的序号加入到unconfirmedSet中,并自加nextPublishSeqNo++等待下一个消息的发送。
之后等待broker的确认回复(Basic.Ack/.Nack):channel.waitForConfirms()
可以看到waitForConfirms其实本质上是在等待unconfirmedSet变成empty,否则就线程wait()。
当接收到broker端的ACK/NACK回复时,一步步的经过处理到达processAsync(Command command)方法,然后进而处理Basic.Ack/.Nack帧。
首先是将相应的Method做一下转换,之后callConfirmListeners(),这个方法是调用成员变量confirmListeners这个list里的所有的ConfirmListener:
这个ConfirmListener的list就需要在channel.basicPushlish()调用之前先:
在调用完ConfirmListener之后继续调用handleAckNack方法:
这个方法本意上是对收到某条消息的ACK或者NACK的处理,发送消息时Basic.Publish的nextPublishNo对应于相应的ACK/NACK的deliveryTag,将其从unconfirmedSet中删除即可,如果有NACK帧,则将其相应的标识onlyAcksReceived设置为false,判断此时unconfirmedSet是否为空,如果条件成立则notifyAll(),将waitForConfirm唤起,返回onlyAcksReceived的状态。
如果channel.waitForConfirm()返回为false,则说明broker没有接受client发送的消息,此时需要在业务代码中做进一步处理,比如重发。
消费者在开启ACK的情况下,对接受到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中,由于消费者自身处理能力有限,从RabbitMQ获取一定数量的消息好厚,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接受来自队列的消息。在这种场景下,我们可以设置Basic.Qos中的prefetch_count来达到这个效果。
与消费有关的成员变量:
源码如下:
这个方法最精简的只要两个参数,即String queue和Consumer callback:public String basicConsume(String queue, Consumer callback)。
当发送Basic.Consume帧之后,由broker返回的是Basic.ConsumeOk帧+Basic.Deliver帧,Basic.ConsumerOk帧由上面方法处理,Basic.Deliver帧由processAsync处理。
dispatcher.handleConsumeOk(callback, actualConsumerTag);这段代码实际上就是:callback.handleConsumeOk(actualConsumerTag),这个还是调用到Consumer的方法处理。
上面的Basic.Consume是基于push模式的,而Basic.Get是基于pull模式的。相关的代码如下:
基本上就是客户端发送Basic.Get至Broker,Broker返回Basic.GetOK并携带数据。注意方法最后返回GetResponse对象,这个对象就是包装了一下数据。
和事务有关的代码:
ChannelN还有:
关于Exchange,Queue的申明创建,删除,绑定,解绑
关闭处理
Basic.Return
Basic.Flow
Basic.Recover
Basic.Cancel
Basic.Ack/.Nack/.Reject
这些就不做详细介绍了。有兴趣的同学可以继续翻阅源码,这些都比较简单。
<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-客户端源码之总结</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-客户端源码之ConnectionFactory</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-客户端源码之AMQConnection</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-客户端源码之ChannelManager</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-客户端源码之Frame</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-客户端源码之AMQChannel</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-客户端源码之AMQCommand</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-客户端源码之AMQPImpl+Method</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-客户端源码之ChannelN</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-客户端源码之Consumer</a>