天天看點

[九]RabbitMQ-用戶端源碼之Consumer

這裡先來看下消費者用戶端的關鍵代碼:

可以看到QueueingConsumer作為channel.basicConsume的回調函數,之後再進行處理。

在AMQConnection中有關MainLoop的主線程,專門用來”第一線”的處理Broker發送回用戶端從幀。當Basic.Consume/.ConsumeOk開啟消費模式之後,Broker主動的向用戶端發送Basic.Delivery幀,MainLoop線程一步步的調用,最後到ChannelN的processAsync()方法中有:

之後調用processDelivery方法:

這個方法首先根據consumerTag從ChannelN中的_consumer這個HashMap中擷取相應的Consumer回調函數,然後調用這個回調函數的handleDeliver()方法進行處理,這裡有些同學會有疑問,明明是調用ConsumerDispatcher dispatcher的handleDeliver()方法,其實這裡隻是包了一層皮,ConsumerDispatcher的handleDeliver()方法就是調用了Consumer的handleDeliver()方法。

我們接下去看看QueueingConsumer這個實作Consumer接口的類是怎麼處理的:

這裡的queue就是一個LinkedBlockingQueue,用戶端程式通過調用nextDelivery()方法來擷取資料:

這個nextDelivery方法說白就是一個LinkedBlockingQueue的take()操作,也就是一個可能會阻塞等待的操作。

<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-用戶端源碼之總結</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-用戶端源碼之ConnectionFactory</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-用戶端源碼之AMQConnection</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-用戶端源碼之ChannelManager</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-用戶端源碼之Frame</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-用戶端源碼之AMQChannel</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-用戶端源碼之AMQCommand</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-用戶端源碼之AMQPImpl+Method</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-用戶端源碼之ChannelN</a>

<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-用戶端源碼之Consumer</a>

繼續閱讀