天天看點

muduo緩沖區Buffer類

文章目錄

  • ​​1. retrieve相關方法​​
  • ​​2. 擴容函數Buffer::makeSpace​​
  • ​​3. appned方法​​
  • ​​4. Buffer::readFd讀取資料​​
  • ​​5. Buffer.h​​

mainLoop給subLoop傳遞的TcpConnection對象包含以下一些主要的成員,我們來看一下這兩個Buffer資料成員

muduo緩沖區Buffer類

我們在TCP通信過程中經常會發生TCP的粘包問題,我們一般會在資料頭部分加上資料的長度,通信雙方可以根據資料的長度,讀取資訊,進而避免粘包問題

接收端可能接收的資料比較多,而應用程式目前隻需要一部分資料,那剩下的資料就得存放在緩沖區

同理,發送端可能需要發送的資料比較多,一次無法發送完成,剩下的資料也需要存放在緩沖區

Buffer類具有以下資料成員

muduo緩沖區Buffer類
muduo緩沖區Buffer類
  • prependable bytes:表示資料包的位元組數
  • readerIndex:應用程式從readerIndex指向的位置開始讀緩沖區,[readerIndex, writerIndex]表示待讀取資料,讀完後向後移動len(retrieve方法)
  • writerIndex:應用程式從writerIndex指向的位置開始寫緩沖區,寫完後writerIndex向後移動len(append方法)
  • [readerIndex_, writerIndex_]:辨別可讀資料區間(readableBytes方法)

1. retrieve相關方法

retrieve就是從Buffer讀取資料

void retrieve(size_t len){
    // len就是應用程式從Buffer緩沖區讀取的資料長度
    // 必須要保證len <= readableBytes()
    if(len < readableBytes()){
        // 這裡就是可讀資料沒有讀完
        readerIndex_ += len;
    }else{
        // len == readableBytes()
        // 可讀資料讀完了,readerIndex_和writerIndex_都要複位
        retrieveAll();
    }
}

void retrieveAll(){
    readerIndex_ = kCheapPrepend;
    writerIndex_ = kCheapPrepend;
}

std::string retrieveAsString(size_t len){
    if(len <= readableBytes()){
        std::string result(peek(), len);   // peek傳回緩沖區中可讀資料的起始位址,從可讀位址開始截取len個字元
        retrieve(len);                     // result儲存了緩沖區中的可讀資料,retrieve将參數複位
        return result;
    }
}

// onMessage 有資料到來時,把資料打包成Buffer對象,retrieveAllAsString把Buffer類型轉成string
std::string retrieveAllAsString(){
    return retrieveAsString(readableBytes());
}      

2. 擴容函數Buffer::makeSpace

如果需要寫入緩沖區資料的長度要大于Buffer對象底層vector空閑的長度了,就需要擴容,其中len表示需要寫入資料的長度

void makeSpace(size_t len){
    // writableBytes() + prependableBytes() < len + kCheapPrepend
    if(buffer_.size() - (writerIndex_ - readerIndex_) < len + kCheapPrepend){
        // 直接在writerIndex_後面再擴大len的空間
        buffer_.resize(writerIndex_ + len);
    }else{
        // 如果是空閑空間足夠存放len位元組的資料,就把未讀取的資料統一往前移,移到kCheapPrepend的位置
        size_t readable = readableBytes();  // 這表示剩餘需要讀取的資料
        // 把[readerIndex_, writerIndex_]整體搬到kCheapPrepend
        std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + kCheapPrepend);
        readerIndex_ = kCheapPrepend;
        writerIndex_ = readerIndex_ + readable;  // writerIndex_指向待讀取資料的末尾
    }
}      
if(buffer_.size() - (writerIndex_ - readerIndex_) < len + kCheapPrepend)      
muduo緩沖區Buffer類

該判斷語句中​

​(writerIndex_ - readerIndex_)​

​表示未讀取資料的區間,用buffer_長度一減,則表示剩餘空閑區間

