天天看點

muduo源碼學習(2):異步日志——異步日志的實作什麼是異步日志異步日志的實作開啟異步日志功能總結

目錄

什麼是異步日志

異步日志的實作

前端與後端

前端與後端的互動

資源回收

後端與日志檔案

滾動日志

自動flush緩沖區

開啟異步日志功能

總結

        在前文中分析了日志消息的存儲和輸出,不過并沒有涉及到異步日志,下面就來分析一下異步日志是如何實作的。

什麼是異步日志

        在預設的情況下,日志消息都是直接列印到終端螢幕上,但是實際應用中,日志消息都應該寫到本地檔案,友善記錄以及查詢。

        最簡單的方式就是每産生一條日志消息,都将其寫到相應的檔案中,然而這種方式效率低下,如果很多線程在某一段時間内需要輸出大量日志,那麼顯然日志輸出的效率是很低的。之是以效率低,就是因為每條日志消息都需要通過write這類的函數寫出到本地磁盤,這就導緻頻繁調用IO函數,而磁盤操作本身就比較費時,這樣一來後面的代碼就隻能阻塞住,直到前一條日志寫出成功。

        為了優化上述問題,一個比較好的辦法就是:當日志消息積累到一定量的時候再寫到磁盤中,這樣就可以顯著減少IO操作的次數,進而提高效率。

        換句話說,當日志消息需要輸出時,并不會立即将其寫出到磁盤上,而是先把日志消息存儲,直到達到”寫出時機“才會将存儲的日志消息寫出到磁盤,這樣一來,當日志消息生成時,隻需要将其進行存儲而不需要寫出,後續代碼也不會被阻塞,相對于前面的那種阻塞式日志,這種就是非阻塞式日志。

        muduo的異步日志核心思想正是如此。當需要輸出日志的時候,會先将日志存下來,日志消息存儲達到某個門檻值時将這些日志消息全部寫到磁盤。需要考慮的是,如果日志消息産生比較慢,可能很長一段時間都達不到這個門檻值,那就相當于這些日志消息一直無法寫出到磁盤,是以,還應當設定一個逾時值如3s,每過3s不管日志消息存儲量是否達到門檻值,都會将已經存儲的日志消息寫出到磁盤中。即日志寫出到磁盤的兩個時機:1、日志消息存儲量達到寫出門檻值;2、每過3秒自動将存儲的日志消息全部寫出。

        這種非阻塞式日志也是異步的,因為産生日志的線程隻負責産生日志,并不需要去管它産生的這條日志何時寫出,寫往何處...

異步日志的實作

       muduo中通過AsyncLogging類來實作異步日志。

       異步日志分為前端和後端兩部分,前端負責存儲生成的日志消息,而後端則負責将日志消息寫出到磁盤,是以整個異步日志的過程可以看做如下所示:

muduo源碼學習(2):異步日志——異步日志的實作什麼是異步日志異步日志的實作開啟異步日志功能總結

       先來看看前端和後端分别指的是什麼。

前端與後端

class AsyncLogging : noncopyable
{
 public:

  AsyncLogging(const string& basename,
               off_t rollSize,
               int flushInterval = 3);

  ~AsyncLogging()
  {
    if (running_)
    {
      stop();
    }
  }

  void append(const char* logline, int len);

  void start()
  {
    running_ = true;
    thread_.start();
    latch_.wait();  //等待,直到異步日志線程啟動,才能繼續往下執行
  }

  void stop() NO_THREAD_SAFETY_ANALYSIS
  {
    running_ = false;
    cond_.notify();
    thread_.join();
  }

 private:

  void threadFunc();

  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
  typedef BufferVector::value_type BufferPtr;

