天天看点

深入Jetty源码之EndPoint

在jetty中,使用connector来抽象jetty服务器对某个端口的监听。在connector启动时,它会启动acceptors个acceptor线程用于监听在connector中配置的端口。对于客户端的每次连接,connector都会创建相应的endpoint来表示该连接,一般在创建endpoint的同时会同时创建connection,这里endpoint用于和socket打交道,而connection用于在从socket中读取到数据后的处理逻辑以及生成响应数据的处理逻辑。

不同的connector会创建不同的endpoint和connection实例。如socketconnector创建connectorendpoint和httpconnection,sslsocketconnector创建sslconnectorendpoint和httpconnection,selectchannelconnector创建selectchannelendpoint和selectchannelhttpconnection,sslselectchannelconnector创建sslselectchannelendpoint和selectchannelhttpconnection,blockingchannelconnector创建blockingchannelendpoint和httpconnection等。

jetty中endpoint接口定义如下: 

public interface endpoint {

    // endpoint是对一次客户端到服务器连接的抽象,每一个新的连接都会创建一个新的endpoint,并且在这个endpoint中包含这次连接的socket。由于endpoint包含底层的连接socket,因而它主要用于处理从socket中读取数据和向socket中写入数据,即对应endpoint接口中的fill和flush方法。

    // 从socket中读取数据,并写入buffer中直到数据读取完成或putindex到buffer的capacity。返回总共读取的字节数。在实现中,streamendpoint使用buffer直接从socket的inputstream中读取数据,而channelendpoint则向channel读取数据到buffer。

    int fill(buffer buffer) throws ioexception;

    // 将buffer中的数据(从getindex到putindex的数据)写入到socket中,同时清除缓存(调用buffer的clear方法)。在实现中,streamendpoint使用buffer直接向socket的outputstream写入数据,而chanelendpoint则将buffer中的数据写入channel中。

    int flush(buffer buffer) throws ioexception;

    // 类似上面的flush,它会将传入的header、buffer、trailer按顺序写入socket中(outputstream或者channel)。返回总共写入的字节数。

    int flush(buffer header, buffer buffer, buffer trailer) throws ioexception;

    // 当在处理http/1.0请求时或当前request中的keepalive的值为false时,在处理完成当前请求后,需要调用shutdownoutput()方法,关闭当前连接;或在处理当前请求时出现比较严重的错误、socket超时时。在调用完shutdownoutput()方法后,isoutputshutdown()方法返回true。

    void shutdownoutput() throws ioexception;

    boolean isoutputshutdown();

    // 当server无法从当前连接(socket)中读取数据时(即read返回-1)时,调用shutdowninput()方法以关闭当前连接,此时isinputshutdown()返回true。

    void shutdowninput() throws ioexception;

    boolean isinputshutdown();

    // 当socket超时或在读写socket过程中出现任何io错误时,server会直接调用close()方法以关闭当前连接。

    void close() throws ioexception;

    // 当前connection是否已经打开,对channelendpoint来说表示channel.isopen()返回true,对socketendpoint来说,表示socket没有被关闭。

    public boolean isopen();

    // 对streamendpoint来说,它的读写是阻塞式的,但是对channelendpoint来说,如果它内部的channel是selectablechannel,那么这个channel的读写可以配置成非阻塞的(通过selectablechannel.isblocking()方法判断)。因而对selectchannelendpoint需要使用blockreadable()方法来阻塞直到超时。返回true表示阻塞读取失败,此时httpparser会关闭这个endpoint,并抛出异常。blockwritable()方法类似blockreadable()用于selectchannelendpoint以等待有数据写入到channel中,如果返回false,表示在指定的时间内没有数据可写入channel中(即超时),此时会关闭该endpoint,并抛出异常。

    public boolean isblocking();

    public boolean blockreadable(long millisecs) throws ioexception;

    public boolean blockwritable(long millisecs) throws ioexception;

    // 对sslselectchannelendpoint,它是buffered,因而它的isbuffered()方法返回true,而isbufferinginput()和isbufferingoutput()根据内部的_inniobuffer和_outniobuffer字段的hascontent()方法判断是否返回true或false。对其他类型的endpoint来说,这三个方法都返回false。而flush()方法则将_outniobuffer中缓存的数据写入channel中。

    public boolean isbufferred();

    public boolean isbufferinginput();

    public boolean isbufferingoutput();

    public void flush() throws ioexception;

    // endpoint还定义了一些和endpoint相关链的信息和状态:

    // 返回该endpoint内部使用的传输工具,如channelendpoint内部使用channel,而socketendpoint内部使用socket。该方法用于对内部传输工具的配置。