整個if條件就是,剩餘空閑區間不夠存儲将要寫入緩沖區的len資料了,就需要makeSpace擴容,直接在writerIndex_後面再擴大len的空間

如果剩餘空閑區間夠存儲将要寫入緩沖區的len資料,就把待讀取資料[readerIndex_, writerIndex_]整體搬到kCheapPrepend

3. appned方法

不管是從fd上讀資料寫到緩沖區inputBuffer_,還是發資料要寫入outputBuffer_,我們都要往writeable區間内添加資料

void append(const char* data, size_t len){
    // 確定可寫空間不小于len
    ensureWritableBytes(len);
    // 把data中的資料往可寫的位置writerIndex_寫入
    std::copy(data, data+len, beginWrite());
    // len位元組的data寫入到Buffer後,就移動writerIndex_
    writerIndex_ += len;
}      

4. Buffer::readFd讀取資料

從fd上讀取資料,存放到writerIndex_,傳回實際讀取的資料大小

ssize_t readFd(int fd, int* saveErrno);      

Buffer緩沖區是有大小的(占用堆區記憶體),但是我們無法知道fd上的流式資料有多少,如果我們将緩沖區開的非常大,大到肯定能容納所有讀取的資料,這就太浪費空間了,muduo庫中使用readv方法,根據讀取的資料多少開動态開辟緩沖區

muduo緩沖區Buffer類

readv可以在不連續的多個位址寫入同一個fd上讀取的資料

ssize_t readv(int fd, const struct iovec *iov, int iovcnt);      
  • fd:讀取資料的檔案描述符
  • iov:封裝了緩沖區位址和可寫空間大小的結構體
  • iovcnt:緩沖區個數
struct iovec {
    void  *iov_base;    /* Starting address */
    size_t iov_len;     /* Number of bytes to transfer */
};      
  • iov_base:緩沖區的起始位址
  • iov_len:緩沖區的可允許寫的長度

​Buffer.cc​

#include "Buffer.h"

#include <sys/uio.h>
#include <errno.h>
#include <unistd.h>

ssize_t Buffer::readFd(int fd, int* saveErrno){
    char extrabuff[65536] = {0};  // 64K棧空間,會随着函數棧幀回退,記憶體自動回收
    struct iovec vec[2];

    const size_t writable = writableBytes();   // Buffer底層剩餘的可寫空間

    vec[0].iov_base = begin() + writerIndex_;  // 第一塊緩沖區
    vec[0].iov_len = writable;                 // iov_base緩沖區可寫的大小

    vec[1].iov_base = extrabuff;               // 第二塊緩沖區
    vec[1].iov_len = sizeof(extrabuff);
    
    // 如果Buffer有65536位元組的空閑空間,就不使用棧上的緩沖區,如果不夠65536位元組,就使用棧上的緩沖區,即readv一次最多讀取65536位元組資料
    const int iovcnt = (writable < sizeof(extrabuff)) ? 2 : 1;  
    const ssize_t n = ::readv(fd, vec, iovcnt);

    if(n < 0){
        *saveErrno = errno;
    }else if(n <= writable){
        // 讀取的資料n小于Buffer底層的可寫空間,readv會直接把資料存放在begin() + writerIndex_
        writerIndex_ += n;
    }else{
        // Buffer底層的可寫空間不夠存放n位元組資料,extrabuff有部分資料(n - writable)
        // 從緩沖區末尾再開始寫
        writerIndex_ = buffer_.size();
        // 從extrabuff裡讀取 n - writable 位元組的資料存入Buffer底層的緩沖區
        append(extrabuff, n - writable);
    }

    return n;
}

ssize_t Buffer::writeFd(int fd, int* saveErrno){
    ssize_t n = ::write(fd, peek(), readableBytes());
    if (n < 0){
        *saveErrno = errno;
    }
    return n;
}      

5. Buffer.h

#pragma once

