天天看点

SRS源码分析-读写类SrsStSocket

关键类SrsStSocket封装了socket的读写操作,负责将数据发送给对端(send),并读取对端发送过来的数据(read)。

继承关系为:

SRS源码分析-读写类SrsStSocket

相关源码如下:

// TCP socket reader/writer built on coroutines (ST).
// Wraps an ST net fd and tracks per-direction timeouts and byte counters.
class SrsStSocket : public ISrsProtocolReaderWriter
{
private:
    // Timeouts, in milliseconds.
    // @remark SRS_CONSTS_NO_TMMS means "never time out".
    int64_t rtm; // receive timeout
    int64_t stm; // send timeout
    // Running totals of transferred data, in bytes.
    int64_t rbytes; // bytes received so far
    int64_t sbytes; // bytes sent so far
    // The underlying ST net fd all I/O goes through.
    srs_netfd_t stfd;
public:
    SrsStSocket();
    virtual ~SrsStSocket();

    // Bind this socket to stfd. The fd stays owned by the user, who must manage it.
    virtual srs_error_t initialize(srs_netfd_t fd);

    // Timeout helpers and accessors (all values in milliseconds).
    virtual bool is_never_timeout(int64_t tm);
    virtual void set_recv_timeout(int64_t tm);
    virtual int64_t get_recv_timeout();
    virtual void set_send_timeout(int64_t tm);
    virtual int64_t get_send_timeout();
    // Byte counters accumulated by successful reads/writes.
    virtual int64_t get_recv_bytes();
    virtual int64_t get_send_bytes();

    /**
     * Read from the fd into buf.
     * @param nread, receives the actual number of bytes read; ignored if NULL.
     */
    virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
    virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
    /**
     * Write buf (or the iovec array) to the fd.
     * @param nwrite, receives the actual number of bytes written; ignored if NULL.
     */
    virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
    virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};


// Constructor: no fd attached yet, both timeouts disabled, byte counters at zero.
SrsStSocket::SrsStSocket()
    : rtm(SRS_CONSTS_NO_TMMS),
      stm(SRS_CONSTS_NO_TMMS),
      rbytes(0),
      sbytes(0),
      stfd(NULL)
{
}

// Destructor.
SrsStSocket::~SrsStSocket()
{
    // Nothing to release here: this body does not close or free stfd.
}

// Attach the given SRS net fd to this socket; later reads/writes use it.
// Always returns srs_success.
srs_error_t SrsStSocket::initialize(srs_netfd_t fd)
{
    this->stfd = fd;

    return srs_success;
}

// Tell whether tm is the "never time out" sentinel (SRS_CONSTS_NO_TMMS).
bool SrsStSocket::is_never_timeout(int64_t tm)
{
    const bool never = (tm == SRS_CONSTS_NO_TMMS);
    return never;
}

//设置接收消息的超时时间
void SrsStSocket::set_recv_timeout(int64_t tm)
{
    rtm = tm;
}

//获取接收消息的超时时间
int64_t SrsStSocket::get_recv_timeout()
{
    return rtm;
}

//设置发送消息的超时时间
void SrsStSocket::set_send_timeout(int64_t tm)
{
    stm = tm;
}

//获取发送消息的超时时间
int64_t SrsStSocket::get_send_timeout()
{
    return stm;
}

//获取接收到的消息的字节数
int64_t SrsStSocket::get_recv_bytes()
{
    return rbytes;
}

//获取发送的消息的字节数
int64_t SrsStSocket::get_send_bytes()
{
    return sbytes;
}