    public object gettransport();

    // 用于配置socket的so_timeout的时间,即等待客户连接的超时时间。

    public int getmaxidletime();

    public void setmaxidletime(int timems) throws ioexception;

    // 返回当前endpoint所在服务器的ip地址、主机名、端口号以及客户端的ip地址、主机名、端口号。

    public string getlocaladdr();

    public string getlocalhost();

    public int getlocalport();

    public string getremoteaddr();

    public string getremotehost();

    public int getremoteport();

}

深入Jetty源码之EndPoint

endpoint最主要的方法从底层传输链路中读取数据并填入buffer中的fill方法,以及将buffer中的数据写入底层传输链路的flush方法;读数据对应input,写数据对应output,可以单独的关闭input或output,并提供方法判断input或output是否已经被关闭;可以用close方法关闭endpoint,也可以通过isopen方法判断是否这个endpoint是否已经被关闭;可以以阻塞的方式读写endpoint,并判断当前endpoint是否处于阻塞状态(主要用于selectchannelendpoint中);对sslselectchannelendpoint来说,它在读写时都可能内部缓存数据,因而endpoint中定义了一些方法用于判断当前endpoint是否有输入/输出换成,以及使用flush将缓存中的数据写入到底层链路中;对底层socket,endpoint还可以配置其最长的空闲时间;最后endpoint还提供一些方法用于获取本地和远程的地址、主机名、端口号,以及获取底层传输类,如socket、channel等。

streamendpoint采用古老的stream方法从socket中读写数据,它包含inputstream和outputstream,分别表示读写数据流;它永远是阻塞式读写,因而isblocking、blockreadable、blockwritable永远返回true;它也不会在内部缓存读写数据,因而isbufferinginput、isbufferingoutput、isbufferred永远返回false,而flush方法直接调用outputstream的flush方法;对fill实现,直接使用传入的buffer从inputstream中读取数据;对flush实现,直接将buffer中的数据写入到outputstream中;close方法同时关闭inputstream和outputstream,并将成员变量置为null;对streamendpoint本身,没有本地或远程的地址、主机名、端口号信息。

socketendpoint是streamendpoint的子类,它从socket中获取inputstream和outputstream,以及本地和远程的地址、主机名、端口号;而isinputshutdown、isoutputshutdown、shutdowninput、shutdownoutput等方法直接调用socket中相应的方法;gettransport直接返回socket实例;setmaxidletime方法同时设置socket的so_timeout值;当空闲超时,只关闭input。 

connectorendpoint继承自socketendpoint,它是socketconnector的内部类,每一个客户端的连接请求创建一个connectorendpoint实例,在创建connectorendpoint的同时,会在内部创建一个httpconnection实例;它还实现了connectedendpoint,因而可以从外部设置connection实例;在读数据时,如果遇到eof,表示连接已经断开,因而关闭当前endpoint;在关闭endpoint时,cancel当前connection中request实例的asynccontinuation。connectorendpoint还实现了runnable接口,在其run方法的实现中,它首先更新处理的connection的引用计数,然后保存当前connection实例,在socketconnector已经启动,并且connectorendpoint未被关闭的状态下循环调用connection的handle方法,在每个循环开始前检查当前connector是否处于low resources状态(如线程池的可用线程已经不多),此时更新endpoint的maxidletime为当前connector的lowresourcesmaxidletime的值,以减少一些连接的空闲等待时间;对任何exception,关闭当前endpoint;最后更新connector中的一些统计信息,将当前connection从connector的当前正在处理的connections集合中移除,如果此时socket还未关闭,读取socket中的数据直到数据读完或超过maxidletime,此时如果socket还未关闭,则关闭当前socket。而在connector创建connectorendpoint时,会调用其dispatch方法,将其自身仍给相应的线程池处理,以在某个时间在另一个线程中调用其run方法。

sslconnectorendpoint继承自connectorendpoint,它在关闭input和output时会同时关闭整个endpoint,而在执行真正的处理逻辑前有一个handle shake的过程。

channelendpoint采用nio实现,从channel中读写数据。在创建channelendpoint时传入bytechannel,如果传入的bytechannel是socketchannel,则同时纪录socket实例,以及获取本地、远程的地址信息,并设置maxidletime值为so_timeout值。如果该bytechannel是selectablechannel类型(serversocketchannel、socketchannel、diagramchannel、sinkchannel、sourcechannel),并且其isblocking()方法返回false,表示该channel是非阻塞式的读写,否则这个channel是阻塞式的读写,但是默认情况下,blockreadable、blockwritable直接返回true,表示阻塞式的读写。对非sslselectchannelendpoint的endpoint不会在内部缓存数据,因而isbufferred、isbufferingoutput、isbufferinginput直接返回false,而flush方法为空实现;对socketchannel,在设置maxidletime时,同时将该值设置到底层socket的so_timeout的值中;gettransport直接返回底层channel实例;shutdowninput、shutdownoutput、isinputshutdown、isoutputshutdown使用socket实现;fill实现只支持niobuffer,它使用channel将数据写入内部的bytebuffer中;flush实现使用channel将bytebuffer中的数据写入到channel中,或使用gatheringbytechannel将多个bytebuffer同时写入到channel中。