  const int flushInterval_;   //前端緩沖區定期向後端寫入的時間(沖刷間隔)
  std::atomic<bool> running_;  //辨別線程函數是否正在運作
  const string basename_;   //
  const off_t rollSize_;
  muduo::Thread thread_;
  muduo::CountDownLatch latch_;
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);  //條件變量,主要用于前端緩沖區隊列中沒有資料時的休眠和喚醒
  BufferPtr currentBuffer_ GUARDED_BY(mutex_); //目前緩沖區   4M大小
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);   //預備緩沖區,主要是在調用append向目前緩沖添加日志消息時,如果目前緩沖放不下,目前緩沖就會被移動到前端緩沖隊列中國,此時預備緩沖區用來作為新的目前緩沖
  BufferVector buffers_ GUARDED_BY(mutex_);//前端緩沖區隊列
};
           

        注意到這裡typedef了一個新類型為Buffer類型,根據其定義可知,它就是前文所說的FixedBuffer緩沖區類型,而這個緩沖區大小由kLargeBuffer指定,大小為4M,是以,Buffer就是大小為4M的緩沖區類型。

        這裡定義了currentBuffer_和nextBuffer_,這兩個緩沖區就是上面所說的”前端“,用來暫時存儲生成的日志消息,隻不過nextBuffer_用作預備緩沖區,當currentBuffer_不夠用時用nextBuffer_來補充currentBuffer_。

        然後就是buffers_,這是一個vector,它用來存儲”準備寫到後端“的緩沖區,舉個例子,如果currentBuffer_寫滿了,那麼就會把寫滿的currentBuffer_放到buffers_中。

       如上所述,”前端“會将日志消息全部存到currentBuffer_中,如果放不下了,就會把currentBuffer_放到buffers_中以備”後端“讀取。可想而知,異步日志的”後端“,就主要負責去和buffers_進行互動,将buffers中的緩沖區中的内容全部寫出到磁盤,是以,就需要開啟另一個線程,來執行”後端“的任務,下文将其稱為”後端線程“。

        後端線程由thread_成員封裝,在構造函數中指定其線程函數為threadFunc,如下所示:

AsyncLogging::AsyncLogging(const string& basename,
                           off_t rollSize,
                           int flushInterval)
  : 
    thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),  
    ...
{
  ...
}
           

前端與後端的互動

         現在來看一下前端和後端之間是如何互動的。

void AsyncLogging::append(const char* logline, int len)//向目前緩沖區中添加日志消息,如果目前緩沖區放不下了,那麼就把目前緩沖區放到前端緩沖區隊列中
{
    muduo::MutexLockGuard lock(mutex_);//用鎖來保持同步
    if (currentBuffer_->avail() > len)//如果目前緩沖區還能放下目前日志消息
    {
        currentBuffer_->append(logline, len);//就把日志消息添加到目前緩沖區中
    } else//如果放不下,就把目前緩沖區移動到前端緩沖區隊列中,然後用預備緩沖區來填充目前緩沖區
    { //将目前緩沖區放到前端緩沖區隊列中後就要喚醒後端處理線程
        buffers_.push_back(std::move(currentBuffer_));
        if (nextBuffer_)//如果預備緩沖區還未使用,就用來填充目前緩沖區
        {
            currentBuffer_ = std::move(nextBuffer_);
        } else//如果預備緩沖區無法使用,就重新配置設定一個新的緩沖區(如果日志寫的速度很快,但是IO速度很慢,那麼前端日志緩沖區就會積累,但是後端還沒有來得及處理,此時預備緩沖區也還沒有歸還,就會産生這種情況
        {
            currentBuffer_.reset(new Buffer); // Rarely happens
        }
        currentBuffer_->append(logline, len);//向新的目前緩沖區中寫入日志消息
        cond_.notify();
    }
}

