首先來看看start()方法的源碼,這個方法有點長,這裡拆開來一一分析,首先是注釋:
首先來看看方法上的注釋說了什麼:
方法的作用是啟動連接配接(start up the connection), 包括啟動MainLoop線程,這個MainLoop線程主要是和broker進行通信互動處理通信幀(Frame)的一個線程(非常的重要!!!)。
這個方法會在建立連接配接的初始化階段(negotiation)會進行Connection.Start/.StartOk, Connection.Tune/.TuneOk, 調用Connection.Open之後再等待Conenction.OpenOk(這裡的都是指AMQP協定層面的),這個可以參考本文中第一張使用wireshark抓包的網絡截圖,一一對應的關系。
通過broker回複的Connection.Tune幀(幀中包含Channel-Max, Frame-Max, Heartbeat三個參數)設定channelMax, frameMax以及Heartbeat的參數值。
一些異常情況。
首先是初始化工作線程池(initializeConsumerWorkService)和初始化心跳線程(initializeHeartbeatSender)并設定運作狀态為true(this._isrunning=true,這個值會在MainLoop線程中有用,控制MainLoop線程是否繼續運作)。
“AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();”這句代碼,從命名上來說像是rpc, 其實這麼了解也沒錯。RabbitMQ-Client這個版本(3.5.3)的用戶端與broker端的通信是采用java原生socket.當然後面也改成了NIO,這個自然是後話。RabbitMQ-Client程式中會對各種幀進行處理,處理的方式也不是單一化的,這裡舉Connection.Start這個類型的封包做分析。當broker發送Connection.Start至client端,client收到之後進行處理(MainLoop線程中),然後将此封包存入SimpleBlockingRpcContinuation中,照着SimpleBlockingRpcContinuation深究下去,其就是一個容量為1的BlockingQueue,也就是當MainLoop主導的線程将收到的Connection.Start存入其中,然後AMQConnction類的start()線程在等待(start()方法下面的代碼):
然後繼續處理。這看上去也算是個rpc,等待别的線程(這個線程同樣在等待broker的傳回)處理完畢。
AMQCommand(這個之後會講到), 下面的“_channel0.enqueueRpc(connStartBlocker)”将這個rpc任務放入Channel中,如果深入代碼看的話,channel中目前至多隻能enqueue一個rpc,如果目前的rpc沒有處理完再enqueue的話會被阻塞(wait())直到處理完目前的rpc才能enqueue下一個rpc。
接下來“_frameHandler.sendHeader()”主要是發送Protocol-Header 0-9-1幀(可參考下圖),這個用戶端與broker建立連接配接的AMQP協定的第一幀,幀中的内容包括AMQP的版本号。這裡發_frameHandler就是前面Connection提到的SocketFrameHandler對象,我們來看看sendHeader()做了什麼:
上面這段對照着下圖一目了然:

接着回顧MainLoop, 在start()方法中關于MainLoop的代碼主要有:
這段代碼主要是初始化MainLoop線程對象,然後讓其運作。沒有什麼特别之處,而特别之處在于MainLoop本身。
MainLoop類是AMQConnection類的私有内部類:
MainLoop線程當讀取到通信幀之後,判斷是否是心跳幀,如果是則忽略繼續監聽。如果是其他幀,則判斷其frame.channel值是否為0,frame.channel值為0代表的是特殊幀,這些特殊幀是和Connection有關的,而不是和Channel有關的(上面代碼裡的frame.channel就是Channel裡的channel number, 一般Connection類型的幀的channel number為0,而其餘Channel類别幀的channel number大于0。)
這裡就分channel_number=0和channel_number !=0分别進行處理。
當channel_number=0即frame.channel=0則直接調用_channel0的handleFrame方法。
這個_channel0是在AMQConnection類中建立的私有變量:
進而調用AMQChannel的handleCompleteInboundCommand(command)方法:
進而調用AMQChannel的processAsync方法。這個方法在AMQChannel類中是一個抽象方法,而觀察AMQConnection中的AMQChannel _channel0私有變量其正好實作了這個方法:
進而調用了AMQConnection的processControlCommand方法:
這個方法是用來處理AMQP控制指令的:Connection.Close/CloseOk, Connection.Blocked/.Unblocked。正常情況下(比如Connection.Start/.StartOk)直接傳回false。
我們假設有一個可靠的面向流的網絡傳輸層(TCP/IP或相當)。在單個套接字連接配接中,可以存在多個獨立控制線程,這些稱之為通道。每個幀都使用通道編号來編号。通過交織他們的幀,不同的通道共享連接配接。對于給定的通道,幀運作在一個嚴格的序列,這樣可以用來驅動一個協定解析器(通常是一個狀态機)。
當channel_number!=0則需要從ChannelManager中根據channel number找出相應的AMQChannel再調用handleFrame方法處理。
這裡的ChannelManager從何而來?
這裡就還是要到AMQChannel的start()方法來看,有這麼一句:_channelManager = instantiateChannelManager(channelMax, threadFactory);
ChannelManager構造方法中的ConsumerWorkService參數就是AMQConnection中start()方法第一行代碼初始化的ConsumerWorkService對象。
當channel number等于0的時候是調用AMQChannel,也可以說是AQMConnection的内部成員變量AMQChannel _channel0來處理。
<a href="http://blog.csdn.net/u013256816/article/details/70214929">[Conclusion]RabbitMQ-用戶端源碼之總結</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214708">[一]RabbitMQ-用戶端源碼之ConnectionFactory</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214730">[二]RabbitMQ-用戶端源碼之AMQConnection</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214754">[三]RabbitMQ-用戶端源碼之ChannelManager</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214776">[四]RabbitMQ-用戶端源碼之Frame</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214791">[五]RabbitMQ-用戶端源碼之AMQChannel</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214811">[六]RabbitMQ-用戶端源碼之AMQCommand</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214835">[七]RabbitMQ-用戶端源碼之AMQPImpl+Method</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214863">[八]RabbitMQ-用戶端源碼之ChannelN</a>
<a href="http://blog.csdn.net/u013256816/article/details/70214903">[九]RabbitMQ-用戶端源碼之Consumer</a>