blockingchannelendpoint类是blockingchannelconnector的内部类,它继承自channelendpoint,并实现了connectedendpoint和runnable接口。在创建blcokingchannelendpoint时,同样也会创建httpconnection实例;每次调用fill、flush方法时,都会更新_idletimestamp的值为当前时间戳(该值也会在每一次connection开始重新被处理时更新),在blockingchannelconnector启动时会生成一个task,它没400毫秒遍历一次所有正在处理的endpoint,如果发现有endpoint已经超时(checkidletimestamp()方法,即空闲时间超过maxidletime),则调用其idleexpired()方法,将该endpoint关闭;blockingchannelconnector在接到一个连接后,先会设置socketchannel的blockingchannel为true,然后使用这个socketchannel创建一个blockingchannelendpoint,并调用其dispatch()方法,将它丢到一个线程池中,在blockingchannelendpoint的run方法实现中,首先更新一些统计数据,纪录当前正在处理的endpoint;只要当前endpoint还处于打开状态,先更新_idletimestamp为当前时间戳,然后如果当前threadpool处于lowonthread状态,将timeout时间更新为lowresourcesmaxidletime,而后调用connection的handle方法;对任何exception,直接关闭endpoint;在最后退出时,如果endpoint还未关闭,读取endpoint的数据,直到超时,并强制关闭endpoint。

selectchannelendpoint类在selectchannelconnector中被使用,它继承自channelendpoint,并实现了connectedendpoint和asyncendpoint接口,selectchannelconnector采用nio中多路复用的机制,因而实现会比较复杂一些。在创建connector时,首先创建connectorselectormanager实例(_manager),在selectchannelconnector启动时,设置_manager的selectsets(acceptors)、maxidletime、lowresourcesconnections、lowresourcesidletime,然后启动_manager,并且启动acceptors个线程,只要selectchannelconnector处于running状态,就不断的调用_manager.doselect()方法。connectorselectormanager在启动时会创建_selectsets个selectset;而doselect方法会调用根据传入的索引号对应的selectset的doselect()方法。当客户端的连接到来后,selectchannelconnector首先会配置socketchannel的configureblocking为false,然后将该socketchannel注册到_manager中,在注册过程中,根据当前的selectset索引值找到相应的selectset(之后索引自增),然后调用selectset的addchange(传入socketchannel)和wakeup方法。因而这里最重要的就是selectset的实现,它是selectormanager中的一个内部类。

selectset类的实现中,它内部有一个selector,一个concurrentlinkedqueue的changes队列,以及selectchannelendpoint到selectset的集合(它用于调用selectchannelendpoint中的checkidletimestamp()方法以检查并关闭处于idle timeout的selectchannelendpoint)。selectset使用addchange()方法添加需要改变状态的对象,这些对象有endpoint、channelandattachment、socketchannel、runnable。在doselect()方法中,首先检查changes队列中是否有对象,如果有selectchannelendpoint对象,则调用其doupdatekey()方法;如果是socketchannel对象,则注册op_read操作到selector中,创建新的selectchannelendpoint,attach新创建的selectchannelendpoint到selectionkey中,调用selectchannelendpoint的schedule()方法。对channelandattachment对象,如果其channel是socketchannel,并且处于connected状态,则类似对socketchannel对象的处理,否则,注册op_connect操作到selector;如果是runnable对象,则dispatch该runnable对象。然后调用selector的selectnow()方法,如果没有任何可用的事件,则计算出等待时间,然后带等待时间的调用selector的select()方法;遍历所有selected keys,对invalid的selectionkey,直接调用其attach的selectchannelendpoint的doupdatekey()方法,否则对类型是selectchannelendpoint的attachment调用其schedule()方法,对connectable的selectionkey创建新的selectchannelendpoint并调用schedule()方法,否则创建新的selectchannelendpoint并对readable的selectionkey调用其schedule()方法。