void AsyncLogging::threadFunc()  //寫日志線程,将緩沖區隊列中的資料調用LogFile的append
{
  assert(running_ == true);
  latch_.countDown();   //計數變量latch減1
  LogFile output(basename_, rollSize_, false);   //指定輸出的日志檔案
  BufferPtr newBuffer1(new Buffer);//用來填充移動後的currentBuffer_
  BufferPtr newBuffer2(new Buffer);//用來填充使用後的nextBuffer_
  newBuffer1->bzero(); //緩沖區清零
  newBuffer2->bzero(); //緩沖區清零
  BufferVector buffersToWrite;//後端緩沖區隊列,初始大小為16
  buffersToWrite.reserve(16);
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());

    {
      muduo::MutexLockGuard lock(mutex_);
      if (buffers_.empty())  // unusual usage!    如果前端緩沖區隊列為空,就休眠flushInterval_的時間
      {
        cond_.waitForSeconds(flushInterval_);//如果前端緩沖區隊列中有資料了就會被喚醒
      }
      buffers_.push_back(std::move(currentBuffer_));
	  currentBuffer_ = std::move(newBuffer1); //目前緩沖區擷取新的記憶體
      buffersToWrite.swap(buffers_); //前端緩沖區隊列與後端緩沖區隊列交換
      if (!nextBuffer_) //如果預備緩沖區為空,那麼就使用newBuffer2作為預備緩沖區,保證始終有一個空閑的緩沖區用于預備
      {
        nextBuffer_ = std::move(newBuffer2);
      }
    }

    assert(!buffersToWrite.empty());

    if (buffersToWrite.size() > 25) //如果最終後端緩沖區的緩沖區太多就隻保留前三個
    {
      char buf[256];//buf作為緩沖區太多時的錯誤提示字元串
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));//将buf寫出到日志檔案中
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());//隻保留後端緩沖區隊列中的前三個緩沖區
    }

    for (const auto& buffer : buffersToWrite)//周遊目前後端緩沖區隊列中的所有緩沖區
    {
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());//依次寫入日志檔案
    }
	//此時後端緩沖區中的日志消息已經全部寫出,就可以重置緩沖區隊列了
    if (buffersToWrite.size() > 2)
    {
      // drop non-bzero-ed buffers, avoid trashing
      buffersToWrite.resize(2);
    }

    if (!newBuffer1)//如果newBuffer1為空 (剛才用來替代目前緩沖了)
    {
      assert(!buffersToWrite.empty());
      newBuffer1 = std::move(buffersToWrite.back()); //把後端緩沖區的最後一個作為newBuffer1
      buffersToWrite.pop_back(); //最後一個元素的擁有權已經轉移到了newBuffer1中,是以彈出最後一個
      newBuffer1->reset(); //重置newBuffer1為空閑狀态(注意,這裡調用的是Buffer類的reset函數而不是unique_ptr的reset函數)
    }

    if (!newBuffer2)//如果newBuffer2為空
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }

    buffersToWrite.clear();//清空後端緩沖區隊列
    output.flush();//清空檔案緩沖區
  }
  output.flush();
}
           

        對于前端,隻需要調用append函數即可,如果currentBuffer_足以放下目前日志消息就調用緩沖區的append函數放入消息,如果放不下,就會将currentBuffer_放入buffer_中,注意,這裡使用的是移動,移動後currentBuffer_為NULL,此時如果預備緩沖區nextBuffer_尚未使用,那麼就會将nextBuffer_的擁有權轉移給currentBuffer_,轉移後nextBuffer_為NULL,意為已被使用;而如果預備緩沖區本身就為NULL,這種情況會出現在非常頻繁調用append函數,導緻連續多次填滿currentBuffer_的時候,此時nextBuffer_已無法為currentBuffer_提供預備空間,是以隻能為currentBuffer_重新配置設定新的空間。(實際上這種情況很少發生,因為預設的每條日志消息的大小最大為4K,而currentBuffer_的大小為4M,除非連續寫入8M以上的日志消息,而後端來不及處理這些消息,才會發生這種情況)。目前端向buffers_中移入緩沖區後,就會喚醒條件變量。

        接着來看看後端,通過threadFunc函數可知,後端線程會循環去檢查buffers_,如果buffers為空,那麼後端線程就會休眠最多為flushInterval指定的秒數(預設為3秒),如果在此期間buffers中有了資料,後端線程就會被喚醒,否則就一直休眠直到逾時,不管是哪種喚醒,都會将currentBuffer移入buffers中,這是因為後端線程每次操作都是準備将所有日志消息進行輸出,而currentBuffer中大多數情況下都存有日志消息,是以即使其未滿也會被放入buffers中,然後用newBuffer1來補充currentBuffer。

        接下來就需要注意buffersToWrite這個vector,和buffers是相同的類型,buffersToWrite就是後端緩沖區隊列,負責将前端buffers中的資料拿過來,然後把這些資料寫出到磁盤。是以,當currentBuffer被移入buffersToWrite後,就會立刻調用swap函數交換buffersToWrite和buffers,這一部交換了這兩個vector中的記憶體指針,相當于二者交換了各自的内容,buffers變成了空的,而前面所有存有日志消息的緩沖區,則全部到了buffersToWrite中。

        然後,如果此時預備緩沖區為空,說明已經被使用過,就會用newBuffer2來補充它,至此,互斥鎖釋放。這裡互斥鎖的釋放位置是個值得思考的地方,考慮到并發效率,互斥鎖持有的臨界區大小不應太大(不應簡單的去鎖住每一輪循環),在buffersToWrite獲得了buffers的資料之後,其它線程就可以正常的調用append來添加日志消息了,因為此時buffers重置為空,并且buffersToWrite是局部變量,二者互不影響。

