前言
网络编程中,收发包是一个比较重要的过程。这篇文章主要分析记录tars的收发包步骤(TCP链接,tars协议为例),以及整个过程中对内存的使用。
一.收包
1.收包逻辑
收包的逻辑主要在Connection::recv(recv_queue::queue_type &o)中实现:
- 整个收包过程中发生了三次copy:
- 第一次copy:调用read把内核空间的数据copy到栈上的临时空间。注意到,这里的一次read调用最大为32k(buffer的长度),如果都是大包,是否能改大呢?从而节省read调用次数。
- 第二次copy:把数据append到Connection的缓存中。这里为什么需要Connection的缓存呢?因为TCP是无边界流协议,收到的数据可能还不完整或者收到的数据包含多个包,为了处理这些情况,需要先把数据缓存下来,在适当时机再进行拆包,所以这次copy是省不了的。
- 第三次copy发生在拆包的时候。具体的过程如下:当把socket里的数据read完(返回EAGAIN)或者Connection缓存中的数据已经超过了一定大小,则进行拆包(拆包过程中用了substr,看注释说,字符串太长时substr性能会急剧下降,所以超过8192字节就开始拆包)。如果是tars协议,拆包是根据包头的长度区分包的边界的:
包头固定长度4字节,存储整个包的长度。包长度=包头(4字节)+包体长度 。
- 在拆完包后,把一个完整的包封装成tagRecvData,主要是加了一些上下文信息。在这个过程中,用了std::move避免了一次copy。
struct tagRecvData { uint32_t uid; /**连接标示*/ string buffer; /**需要发送的内容*/ string ip; /**远程连接的ip*/ uint16_t port; /**远程连接的端口*/ int64_t recvTimeStamp; /**接收到数据的时间*/ bool isOverload; /**是否已过载 */ bool isClosed; /**是否已关闭*/ int fd; /*保存产生该消息的fd,用于回包时选择网络线程*/ BindAdapterPtr adapter; /**标识哪一个adapter的消息*/ int closeType; /*如果是关闭消息包,则标识关闭类型,0:表示客户端主动关闭;1:服务端主动关闭;2:连接超时服务端主动关闭*/ };
- 最后,把每个tagRecvData包push到队列中。不管是临时队列,还是Adapter的接收队列,push的都是tagRecvData的指针,所以不会发生copy。至于为什么需要临时队列,我认为是为了节省加锁次数。原因如下:Adapter的接收队列为了保证线程安全,每次push_back都需要加锁,如果先把tagRecvData存到临时队列中,等该次read完(返回EAGAIN)或者积累了一定数量的tagRecvData后再统一push到Adapter的接收队列中,那么就只需要加一次锁。代码如下:
template<typename T, typename D> void TC_ThreadQueue<T, D>::push_back(const queue_type &qt) { Lock lock(*this); typename queue_type::const_iterator it = qt.begin(); typename queue_type::const_iterator itEnd = qt.end(); while(it != itEnd) { _queue.push_back(*it); ++it; ++_size; notify(); } }
2.内存占用
接下来,我们从内存占用的角度来看一下收包过程。上面的收包过程中用到了栈上空间(灰色部分),Connection缓存(黄色部分),临时队列(蓝色部分),这三部分的生命周期如下:
- 灰色部分:栈上空间比较简单,是固定大小的,而且随着每次read结束很快就释放了
- 蓝色部分:临时队列,生命周期是在每次EPOLLIN事件的开始到结束。需要注意的是,临时队列结束并不意味着之前存在该队列的tagRecvData包占用的内存空间释放了。tagRecvData包被push到Adapter的接收队列里,然后由业务线程消费掉这些包,才会释放空间。所以,应当尽早把包push到Adapter的接收队列里,这些包占用的空间才能越快被释放。当临时队列里的包数量大于_iMaxTemQueueSize时,就会主动把包push到Adapter的接收队列了,而不用等到EPOLLIN事件结束后:
_iMaxTemQueueSize在Connection的构造函数里面被赋值为100:if((int) o.size() > _iMaxTemQueueSize) { insertRecvQueue(o); o.clear(); }
TC_EpollServer::NetThread::Connection::Connection(BindAdapter *pBindAdapter) : _pBindAdapter(pBindAdapter) ............ , _iMaxTemQueueSize(100) ............ { ........... }
- 黄色部分:Connection的接收缓存_recvbuffer。_recvbuffer在Connection的整个生命周期里是一直存在的。_recvbuffer用std::string,避免了需要预先分配空间;当开始拆包的时候,_recvbuffer的数据被copy到tagRecvData里。拆包的时机有两种:(1)在read事件结束后;(2)_recvbuffer的大小超过8192:
在read事件结束后,会有两种情况://字符串太长时substr性能会急剧下降 if(_recvbuffer.length() > 8192) { parseProtocol(o); }
- _recvbuffer的大小为0。即收到的数据都是完整的包,已经全包拆包了。
- _recvbuffer的大小不为0。只收到了一个包的一部分,而另一部分还未到达,那么这部分数据就会留在_recvbuffer里,等待下次read数据把包收完整。如果这部分数据比较大,而且并发量上来,同时存在很多Connection占用了大量内存,那可能就会有问题了。Adapter有个配置maxconns可以控制最大链接数,缺省配置是1024,而通过默认模板生成的配置是100000:
DEFAULT_MAX_CONN = 1024, /**缺省最大连接数*/
<TestApp.HelloServer.HelloObjAdapter>
...........
maxconns=100000
...........
</TestApp.HelloServer.HelloObjAdapter>
二.发包
1.发包逻辑
首先看一下最基本的socket发送::send()。主要代码在Connection::send(const string& buffer, const string &ip, uint16_t port, bool byEpollOut)中。在::send的时候会有两种情况:
- 第一种情况,全部数据成功,整个过程只发生了一次copy
- 第二种情况,未发送部分的数据会被copy到Connection的发送缓存里。
缓存里面的数据会在触发EPOLLOUT事件的时候发送出去。所以,完整的发包其实是有2个入口的:
- 第一个入口就是我们刚讲的从发送队列取出包进行发送的过程,即ET_NOTIFY事件类型,入口函数是NetThread::processPipe()
- 第二个入口就是处理发送缓存里面的数据的过程,即ET_NET事件类型,而且epoll事件是EPOLLOUT,入口函数是NetThread::processNet()
- 注意到在第一个过程中,发包用的是tcpSend(),最终调用的是普通的io接口::send();而在发送缓存区中的数据时用的是tcpWriteV(),最终调用的是分散/聚集io接口::writev()。
2.发送缓存_sendbuffer
_sendbuffer是一个std::vector,每个元素TC_Slice是一块独立的内存:
struct TC_Slice
{
explicit TC_Slice(void* d = NULL , size_t ds = 0, size_t l = 0);
void* data;
size_t dataLen; //实际存储数据长度
size_t len; //内存长度
};
std::vector<TC_Slice> _sendbuffer;
在一次tcpSend发送后,剩余部分会分配一个新的TC_Slice来存储,然后push到_sendbuffer中。
int bytes = this->tcpSend(buffer.data(), buffer.size());
if (bytes == -1)
{
_pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by peer.");
return -1;
}
else if (bytes < static_cast<int>(buffer.size()))
{
const char* remainData = &buffer[bytes];
const size_t remainLen = buffer.size() - static_cast<size_t>(bytes);
TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool;
// avoid too big chunk
for (size_t chunk = 0; chunk * kChunkSize < remainLen; chunk ++)
{
size_t needs = std::min<size_t>(kChunkSize, remainLen - chunk * kChunkSize);
TC_Slice slice = pool->Allocate(needs);
::memcpy(slice.data, remainData + chunk * kChunkSize, needs);
slice.dataLen = needs;
_sendbuffer.push_back(slice);
}
// end
_pBindAdapter->getEpollServer()->info("EAGAIN[" + _ip + ":" + TC_Common::tostr(_port) +
", to sent bytes " + TC_Common::tostr(remainLen) +
", total sent " + TC_Common::tostr(buffer.size()));
}
_sendbuffer将未发送的包分割成多个独立的部分,而且这种独立的内存块适合用分散/聚集io来发送。在新的socket可写事件到来时,可以直接用::writev()来发送缓存起来的包,这部分代码在Connection::send(const std::vector<TC_Slice>& slices)中。
每个网络线程都有自己的内存池TC_BufferPool,用来分配TC_Slice所需的内存。TC_BufferPool的结构如下:
有了内存池,可以避免频繁的向系统申请释放内存。这个内存池分配的内存块都是2的n次方,如果申请的内存不是2的n次方,则向上取到2的n次方,例如申请30字节,实际上会申请32字节。所以,TC_Slice才需要一个len成员存内存大小,一个datalen存实际数据大小。内存池分配的最小内存块,最大内存块,占用内存上限都是可以配置的,具体的配置项如下:
size_t minBlockSize = TC_Common::strto<size_t>(toDefault(_conf.get("/tars/application/server<poolminblocksize>"), "1024")); // 1KB
size_t maxBlockSize = TC_Common::strto<size_t>(toDefault(_conf.get("/tars/application/server<poolmaxblocksize>"), "8388608")); // 8MB
size_t maxBytes = TC_Common::strto<size_t>(toDefault(_conf.get("/tars/application/server<poolmaxbytes>"), "67108864")); // 64MB
epollServer->setNetThreadBufferPoolInfo(minBlockSize, maxBlockSize, maxBytes);
3.内存占用
与发包相关的内存占用主要有2方面:
- 内存池占用的内存。内存池里的内存块是暂时还没被分配出去给发送缓存的,属于已经从系统分配了但是还没被利用的内存。所以,如果内存池占用的内存长时间保持比较高的占用,是有问题的。可以通过配置内存池占用上限控制大小。另外,因为是每个网络线程都有自己的一个内存池,所以也要考虑网络线程数这个因素。
- 每个Connection的发送缓存_sendbuffer占用的内存。严格上来讲,_sendbuffer的占用上限更应该控制好,因为并发越高,就会有越多的_sendbuffer。tars对这里的处理方式是通过配置项<BackPacketBuffLimit>来控制的:
(1)如果BackPacketBuffLimit不配置或者配置为0,则不限制_sendbuffer的大小;
(2)如果BackPacketBufferLimit配置为非0,在_sendbuffer占用的内存大于配置的值时,服务端就会主动关闭这个链接:
size_t toSendBytes = 0;
for (const auto& slice : _sendbuffer)
{
toSendBytes += slice.dataLen;
}
if (toSendBytes >= 8 * 1024)
{
_pBindAdapter->getEpollServer()->info("big _sendbuffer > 8K");
size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit();
if(iBackPacketBuffLimit != 0 && toSendBytes >= iBackPacketBuffLimit)
{
_pBindAdapter->getEpollServer()->error("send [" + _ip + ":" + TC_Common::tostr(_port) + "] buffer too long close.");
clearSlices(_sendbuffer);
return -2; //返回-2,则会关闭这个链接
}
}