selectchannelendpoint采用nio的非阻塞读写方式,而nio基于channel的非阻塞操作是基于注册的操作集(op_read, op_write, o_connect, op_accept)以从selector中选出已经可用的selectionkey(包含对应的channel、interestops、readable、writable、attachment等),之后可以使用对应的channel以及根据selectionkey中对应的已经可用的操作执行相应的操作(如读写),因而selectchannelendpoint的其中一个任务是要实时的更新当前它感兴趣的操作集,并重新像selector中注册。 selectchannelendpoint使用updatekey()方法跟新感兴趣操作集合,并且它只关注op_read和op_write操作,在实现时,op_read只需要在socket的输入没有关闭,且还没有dispatch或当前处于readblocked状态下才需要关注;op_write只需要在socket的输出没有关闭,且writable为false(当需要向channel中写数据,但是还没有写完的情况下)或当前处于writeblocked状态下才需要关注;如果和当前已注册的操作集相同,则不需要重新注册,否者将自身通过selectset的addchange()方法添加到selectset中,在selectset的doselect()方法中会最终调用selectchannelendpoint中的doupdatekey()方法,该方法的实现:1. 当channel处于open状态,存在感兴趣的操作,selectionkey为null或invalid,如果channel已经注册了,重新调用updatekey()方法(感觉这里一般不会被调用到,如果被调用到了,则可能出现死循环),否则将channel重新向selector中重新注册interestops的操作集(如果出错,则canncel selectionkey,并且从selectset中销毁当前endpoint)。2. 当channel处于open状态,存在感兴趣的操作,selectionkey存在且valid,则直接使用interestops更新selectionkey的感兴趣集(调用selectionkey的interestops()方法)。3. 当channel处于open状态,不存在感兴趣的操作,清空selectionkey的interestops,或清理selectionkey引用。4. 如果channel处于关闭状态,则canncel selectionkey,并从selectset中销毁当前endpoint。

对阻塞读写(readblocked、writeblocked),在blockreadable()、blockwritable()方法中,会设置readblocked、writeblocked为true,调用updatekey()方法,然后计算等待时间并进入等待(调用wait方法),如果因为超时而退出等待,则返回false,否则返回true(在返回时设置readblocked、writeblocked为false);当调用selectchannelendpoint的schedule()方法时,它会更新readblocked、writeblocked、interestops的值(同时使用该值更新selectionkey中的状态),并调用notifyall()方法唤醒blockreadable()、blockwritable()方法:1. 如果selectionkey为null或invalid,readblocked、writeblocked设置为false,调用notifyall(),并返回;2. 如果readblocked或writeblocked为true,使用selectionkey的readable、writable更新readblocked和writeblocked的值,调用notifyall(),如果已经dispatched,清除所有interestops,并返回;3. 如果还没有dispatched,直接清除所有interestops,并返回;4. 如果注册了op_write,并且已经可写,则清除op_write操作,设置writable为true;5. 如果还没有dispatched,则调用dispatch()方法。在dispatch()方法中,它设置dispatched为true,并将handler扔给threadpool(在handler调用connection的handle()方法,由于selectchannelendpoint的生命周期是在selectmanager维护,并且dispatch()方法可能被多次调用,因而没有在handler的handle()方法中判断endpoint的close状态,并循环的调用connection的handle()方法,而是在每次handle()方法结束后退出当前线程,在下次schedule()时会使用重新将handler扔给threadpool以支持asynccontinuation的实现,并且asyncendpoint接口的定义也是用于asynccontinuation的实现,这个将在以后的博客中详述)。在flush()方法中,如果没有任何数据能写入channel时,设置writable为false(从而在updatekey()方法中能将op_write注册到selectionkey的interestops中),并在没有dispatch的情况下调用updatekey。最后清理selector中的selectedkeys,expire所有timeout中注册的task(使用scheduletimeout()方法注册),依次调用检查endpoints中是否已经timeout。

在selectchannelendpoint的构建中,它使用socketchannel、selectset、selectionkey构建,内部从selectset中获取selectormanager,并使用selectormanager 创建connection实例,初始化_dispatched、_redispatched为false,_open、_writable为true,_readblocked、_writeblocked为false,_interestops为0,最后更新_idletimestamp为当前时间。当一个客户端连接到来后,selectchannelconnector会向selectormanager(selectset)中注册一个socketchannel,当后台线程调用selectset中的doselect()方法时,它使用该socketchannel,向该selectset中的selector注册op_read得到一个selectionkey,并使用这个socketchannel、当前selectset、以及这个selectionkey创建一个selectchannelendpoint,而后调用selectchannelendpoint的schedule()方法。

sslselectchannelendpoint类采用buffer的形式先将数据读写到内部缓存中,然后使用sslengine来wrap或unwrap(encode/decode)数据。这里不再详述。