資源回收

        接着就是很自然的步驟了:将buffersToWrite中所有的緩沖區内容寫到本地磁盤中,這一點後面再分析。

        在寫出結束後,buffersToWrite中緩沖區的内容就已經沒價值了,不過廢物依然可以回收:由于前面newBuffer1和newBuffer2都有可能被使用過而為空,是以可以将buffersToWrite中的元素用來填充newBuffer1和newBuffer2。

        實際上,在正常情況下(指的是日志消息産生速度不會連續爆掉兩塊currentBuffer),currentBuffer、nextBuffer、newBuffer1和newBuffer2是不需要二次配置設定空間的,因為它們之間通過buffers和buffersToWrite恰好可以構成一個資源使用環:前端将currentBuffer移入buffers後用nextBuffer填補currentBuffer,後端線程将新的currentBuffer再次移入buffers,然後用newBuffer1和newBuffer2去填充currentBuffer和nextBuffer,最後又從buffersToWrite中擷取元素來填充newBuffer1和newBuffer2,可見,資源的消耗端在currentBuffer和nextBuffer,而資源的補充端在newBuffer1和newBuffer2,如果這個過程是平衡的,那麼這4個緩沖區都無需再配置設定新的空間,然後,這一點并不能得到保證,如果預備緩沖區數量越多,越能保證這一點,不過帶來的就是空間上的消耗了。

後端與日志檔案

        後端線程将緩沖區内容寫出到日志檔案通過調用LogFile類的append函數實作,但是muduo中與磁盤檔案互動最緊密的并不是LogFile類,而是AppendFile類,該類含有一個檔案指針指向外部檔案,其最主要的函數就是append函數,定義如下:

class AppendFile : noncopyable
{
 public:
  explicit AppendFile(StringArg filename);

  ~AppendFile();

  void append(const char* logline, size_t len);

  void flush();

  off_t writtenBytes() const { return writtenBytes_; }

 private:

  size_t write(const char* logline, size_t len);

  FILE* fp_;
  char buffer_[64*1024];//緩沖區大小為64K,預設的是4K
  off_t writtenBytes_;//辨別目前檔案一共寫入了多少位元組的資料,如果超過了rollsize,LogFile就會進行rollFile,建立新的日志檔案,而這個檔案就不會再寫入了
};

void FileUtil::AppendFile::append(const char* logline, const size_t len)
{
  size_t n = write(logline, len);  //寫出日志消息
  size_t remain = len - n;  //計算未寫出的部分
  while (remain > 0)//循環直到全部寫出
  {
    size_t x = write(logline + n, remain);  //實際調用fwrite_unlock
    if (x == 0)
    {
      int err = ferror(fp_);
      if (err)
      {
        fprintf(stderr, "AppendFile::append() failed %s\n", strerror_tl(err)); //stderr不帶緩沖,會立刻輸出
      }
      break;
    }
    n += x;
    remain = len - n; // remain -= x
  }

  writtenBytes_ += len;
}
           

       可見,AppendFile類的append函數進行了IO操作,writtenBytes會記錄下寫出到fp_對應的檔案的位元組數。

       LogFile類中通過unique_ptr包裝了一個AppendFile類執行個體file_,在後端線程寫出時所調用的LogFile類的append函數中,就會通過該執行個體調用AppendFile類的append函數來将後端緩沖區中的内容全部寫出到日志檔案中,如下所示:

void LogFile::append_unlocked(const char* logline, int len)
{
  file_->append(logline, len);//将緩沖區内容寫出到日志檔案中

  if (file_->writtenBytes() > rollSize_)//如果寫出的位元組數大于了rollsize,就通過rollFile建立一個檔案
  {
    rollFile();
  }
  else
  {
    ++count_;
    if (count_ >= checkEveryN_)   //每調用一次append計數一次,每調用1024次檢查是否需要隔天rollfile或者flush緩沖區
    {
      count_ = 0;
      time_t now = ::time(NULL);
      time_t thisPeriod_ = now / kRollPerSeconds_ * kRollPerSeconds_;
      if (thisPeriod_ != startOfPeriod_)
      {
        rollFile();
      }
      else if (now - lastFlush_ > flushInterval_)  //外部檔案流是全緩沖的,是以fwrite并不能立刻将資料寫出到外部檔案中,是以需要設定一個flush間隔,每隔一段時間将資料flush到外部檔案中
      {
        lastFlush_ = now;
        file_->flush();
      }
    }
  }
}
           

         在後端與日志檔案的互動中,除了寫出資料到日志檔案,還進行了兩個重要的操作:滾動日志、自動flush緩沖區。

