天天看点

tars服务端(三):收发包管理

前言

网络编程中,收发包是一个比较重要的过程。这篇文章主要分析记录tars的收发包步骤(TCP链接,tars协议为例),以及整个过程中对内存的使用。

一.收包

1.收包逻辑

收包的逻辑主要在Connection::recv(recv_queue::queue_type &o)中实现:

tars服务端(三):收发包管理
  • 整个收包过程中发生了三次copy:
  1. 第一次copy:调用read把内核空间的数据copy到栈上的临时空间。注意到,这里的一次read调用最大为32k(buffer的长度),如果都是大包,是否能改大呢?从而节省read调用次数。
  2. 第二次copy:把数据append到Connection的缓存中。这里为什么需要Connection的缓存呢?因为TCP是无边界流协议,收到的数据可能还不完整或者收到的数据包含多个包,为了处理这些情况,需要先把数据缓存下来,在适当时机再进行拆包,所以这次copy是省不了的。
  3. 第三次copy发生在拆包的时候。具体的过程如下:当把socket里的数据read完(返回EAGAIN)或者Connection缓存中的数据已经超过了一定大小,则进行拆包(拆包过程中用了substr,看注释说,字符串太长时substr性能会急剧下降,所以超过8192字节就开始拆包)。如果是tars协议,拆包是根据包头的长度区分包的边界的:
tars服务端(三):收发包管理

         包头固定长度4字节,存储整个包的长度。包长度=包头(4字节)+包体长度  。

  • 在拆完包后,把一个完整的包封装成tagRecvData,主要是加了一些上下文信息。在这个过程中,用了std::move避免了一次copy。
    struct tagRecvData                                                                                                         
        {                                                                                                                          
            uint32_t        uid;            /**连接标示*/                                                                          
            string          buffer;         /**需要发送的内容*/                                                                    
            string          ip;             /**远程连接的ip*/                                                                      
            uint16_t        port;           /**远程连接的端口*/                                                                    
            int64_t         recvTimeStamp;  /**接收到数据的时间*/                                                                  
            bool            isOverload;     /**是否已过载 */                                                                       
            bool            isClosed;       /**是否已关闭*/                                                                        
            int                fd;                /*保存产生该消息的fd,用于回包时选择网络线程*/                                   
            BindAdapterPtr  adapter;        /**标识哪一个adapter的消息*/                                                           
            int             closeType;     /*如果是关闭消息包,则标识关闭类型,0:表示客户端主动关闭;1:服务端主动关闭;2:连接超时服务端主动关闭*/                                                                                                                   
        };                     
               
  • 最后,把每个tagRecvData包push到队列中。不管是临时队列,还是Adapter的接收队列,push的都是tagRecvData的指针,所以不会发生copy。至于为什么需要临时队列,我认为是为了节省加锁次数。原因如下:Adapter的接收队列为了保证线程安全,每次push_back都需要加锁,如果先把tagRecvData存到临时队列中,等该次read完(返回EAGAIN)或者积累了一定数量的tagRecvData后再统一push到Adapter的接收队列中,那么就只需要加一次锁。代码如下:
    template<typename T, typename D> void TC_ThreadQueue<T, D>::push_back(const queue_type &qt)
    {                                                                                                                              
        Lock lock(*this);
                                                                                                                                   
        typename queue_type::const_iterator it = qt.begin();                                                                       
        typename queue_type::const_iterator itEnd = qt.end();
        while(it != itEnd)                                                                                                         
        {                                                                                                                          
            _queue.push_back(*it);                                                                                                 
            ++it;                                                                                                                  
            ++_size;                                                                                                               
            notify();
        }                                                                                                                          
    }
               

2.内存占用

接下来,我们从内存占用的角度来看一下收包过程。上面的收包过程中用到了栈上空间(灰色部分),Connection缓存(黄色部分),临时队列(蓝色部分),这三部分的生命周期如下:

