在jetty中,使用connector來抽象jetty伺服器對某個端口的監聽。在connector啟動時,它會啟動acceptors個acceptor線程用于監聽在connector中配置的端口。對于用戶端的每次連接配接,connector都會建立相應的endpoint來表示該連接配接,一般在建立endpoint的同時會同時建立connection,這裡endpoint用于和socket打交道,而connection用于在從socket中讀取到資料後的處理邏輯以及生成響應資料的處理邏輯。
不同的connector會建立不同的endpoint和connection執行個體。如socketconnector建立connectorendpoint和httpconnection,sslsocketconnector建立sslconnectorendpoint和httpconnection,selectchannelconnector建立selectchannelendpoint和selectchannelhttpconnection,sslselectchannelconnector建立sslselectchannelendpoint和selectchannelhttpconnection,blockingchannelconnector建立blockingchannelendpoint和httpconnection等。
jetty中endpoint接口定義如下:
public interface endpoint {
// endpoint是對一次用戶端到伺服器連接配接的抽象,每一個新的連接配接都會建立一個新的endpoint,并且在這個endpoint中包含這次連接配接的socket。由于endpoint包含底層的連接配接socket,因而它主要用于處理從socket中讀取資料和向socket中寫入資料,即對應endpoint接口中的fill和flush方法。
// 從socket中讀取資料,并寫入buffer中直到資料讀取完成或putindex到buffer的capacity。傳回總共讀取的位元組數。在實作中,streamendpoint使用buffer直接從socket的inputstream中讀取資料,而channelendpoint則向channel讀取資料到buffer。
int fill(buffer buffer) throws ioexception;
// 将buffer中的資料(從getindex到putindex的資料)寫入到socket中,同時清除緩存(調用buffer的clear方法)。在實作中,streamendpoint使用buffer直接向socket的outputstream寫入資料,而chanelendpoint則将buffer中的資料寫入channel中。
int flush(buffer buffer) throws ioexception;
// 類似上面的flush,它會将傳入的header、buffer、trailer按順序寫入socket中(outputstream或者channel)。傳回總共寫入的位元組數。
int flush(buffer header, buffer buffer, buffer trailer) throws ioexception;
// 當在處理http/1.0請求時或目前request中的keepalive的值為false時,在處理完成目前請求後,需要調用shutdownoutput()方法,關閉目前連接配接;或在處理目前請求時出現比較嚴重的錯誤、socket逾時時。在調用完shutdownoutput()方法後,isoutputshutdown()方法傳回true。
void shutdownoutput() throws ioexception;
boolean isoutputshutdown();
// 當server無法從目前連接配接(socket)中讀取資料時(即read傳回-1)時,調用shutdowninput()方法以關閉目前連接配接,此時isinputshutdown()傳回true。
void shutdowninput() throws ioexception;
boolean isinputshutdown();
// 當socket逾時或在讀寫socket過程中出現任何io錯誤時,server會直接調用close()方法以關閉目前連接配接。
void close() throws ioexception;
// 目前connection是否已經打開,對channelendpoint來說表示channel.isopen()傳回true,對socketendpoint來說,表示socket沒有被關閉。
public boolean isopen();
// 對streamendpoint來說,它的讀寫是阻塞式的,但是對channelendpoint來說,如果它内部的channel是selectablechannel,那麼這個channel的讀寫可以配置成非阻塞的(通過selectablechannel.isblocking()方法判斷)。因而對selectchannelendpoint需要使用blockreadable()方法來阻塞直到逾時。傳回true表示阻塞讀取失敗,此時httpparser會關閉這個endpoint,并抛出異常。blockwritable()方法類似blockreadable()用于selectchannelendpoint以等待有資料寫入到channel中,如果傳回false,表示在指定的時間内沒有資料可寫入channel中(即逾時),此時會關閉該endpoint,并抛出異常。
public boolean isblocking();
public boolean blockreadable(long millisecs) throws ioexception;
public boolean blockwritable(long millisecs) throws ioexception;
// 對sslselectchannelendpoint,它是buffered,因而它的isbuffered()方法傳回true,而isbufferinginput()和isbufferingoutput()根據内部的_inniobuffer和_outniobuffer字段的hascontent()方法判斷是否傳回true或false。對其他類型的endpoint來說,這三個方法都傳回false。而flush()方法則将_outniobuffer中緩存的資料寫入channel中。
public boolean isbufferred();
public boolean isbufferinginput();
public boolean isbufferingoutput();
public void flush() throws ioexception;
// endpoint還定義了一些和endpoint相關鍊的資訊和狀态:
// 傳回該endpoint内部使用的傳輸工具,如channelendpoint内部使用channel,而socketendpoint内部使用socket。該方法用于對内部傳輸工具的配置。
public object gettransport();
// 用于配置socket的so_timeout的時間,即等待客戶連接配接的逾時時間。
public int getmaxidletime();
public void setmaxidletime(int timems) throws ioexception;
// 傳回目前endpoint所在伺服器的ip位址、主機名、端口号以及用戶端的ip位址、主機名、端口号。
public string getlocaladdr();
public string getlocalhost();
public int getlocalport();
public string getremoteaddr();
public string getremotehost();
public int getremoteport();
}
endpoint最主要的方法從底層傳輸鍊路中讀取資料并填入buffer中的fill方法,以及将buffer中的資料寫入底層傳輸鍊路的flush方法;讀資料對應input,寫資料對應output,可以單獨的關閉input或output,并提供方法判斷input或output是否已經被關閉;可以用close方法關閉endpoint,也可以通過isopen方法判斷是否這個endpoint是否已經被關閉;可以以阻塞的方式讀寫endpoint,并判斷目前endpoint是否處于阻塞狀态(主要用于selectchannelendpoint中);對sslselectchannelendpoint來說,它在讀寫時都可能内部緩存資料,因而endpoint中定義了一些方法用于判斷目前endpoint是否有輸入/輸出換成,以及使用flush将緩存中的資料寫入到底層鍊路中;對底層socket,endpoint還可以配置其最長的空閑時間;最後endpoint還提供一些方法用于擷取本地和遠端的位址、主機名、端口号,以及擷取底層傳輸類,如socket、channel等。
streamendpoint采用古老的stream方法從socket中讀寫資料,它包含inputstream和outputstream,分别表示讀寫資料流;它永遠是阻塞式讀寫,因而isblocking、blockreadable、blockwritable永遠傳回true;它也不會在内部緩存讀寫資料,因而isbufferinginput、isbufferingoutput、isbufferred永遠傳回false,而flush方法直接調用outputstream的flush方法;對fill實作,直接使用傳入的buffer從inputstream中讀取資料;對flush實作,直接将buffer中的資料寫入到outputstream中;close方法同時關閉inputstream和outputstream,并将成員變量置為null;對streamendpoint本身,沒有本地或遠端的位址、主機名、端口号資訊。
socketendpoint是streamendpoint的子類,它從socket中擷取inputstream和outputstream,以及本地和遠端的位址、主機名、端口号;而isinputshutdown、isoutputshutdown、shutdowninput、shutdownoutput等方法直接調用socket中相應的方法;gettransport直接傳回socket執行個體;setmaxidletime方法同時設定socket的so_timeout值;當空閑逾時,隻關閉input。
connectorendpoint繼承自socketendpoint,它是socketconnector的内部類,每一個用戶端的連接配接請求建立一個connectorendpoint執行個體,在建立connectorendpoint的同時,會在内部建立一個httpconnection執行個體;它還實作了connectedendpoint,因而可以從外部設定connection執行個體;在讀資料時,如果遇到eof,表示連接配接已經斷開,因而關閉目前endpoint;在關閉endpoint時,cancel目前connection中request執行個體的asynccontinuation。connectorendpoint還實作了runnable接口,在其run方法的實作中,它首先更新處理的connection的引用計數,然後儲存目前connection執行個體,在socketconnector已經啟動,并且connectorendpoint未被關閉的狀态下循環調用connection的handle方法,在每個循環開始前檢查目前connector是否處于low resources狀态(如線程池的可用線程已經不多),此時更新endpoint的maxidletime為目前connector的lowresourcesmaxidletime的值,以減少一些連接配接的空閑等待時間;對任何exception,關閉目前endpoint;最後更新connector中的一些統計資訊,将目前connection從connector的目前正在處理的connections集合中移除,如果此時socket還未關閉,讀取socket中的資料直到資料讀完或超過maxidletime,此時如果socket還未關閉,則關閉目前socket。而在connector建立connectorendpoint時,會調用其dispatch方法,将其自身仍給相應的線程池處理,以在某個時間在另一個線程中調用其run方法。
sslconnectorendpoint繼承自connectorendpoint,它在關閉input和output時會同時關閉整個endpoint,而在執行真正的處理邏輯前有一個handle shake的過程。
channelendpoint采用nio實作,從channel中讀寫資料。在建立channelendpoint時傳入bytechannel,如果傳入的bytechannel是socketchannel,則同時紀錄socket執行個體,以及擷取本地、遠端的位址資訊,并設定maxidletime值為so_timeout值。如果該bytechannel是selectablechannel類型(serversocketchannel、socketchannel、diagramchannel、sinkchannel、sourcechannel),并且其isblocking()方法傳回false,表示該channel是非阻塞式的讀寫,否則這個channel是阻塞式的讀寫,但是預設情況下,blockreadable、blockwritable直接傳回true,表示阻塞式的讀寫。對非sslselectchannelendpoint的endpoint不會在内部緩存資料,因而isbufferred、isbufferingoutput、isbufferinginput直接傳回false,而flush方法為空實作;對socketchannel,在設定maxidletime時,同時将該值設定到底層socket的so_timeout的值中;gettransport直接傳回底層channel執行個體;shutdowninput、shutdownoutput、isinputshutdown、isoutputshutdown使用socket實作;fill實作隻支援niobuffer,它使用channel将資料寫入内部的bytebuffer中;flush實作使用channel将bytebuffer中的資料寫入到channel中,或使用gatheringbytechannel将多個bytebuffer同時寫入到channel中。
blockingchannelendpoint類是blockingchannelconnector的内部類,它繼承自channelendpoint,并實作了connectedendpoint和runnable接口。在建立blcokingchannelendpoint時,同樣也會建立httpconnection執行個體;每次調用fill、flush方法時,都會更新_idletimestamp的值為目前時間戳(該值也會在每一次connection開始重新被處理時更新),在blockingchannelconnector啟動時會生成一個task,它沒400毫秒周遊一次所有正在處理的endpoint,如果發現有endpoint已經逾時(checkidletimestamp()方法,即空閑時間超過maxidletime),則調用其idleexpired()方法,将該endpoint關閉;blockingchannelconnector在接到一個連接配接後,先會設定socketchannel的blockingchannel為true,然後使用這個socketchannel建立一個blockingchannelendpoint,并調用其dispatch()方法,将它丢到一個線程池中,在blockingchannelendpoint的run方法實作中,首先更新一些統計資料,紀錄目前正在處理的endpoint;隻要目前endpoint還處于打開狀态,先更新_idletimestamp為目前時間戳,然後如果目前threadpool處于lowonthread狀态,将timeout時間更新為lowresourcesmaxidletime,而後調用connection的handle方法;對任何exception,直接關閉endpoint;在最後退出時,如果endpoint還未關閉,讀取endpoint的資料,直到逾時,并強制關閉endpoint。
selectchannelendpoint類在selectchannelconnector中被使用,它繼承自channelendpoint,并實作了connectedendpoint和asyncendpoint接口,selectchannelconnector采用nio中多路複用的機制,因而實作會比較複雜一些。在建立connector時,首先建立connectorselectormanager執行個體(_manager),在selectchannelconnector啟動時,設定_manager的selectsets(acceptors)、maxidletime、lowresourcesconnections、lowresourcesidletime,然後啟動_manager,并且啟動acceptors個線程,隻要selectchannelconnector處于running狀态,就不斷的調用_manager.doselect()方法。connectorselectormanager在啟動時會建立_selectsets個selectset;而doselect方法會調用根據傳入的索引号對應的selectset的doselect()方法。當用戶端的連接配接到來後,selectchannelconnector首先會配置socketchannel的configureblocking為false,然後将該socketchannel注冊到_manager中,在注冊過程中,根據目前的selectset索引值找到相應的selectset(之後索引自增),然後調用selectset的addchange(傳入socketchannel)和wakeup方法。因而這裡最重要的就是selectset的實作,它是selectormanager中的一個内部類。
selectset類的實作中,它内部有一個selector,一個concurrentlinkedqueue的changes隊列,以及selectchannelendpoint到selectset的集合(它用于調用selectchannelendpoint中的checkidletimestamp()方法以檢查并關閉處于idle timeout的selectchannelendpoint)。selectset使用addchange()方法添加需要改變狀态的對象,這些對象有endpoint、channelandattachment、socketchannel、runnable。在doselect()方法中,首先檢查changes隊列中是否有對象,如果有selectchannelendpoint對象,則調用其doupdatekey()方法;如果是socketchannel對象,則注冊op_read操作到selector中,建立新的selectchannelendpoint,attach新建立的selectchannelendpoint到selectionkey中,調用selectchannelendpoint的schedule()方法。對channelandattachment對象,如果其channel是socketchannel,并且處于connected狀态,則類似對socketchannel對象的處理,否則,注冊op_connect操作到selector;如果是runnable對象,則dispatch該runnable對象。然後調用selector的selectnow()方法,如果沒有任何可用的事件,則計算出等待時間,然後帶等待時間的調用selector的select()方法;周遊所有selected keys,對invalid的selectionkey,直接調用其attach的selectchannelendpoint的doupdatekey()方法,否則對類型是selectchannelendpoint的attachment調用其schedule()方法,對connectable的selectionkey建立新的selectchannelendpoint并調用schedule()方法,否則建立新的selectchannelendpoint并對readable的selectionkey調用其schedule()方法。
selectchannelendpoint采用nio的非阻塞讀寫方式,而nio基于channel的非阻塞操作是基于注冊的操作集(op_read, op_write, o_connect, op_accept)以從selector中選出已經可用的selectionkey(包含對應的channel、interestops、readable、writable、attachment等),之後可以使用對應的channel以及根據selectionkey中對應的已經可用的操作執行相應的操作(如讀寫),因而selectchannelendpoint的其中一個任務是要實時的更新目前它感興趣的操作集,并重新像selector中注冊。 selectchannelendpoint使用updatekey()方法跟新感興趣操作集合,并且它隻關注op_read和op_write操作,在實作時,op_read隻需要在socket的輸入沒有關閉,且還沒有dispatch或目前處于readblocked狀态下才需要關注;op_write隻需要在socket的輸出沒有關閉,且writable為false(當需要向channel中寫資料,但是還沒有寫完的情況下)或目前處于writeblocked狀态下才需要關注;如果和目前已注冊的操作集相同,則不需要重新注冊,否者将自身通過selectset的addchange()方法添加到selectset中,在selectset的doselect()方法中會最終調用selectchannelendpoint中的doupdatekey()方法,該方法的實作:1. 當channel處于open狀态,存在感興趣的操作,selectionkey為null或invalid,如果channel已經注冊了,重新調用updatekey()方法(感覺這裡一般不會被調用到,如果被調用到了,則可能出現死循環),否則将channel重新向selector中重新注冊interestops的操作集(如果出錯,則canncel selectionkey,并且從selectset中銷毀目前endpoint)。2. 當channel處于open狀态,存在感興趣的操作,selectionkey存在且valid,則直接使用interestops更新selectionkey的感興趣集(調用selectionkey的interestops()方法)。3. 當channel處于open狀态,不存在感興趣的操作,清空selectionkey的interestops,或清理selectionkey引用。4. 如果channel處于關閉狀态,則canncel selectionkey,并從selectset中銷毀目前endpoint。
對阻塞讀寫(readblocked、writeblocked),在blockreadable()、blockwritable()方法中,會設定readblocked、writeblocked為true,調用updatekey()方法,然後計算等待時間并進入等待(調用wait方法),如果因為逾時而退出等待,則傳回false,否則傳回true(在傳回時設定readblocked、writeblocked為false);當調用selectchannelendpoint的schedule()方法時,它會更新readblocked、writeblocked、interestops的值(同時使用該值更新selectionkey中的狀态),并調用notifyall()方法喚醒blockreadable()、blockwritable()方法:1. 如果selectionkey為null或invalid,readblocked、writeblocked設定為false,調用notifyall(),并傳回;2. 如果readblocked或writeblocked為true,使用selectionkey的readable、writable更新readblocked和writeblocked的值,調用notifyall(),如果已經dispatched,清除所有interestops,并傳回;3. 如果還沒有dispatched,直接清除所有interestops,并傳回;4. 如果注冊了op_write,并且已經可寫,則清除op_write操作,設定writable為true;5. 如果還沒有dispatched,則調用dispatch()方法。在dispatch()方法中,它設定dispatched為true,并将handler扔給threadpool(在handler調用connection的handle()方法,由于selectchannelendpoint的生命周期是在selectmanager維護,并且dispatch()方法可能被多次調用,因而沒有在handler的handle()方法中判斷endpoint的close狀态,并循環的調用connection的handle()方法,而是在每次handle()方法結束後退出目前線程,在下次schedule()時會使用重新将handler扔給threadpool以支援asynccontinuation的實作,并且asyncendpoint接口的定義也是用于asynccontinuation的實作,這個将在以後的部落格中詳述)。在flush()方法中,如果沒有任何資料能寫入channel時,設定writable為false(進而在updatekey()方法中能将op_write注冊到selectionkey的interestops中),并在沒有dispatch的情況下調用updatekey。最後清理selector中的selectedkeys,expire所有timeout中注冊的task(使用scheduletimeout()方法注冊),依次調用檢查endpoints中是否已經timeout。
在selectchannelendpoint的建構中,它使用socketchannel、selectset、selectionkey建構,内部從selectset中擷取selectormanager,并使用selectormanager 建立connection執行個體,初始化_dispatched、_redispatched為false,_open、_writable為true,_readblocked、_writeblocked為false,_interestops為0,最後更新_idletimestamp為目前時間。當一個用戶端連接配接到來後,selectchannelconnector會向selectormanager(selectset)中注冊一個socketchannel,當背景線程調用selectset中的doselect()方法時,它使用該socketchannel,向該selectset中的selector注冊op_read得到一個selectionkey,并使用這個socketchannel、目前selectset、以及這個selectionkey建立一個selectchannelendpoint,而後調用selectchannelendpoint的schedule()方法。
sslselectchannelendpoint類采用buffer的形式先将資料讀寫到内部緩存中,然後使用sslengine來wrap或unwrap(encode/decode)資料。這裡不再詳述。