滾動日志

        日志滾動通過rollFile函數實作,如下所示:

bool LogFile::rollFile()
{

  time_t now = 0;
  string filename = getLogFileName(basename_, &now);//得到輸出日志的檔案名
  time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;//計算現在是第幾天 now/kRollPerSeconds求出現在是第幾天,再乘以秒數相當于是目前天數0點對應的秒數

  if (now > lastRoll_)
  {
      rollcnt++;
    lastRoll_ = now;//更新lastRoll
    lastFlush_ = now;//更新lastFlush
    startOfPeriod_ = start;
    file_.reset(new FileUtil::AppendFile(filename));//讓file_指向一個名為filename的檔案,相當于建立了一個檔案
    return true;
  }
  return false;
}
           

        可以看到,rollFile的作用,就是建立一個新檔案,然後讓file_去指向這個新檔案,新檔案的命名方式為:basename + time + hostname + pid + ".log",在此之後所有日志消息都将寫到新檔案中。

        回到LogFile的append函數中,可以看到rollFile發生在兩種情況下:1.當寫出到日志檔案的位元組數達到滾動門檻值,這個門檻值由AsyncLogging構造時指定,并用來構造LogFile;2.每到新的一天就滾動一次。

        需要注意的是第2點,并不是到了新的一天的第一條日志消息就會導緻rollFile,而是每調用1024次append函數時會去檢查是否到了新的一天。可見這種方式還是有點問題的,因為可能存在到了新的一天但是沒有達到1024次調用的情況,不過如果連1024次都沒有達到,說明日志消息很少,也沒有什麼必要建立一個新的日志檔案。此外,如果每次調用append都去判斷是否是新的一天,那麼每次都需要通過gmtime、gettimeofday這類的函數去擷取時間,這樣一來可能就顯得得不償失了。(在muduo中,由于是通過gmtime來擷取時間的,是以會在0時區0時,即中原標準時間8時才算是”新的一天“)。

自動flush緩沖區

        為什麼需要flush緩沖區?這是因為通過與日志檔案互動的檔案流是全緩沖的,隻有當檔案緩沖區滿或者flush時才會将緩沖區中的内容寫到檔案中。而對于日志消息這種需要頻繁寫出的情況,如果不調用flush,那麼就隻有緩沖區滿了才會将資料寫出到檔案中,如果程序突然崩潰,緩沖區中還未寫出的資料就丢失了,而如果調用flush的次數過多,無疑又會影響效率。

        是以,muduo通過flushInterval變量來設定flush的間隔,預設為3s,即至少過3s才會自動flush,之是以說是”至少“,是因為判斷間隔是否達到3秒,也需要調用時間擷取函數去擷取時間,如果每一次append都來判斷一次,那麼也是得不償失的,是以,是否需要flush也是每append1024次再來進行判斷。

開啟異步日志功能

         通過前文可以知道,每一條日志消息實際上都是基于Logger類實作的,是以,要想實作異步日志,就需要将日志消息成功存入”前端緩沖區“,而這一點,隻需要将Logger的g_output設定為AsyncLogging的append函數即可,如下所示:

muduo::AsyncLogging* g_asyncLog = NULL;

void asyncOutput(const char* msg, int len)  
{
  g_asyncLog->append(msg, len);
}

muduo::Logger::setOutput(asyncOutput);

           

         這樣就可以将每條日志消息成功存儲前端緩沖區,接着還需要開啟後端線程,調用AsyncLogging類的start函數即可。

總結

        異步日志的實作,在Logger類的基礎上,還需要AsyncLogging、LogFile、AppendFile類。

        其中AppendFile類用于将緩沖區資料寫出到日志檔案;

        LogFile中包含了AppendFile的執行個體,并且實作了滾動檔案和自動flush緩沖區的功能;

        AsyncLogging包含了異步日志的前端和後端,前端與Logger相連接配接,通過Logger來獲得每一條日志消息并進行存儲,後端線程建立LogFile局部執行個體,從前端緩沖區中得到日志消息後通過LogFile局部執行個體将日志消息寫出到日志檔案中。