tars服务端(三):收发包管理
  • 灰色部分:栈上空间比较简单,是固定大小的,而且随着每次read结束很快就释放了
  • 蓝色部分:临时队列,生命周期是在每次EPOLLIN事件的开始到结束。需要注意的是,临时队列结束并不意味着之前存在该队列的tagRecvData包占用的内存空间释放了。tagRecvData包被push到Adapter的接收队列里,然后由业务线程消费掉这些包,才会释放空间。所以,应当尽早把包push到Adapter的接收队列里,这些包占用的空间才能越快被释放。当临时队列里的包数量大于_iMaxTemQueueSize时,就会主动把包push到Adapter的接收队列了,而不用等到EPOLLIN事件结束后:
    if((int) o.size() > _iMaxTemQueueSize)                                                                         
    {                                                                                                              
        insertRecvQueue(o);                                                                                        
        o.clear();                                                                                                 
    }           
               
    _iMaxTemQueueSize在Connection的构造函数里面被赋值为100:
    TC_EpollServer::NetThread::Connection::Connection(BindAdapter *pBindAdapter)                                                   
    : _pBindAdapter(pBindAdapter)                                                                                                  
    ............                                                                                                               
    , _iMaxTemQueueSize(100)                                                                                                       
    ............
    
    {
        ...........
    }
               
  • 黄色部分:Connection的接收缓存_recvbuffer。_recvbuffer在Connection的整个生命周期里是一直存在的。_recvbuffer用std::string,避免了需要预先分配空间;当开始拆包的时候,_recvbuffer的数据被copy到tagRecvData里。拆包的时机有两种:(1)在read事件结束后;(2)_recvbuffer的大小超过8192:
    //字符串太长时substr性能会急剧下降                                                                                 
    if(_recvbuffer.length() > 8192)                                                                                    
    {                                                                                                                  
        parseProtocol(o);                                                                                              
    }              
    
    ​
               
    在read事件结束后,会有两种情况:
  1. _recvbuffer的大小为0。即收到的数据都是完整的包,已经全包拆包了。
  2. _recvbuffer的大小不为0。只收到了一个包的一部分,而另一部分还未到达,那么这部分数据就会留在_recvbuffer里,等待下次read数据把包收完整。如果这部分数据比较大,而且并发量上来,同时存在很多Connection占用了大量内存,那可能就会有问题了。Adapter有个配置maxconns可以控制最大链接数,缺省配置是1024,而通过默认模板生成的配置是100000:
DEFAULT_MAX_CONN        = 1024,       /**缺省最大连接数*/


<TestApp.HelloServer.HelloObjAdapter>                      
        ...........                                                                    
        maxconns=100000
        ...........                                                                                                                                                                                                                      
</TestApp.HelloServer.HelloObjAdapter>   
           

二.发包

1.发包逻辑

首先看一下最基本的socket发送::send()。主要代码在Connection::send(const string& buffer, const string &ip, uint16_t port, bool byEpollOut)中。在::send的时候会有两种情况:

tars服务端(三):收发包管理
  • 第一种情况,全部数据成功,整个过程只发生了一次copy
  • 第二种情况,未发送部分的数据会被copy到Connection的发送缓存里。

缓存里面的数据会在触发EPOLLOUT事件的时候发送出去。所以,完整的发包其实是有2个入口的:

tars服务端(三):收发包管理
  1. 第一个入口就是我们刚讲的从发送队列取出包进行发送的过程,即ET_NOTIFY事件类型,入口函数是NetThread::processPipe()
  2. 第二个入口就是处理发送缓存里面的数据的过程,即ET_NET事件类型,而且epoll事件是EPOLLOUT,入口函数是NetThread::processNet()
  3. 注意到在第一个过程中,发包用的是tcpSend(),最终调用的是普通的io接口::send();而在发送缓存区中的数据时用的是tcpWriteV(),最终调用的是分散/聚集io接口::writev()。

2.发送缓存_sendbuffer

_sendbuffer是一个std::vector,每个元素TC_Slice是一块独立的内存:

struct TC_Slice
{
    explicit TC_Slice(void* d = NULL , size_t ds = 0, size_t l = 0);
    void* data;                                                                                                                
    size_t dataLen;      //实际存储数据长度                                                                                                      
    size_t len;          //内存长度                                                                                                               
                                                                                                                               
};        
std::vector<TC_Slice>  _sendbuffer;
           

在一次tcpSend发送后,剩余部分会分配一个新的TC_Slice来存储,然后push到_sendbuffer中。