#include <vector>
#include <string>
#include <algorithm>

class Buffer{
    public:
        static const size_t kCheapPrepend = 8;   // 資料包長度用8位元組存儲
        static const size_t kInitailSize = 1024; // 緩沖區初始大小

        explicit Buffer(size_t initialSize = kInitailSize)
            : buffer_(kCheapPrepend + initialSize)              // 底層vector的長度
            , readerIndex_(kCheapPrepend)
            , writerIndex_(kCheapPrepend)
        {}

        // 待讀取資料長度:[readerIndex_, writerIndex_]
        const size_t readableBytes() const{
            return writerIndex_ - readerIndex_;                 
        }

        // 可寫空閑大小
        const size_t writableBytes() const{
            return buffer_.size() - writerIndex_;               
        }

        const size_t prependableBytes() const{
            return readerIndex_;
        }

        // 傳回緩沖區中可讀資料的起始位址
        const char* peek() const{
            return begin() + readerIndex_;
        }

        void retrieve(size_t len){
            // len就是應用程式從Buffer緩沖區讀取的資料長度
            // 必須要保證len <= readableBytes()
            if(len < readableBytes()){
                // 這裡就是可讀資料沒有讀完
                readerIndex_ += len;
            }else{
                // len == readableBytes()
                // 可讀資料讀完了,readerIndex_和writerIndex_都要複位
                retrieveAll();
            }
        }

        void retrieveAll(){
            readerIndex_ = kCheapPrepend;
            writerIndex_ = kCheapPrepend;
        }

        std::string retrieveAsString(size_t len){
            if(len <= readableBytes()){
                std::string result(peek(), len);   // peek傳回緩沖區中可讀資料的起始位址,從可讀位址開始截取len個字元
                retrieve(len);                     // result儲存了緩沖區中的可讀資料,retrieve将參數複位
                return result;
            }
        }

        // onMessage 有資料到來時,把資料打包成Buffer對象,retrieveAllAsString把Buffer類型轉成string
        std::string retrieveAllAsString(){
            return retrieveAsString(readableBytes());
        }

        // 可寫緩沖區為[writerIndex, buffer_.size()],需要寫入的資料長度為len
        void ensureWritableBytes(size_t len){
            if(writableBytes() < len){
                makeSpace(len);     // 擴容
            }
        }

        void append(const char* data, size_t len){
            // 確定可寫空間不小于len
            ensureWritableBytes(len);
            // 把data中的資料往可寫的位置writerIndex_寫入
            std::copy(data, data+len, beginWrite());
            // len位元組的data寫入到Buffer後,就移動writerIndex_
            writerIndex_ += len;
        }
        
        char* beginWrite() {
            return begin() + writerIndex_;
        }

        const char* beginWrite() const{
            return begin() + writerIndex_;
        }

        // 從fd上讀取資料,存放到writerIndex_,傳回實際讀取的資料大小
        ssize_t readFd(int fd, int* saveErrno);

        
    private:
        // 傳回Buffer底層資料首位址
        char* begin(){
            return &(*buffer_.begin());
        }

        // 常對象隻能調用常方法,不能調用普通方法
        const char* begin() const {
            return &(*buffer_.begin());
        }

        void makeSpace(size_t len){
            // writableBytes() + prependableBytes() < len + kCheapPrepend
            if(buffer_.size() - (writerIndex_ - readerIndex_) < len + kCheapPrepend){
                buffer_.resize(writerIndex_ + len);
            }else{
                size_t readable = readableBytes();
                std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + kCheapPrepend);
                readerIndex_ = kCheapPrepend;
                writerIndex_ = readerIndex_ + readable;
            }
        }

        std::vector<char> buffer_;                               // vector管理的資源自動釋放,Buffer對象在哪個區,buffer_就在哪個區,主要利用vector自動擴容的功能
        size_t readerIndex_;
        size_t writerIndex_;
};      

繼續閱讀