// Read up to size bytes from stfd into buf.
// @param nread, receives the raw st_read() result (bytes read, 0, or -1); ignored if NULL.
// @return srs_success when at least one byte was read; ERROR_SOCKET_TIMEOUT when
//      st_read() failed with errno ETIME; ERROR_SOCKET_READ for every other failure,
//      including a zero-byte read.
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
    ssize_t count;
    if (rtm != SRS_CONSTS_NO_TMMS) {
        // rtm is kept in milliseconds; it is scaled by 1000 for st_read().
        count = st_read((st_netfd_t)stfd, buf, size, rtm * 1000);
    } else {
        count = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    }
    
    // Report the raw result to the caller before interpreting it.
    if (nread) {
        *nread = count;
    }
    
    // count > 0: that many bytes were read.
    if (count > 0) {
        rbytes += count;
        return srs_success;
    }
    
    // count < 0: st_read() failed and errno tells why; ETIME means the timeout expired.
    // @see https://github.com/ossrs/srs/issues/200
    if (count < 0 && errno == ETIME) {
        return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
    }
    
    // count == 0: the connection is closed or EOF was reached; surface it as a reset.
    if (count == 0) {
        errno = ECONNRESET;
    }
    
    return srs_error_new(ERROR_SOCKET_READ, "read");
}

// Read exactly size bytes from stfd into buf; anything less is an error.
// @param nread, receives the raw st_read_fully() result; ignored if NULL.
// @return srs_success when exactly size bytes were read; ERROR_SOCKET_TIMEOUT when
//      st_read_fully() failed with errno ETIME; ERROR_SOCKET_READ_FULLY otherwise.
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
    ssize_t count;
    if (rtm != SRS_CONSTS_NO_TMMS) {
        // rtm is kept in milliseconds; it is scaled by 1000 for st_read_fully().
        count = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000);
    } else {
        count = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    }
    
    // Report the raw result to the caller before interpreting it.
    if (nread) {
        *nread = count;
    }
    
    // Only a complete read counts as success.
    if (count == (ssize_t)size) {
        rbytes += count;
        return srs_success;
    }
    
    if (count < 0) {
        // A negative result means errno is set; ETIME is reported as a timeout.
        // @see https://github.com/ossrs/srs/issues/200
        if (errno == ETIME) {
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
        }
    } else {
        // A short read means the connection is closed or EOF was reached.
        errno = ECONNRESET;
    }
    
    return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
}

// Write size bytes from buf to stfd.
// Original author's note: inside the ST library the data is written with write/writev.
// @param buf, the data to send.
// @param size, the number of bytes to send.
// @param nwrite, receives the raw st_write() result; ignored if NULL.
// @return srs_success when st_write() returns a positive count; ERROR_SOCKET_TIMEOUT when
//      it failed with errno ETIME; ERROR_SOCKET_WRITE for every other non-positive result.
srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
    srs_error_t err = srs_success;
    
    ssize_t nb_write;
    if (stm == SRS_CONSTS_NO_TMMS) {
        nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    } else {
        // stm is kept in milliseconds; it is scaled by 1000 for st_write().
        nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
    }
    
    // Report the raw result to the caller before interpreting it.
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            // stm is int64_t: cast it so the variadic argument matches the %d
            // conversion, the same way read() formats rtm.
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", (int)stm);
        }
        
        return srs_error_new(ERROR_SOCKET_WRITE, "write");
    }
    
    // Count only bytes that were actually sent.
    sbytes += nb_write;
    
    return err;
}

// Write several buffers to stfd in one call.
// @param iov, the array of buffers to send.
// @param iov_size, the number of entries in iov.
// @param nwrite, receives the raw st_writev() result; ignored if NULL.
// @return srs_success when st_writev() returns a positive count; ERROR_SOCKET_TIMEOUT when
//      it failed with errno ETIME; ERROR_SOCKET_WRITE for every other non-positive result.
srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
    srs_error_t err = srs_success;
    
    ssize_t nb_write;
    if (stm == SRS_CONSTS_NO_TMMS) {
        nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
    } else {
        // stm is kept in milliseconds; it is scaled by 1000 for st_writev().
        nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
    }
    
    // Report the raw result to the caller before interpreting it.
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            // stm is int64_t: cast it so the variadic argument matches the %d
            // conversion, the same way read() formats rtm.
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", (int)stm);
        }
        
        return srs_error_new(ERROR_SOCKET_WRITE, "writev");
    }
    
    // Count only bytes that were actually sent.
    sbytes += nb_write;
    
    return err;
}
           
srs