int bytes = this->tcpSend(buffer.data(), buffer.size());
            if (bytes == -1)
            { 
                _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by peer.");
                return -1;
            } 
            else if (bytes < static_cast<int>(buffer.size()))
            { 
                const char* remainData = &buffer[bytes];
                const size_t remainLen = buffer.size() - static_cast<size_t>(bytes);

                TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool;
                // avoid too big chunk
                for (size_t chunk = 0; chunk * kChunkSize < remainLen; chunk ++)
                {
                    size_t needs = std::min<size_t>(kChunkSize, remainLen - chunk * kChunkSize);

                    TC_Slice slice = pool->Allocate(needs);
                    ::memcpy(slice.data, remainData + chunk * kChunkSize, needs);
                    slice.dataLen = needs;

                    _sendbuffer.push_back(slice);
                }
                // end
                _pBindAdapter->getEpollServer()->info("EAGAIN[" + _ip + ":" + TC_Common::tostr(_port) +
                        ", to sent bytes " + TC_Common::tostr(remainLen) +
                        ", total sent " + TC_Common::tostr(buffer.size()));
            } 
           

_sendbuffer将未发送的包分割成多个独立的部分,而且这种独立的内存块适合用分散/聚集io来发送。在新的socket可写事件到来时,可以直接用::writev()来发送缓存起来的包,这部分代码在Connection::send(const std::vector<TC_Slice>& slices)中。

tars服务端(三):收发包管理

每个网络线程都有自己的内存池TC_BufferPool,用来分配TC_Slice所需的内存。TC_BufferPool的结构如下:

tars服务端(三):收发包管理

有了内存池,可以避免频繁的向系统申请释放内存。这个内存池分配的内存块都是2的n次方,如果申请的内存不是2的n次方,则向上取到2的n次方,例如申请30字节,实际上会申请32字节。所以,TC_Slice才需要一个len成员存内存大小,一个datalen存实际数据大小。内存池分配的最小内存块,最大内存块,占用内存上限都是可以配置的,具体的配置项如下:

size_t minBlockSize = TC_Common::strto<size_t>(toDefault(_conf.get("/tars/application/server<poolminblocksize>"), "1024")); // 1KB                                                                                                                    
size_t maxBlockSize = TC_Common::strto<size_t>(toDefault(_conf.get("/tars/application/server<poolmaxblocksize>"), "8388608")); // 8MB                                                                                                                 
size_t maxBytes = TC_Common::strto<size_t>(toDefault(_conf.get("/tars/application/server<poolmaxbytes>"), "67108864")); // 64MB                                                                                                                       
epollServer->setNetThreadBufferPoolInfo(minBlockSize, maxBlockSize, maxBytes); 
           

3.内存占用

与发包相关的内存占用主要有2方面:

  1. 内存池占用的内存。内存池里的内存块是暂时还没被分配出去给发送缓存的,属于已经从系统分配了但是还没被利用的内存。所以,如果内存池占用的内存长时间保持比较高的占用,是有问题的。可以通过配置内存池占用上限控制大小。另外,因为是每个网络线程都有自己的一个内存池,所以也要考虑网络线程数这个因素。
  2. 每个Connection的发送缓存_sendbuffer占用的内存。严格上来讲,_sendbuffer的占用上限更应该控制好,因为并发越高,就会有越多的_sendbuffer。tars对这里的处理方式是通过配置项<BackPacketBuffLimit>来控制的:

         (1)如果BackPacketBuffLimit不配置或者配置为0,则不限制_sendbuffer的大小;

         (2)如果BackPacketBufferLimit配置为非0,在_sendbuffer占用的内存大于配置的值时,服务端就会主动关闭这个链接:

size_t toSendBytes = 0;                                                                                                    
    for (const auto& slice : _sendbuffer)                                                                                      
    {                                                                                                                          
        toSendBytes += slice.dataLen;                                                                                          
    }                                                                                                                          
                                                                                                                               
    if (toSendBytes >= 8 * 1024)                                                                                               
    {                                                                                                                          
        _pBindAdapter->getEpollServer()->info("big _sendbuffer > 8K");                                                         
        size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit();                                                 
                                                                                                                               
        if(iBackPacketBuffLimit != 0 && toSendBytes >= iBackPacketBuffLimit)                                                   
        {                                                                                                                      
            _pBindAdapter->getEpollServer()->error("send [" + _ip + ":" + TC_Common::tostr(_port) + "] buffer too long close.");                                                                                                                              
            clearSlices(_sendbuffer);                                                                                          
            return -2;   //返回-2,则会关闭这个链接                                                                                                        
        }                                                                                                                      
    }                     
           

继续阅读