目錄
什麼是異步日志
異步日志的實作
前端與後端
前端與後端的互動
資源回收
後端與日志檔案
滾動日志
自動flush緩沖區
開啟異步日志功能
總結
在前文中分析了日志消息的存儲和輸出,不過并沒有涉及到異步日志,下面就來分析一下異步日志是如何實作的。
什麼是異步日志
在預設的情況下,日志消息都是直接列印到終端螢幕上,但是實際應用中,日志消息都應該寫到本地檔案,友善記錄以及查詢。
最簡單的方式就是每産生一條日志消息,都将其寫到相應的檔案中,然而這種方式效率低下,如果很多線程在某一段時間内需要輸出大量日志,那麼顯然日志輸出的效率是很低的。之是以效率低,就是因為每條日志消息都需要通過write這類的函數寫出到本地磁盤,這就導緻頻繁調用IO函數,而磁盤操作本身就比較費時,這樣一來後面的代碼就隻能阻塞住,直到前一條日志寫出成功。
為了優化上述問題,一個比較好的辦法就是:當日志消息積累到一定量的時候再寫到磁盤中,這樣就可以顯著減少IO操作的次數,進而提高效率。
換句話說,當日志消息需要輸出時,并不會立即将其寫出到磁盤上,而是先把日志消息存儲,直到達到”寫出時機“才會将存儲的日志消息寫出到磁盤,這樣一來,當日志消息生成時,隻需要将其進行存儲而不需要寫出,後續代碼也不會被阻塞,相對于前面的那種阻塞式日志,這種就是非阻塞式日志。
muduo的異步日志核心思想正是如此。當需要輸出日志的時候,會先将日志存下來,日志消息存儲達到某個門檻值時将這些日志消息全部寫到磁盤。需要考慮的是,如果日志消息産生比較慢,可能很長一段時間都達不到這個門檻值,那就相當于這些日志消息一直無法寫出到磁盤,是以,還應當設定一個逾時值如3s,每過3s不管日志消息存儲量是否達到門檻值,都會将已經存儲的日志消息寫出到磁盤中。即日志寫出到磁盤的兩個時機:1、日志消息存儲量達到寫出門檻值;2、每過3秒自動将存儲的日志消息全部寫出。
這種非阻塞式日志也是異步的,因為産生日志的線程隻負責産生日志,并不需要去管它産生的這條日志何時寫出,寫往何處...
異步日志的實作
muduo中通過AsyncLogging類來實作異步日志。
異步日志分為前端和後端兩部分,前端負責存儲生成的日志消息,而後端則負責将日志消息寫出到磁盤,是以整個異步日志的過程可以看做如下所示:
先來看看前端和後端分别指的是什麼。
前端與後端
class AsyncLogging : noncopyable
{
public:
AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval = 3);
~AsyncLogging()
{
if (running_)
{
stop();
}
}
void append(const char* logline, int len);
void start()
{
running_ = true;
thread_.start();
latch_.wait(); //等待,直到異步日志線程啟動,才能繼續往下執行
}
void stop() NO_THREAD_SAFETY_ANALYSIS
{
running_ = false;
cond_.notify();
thread_.join();
}
private:
void threadFunc();
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;
const int flushInterval_; //前端緩沖區定期向後端寫入的時間(沖刷間隔)
std::atomic<bool> running_; //辨別線程函數是否正在運作
const string basename_; //
const off_t rollSize_;
muduo::Thread thread_;
muduo::CountDownLatch latch_;
muduo::MutexLock mutex_;
muduo::Condition cond_ GUARDED_BY(mutex_); //條件變量,主要用于前端緩沖區隊列中沒有資料時的休眠和喚醒
BufferPtr currentBuffer_ GUARDED_BY(mutex_); //目前緩沖區 4M大小
BufferPtr nextBuffer_ GUARDED_BY(mutex_); //預備緩沖區,主要是在調用append向目前緩沖添加日志消息時,如果目前緩沖放不下,目前緩沖就會被移動到前端緩沖隊列中國,此時預備緩沖區用來作為新的目前緩沖
BufferVector buffers_ GUARDED_BY(mutex_);//前端緩沖區隊列
};
注意到這裡typedef了一個新類型為Buffer類型,根據其定義可知,它就是前文所說的FixedBuffer緩沖區類型,而這個緩沖區大小由kLargeBuffer指定,大小為4M,是以,Buffer就是大小為4M的緩沖區類型。
這裡定義了currentBuffer_和nextBuffer_,這兩個緩沖區就是上面所說的”前端“,用來暫時存儲生成的日志消息,隻不過nextBuffer_用作預備緩沖區,當currentBuffer_不夠用時用nextBuffer_來補充currentBuffer_。
然後就是buffers_,這是一個vector,它用來存儲”準備寫到後端“的緩沖區,舉個例子,如果currentBuffer_寫滿了,那麼就會把寫滿的currentBuffer_放到buffers_中。
如上所述,”前端“會将日志消息全部存到currentBuffer_中,如果放不下了,就會把currentBuffer_放到buffers_中以備”後端“讀取。可想而知,異步日志的”後端“,就主要負責去和buffers_進行互動,将buffers中的緩沖區中的内容全部寫出到磁盤,是以,就需要開啟另一個線程,來執行”後端“的任務,下文将其稱為”後端線程“。
後端線程由thread_成員封裝,在構造函數中指定其線程函數為threadFunc,如下所示:
AsyncLogging::AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval)
:
thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),
...
{
...
}
前端與後端的互動
現在來看一下前端和後端之間是如何互動的。
void AsyncLogging::append(const char* logline, int len)//向目前緩沖區中添加日志消息,如果目前緩沖區放不下了,那麼就把目前緩沖區放到前端緩沖區隊列中
{
muduo::MutexLockGuard lock(mutex_);//用鎖來保持同步
if (currentBuffer_->avail() > len)//如果目前緩沖區還能放下目前日志消息
{
currentBuffer_->append(logline, len);//就把日志消息添加到目前緩沖區中
} else//如果放不下,就把目前緩沖區移動到前端緩沖區隊列中,然後用預備緩沖區來填充目前緩沖區
{ //将目前緩沖區放到前端緩沖區隊列中後就要喚醒後端處理線程
buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_)//如果預備緩沖區還未使用,就用來填充目前緩沖區
{
currentBuffer_ = std::move(nextBuffer_);
} else//如果預備緩沖區無法使用,就重新配置設定一個新的緩沖區(如果日志寫的速度很快,但是IO速度很慢,那麼前端日志緩沖區就會積累,但是後端還沒有來得及處理,此時預備緩沖區也還沒有歸還,就會産生這種情況
{
currentBuffer_.reset(new Buffer); // Rarely happens
}
currentBuffer_->append(logline, len);//向新的目前緩沖區中寫入日志消息
cond_.notify();
}
}
void AsyncLogging::threadFunc() //寫日志線程,将緩沖區隊列中的資料調用LogFile的append
{
assert(running_ == true);
latch_.countDown(); //計數變量latch減1
LogFile output(basename_, rollSize_, false); //指定輸出的日志檔案
BufferPtr newBuffer1(new Buffer);//用來填充移動後的currentBuffer_
BufferPtr newBuffer2(new Buffer);//用來填充使用後的nextBuffer_
newBuffer1->bzero(); //緩沖區清零
newBuffer2->bzero(); //緩沖區清零
BufferVector buffersToWrite;//後端緩沖區隊列,初始大小為16
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage! 如果前端緩沖區隊列為空,就休眠flushInterval_的時間
{
cond_.waitForSeconds(flushInterval_);//如果前端緩沖區隊列中有資料了就會被喚醒
}
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_ = std::move(newBuffer1); //目前緩沖區擷取新的記憶體
buffersToWrite.swap(buffers_); //前端緩沖區隊列與後端緩沖區隊列交換
if (!nextBuffer_) //如果預備緩沖區為空,那麼就使用newBuffer2作為預備緩沖區,保證始終有一個空閑的緩沖區用于預備
{
nextBuffer_ = std::move(newBuffer2);
}
}
assert(!buffersToWrite.empty());
if (buffersToWrite.size() > 25) //如果最終後端緩沖區的緩沖區太多就隻保留前三個
{
char buf[256];//buf作為緩沖區太多時的錯誤提示字元串
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));//将buf寫出到日志檔案中
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());//隻保留後端緩沖區隊列中的前三個緩沖區
}
for (const auto& buffer : buffersToWrite)//周遊目前後端緩沖區隊列中的所有緩沖區
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());//依次寫入日志檔案
}
//此時後端緩沖區中的日志消息已經全部寫出,就可以重置緩沖區隊列了
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}
if (!newBuffer1)//如果newBuffer1為空 (剛才用來替代目前緩沖了)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back()); //把後端緩沖區的最後一個作為newBuffer1
buffersToWrite.pop_back(); //最後一個元素的擁有權已經轉移到了newBuffer1中,是以彈出最後一個
newBuffer1->reset(); //重置newBuffer1為空閑狀态(注意,這裡調用的是Buffer類的reset函數而不是unique_ptr的reset函數)
}
if (!newBuffer2)//如果newBuffer2為空
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();//清空後端緩沖區隊列
output.flush();//清空檔案緩沖區
}
output.flush();
}
對于前端,隻需要調用append函數即可,如果currentBuffer_足以放下目前日志消息就調用緩沖區的append函數放入消息,如果放不下,就會将currentBuffer_放入buffer_中,注意,這裡使用的是移動,移動後currentBuffer_為NULL,此時如果預備緩沖區nextBuffer_尚未使用,那麼就會将nextBuffer_的擁有權轉移給currentBuffer_,轉移後nextBuffer_為NULL,意為已被使用;而如果預備緩沖區本身就為NULL,這種情況會出現在非常頻繁調用append函數,導緻連續多次填滿currentBuffer_的時候,此時nextBuffer_已無法為currentBuffer_提供預備空間,是以隻能為currentBuffer_重新配置設定新的空間。(實際上這種情況很少發生,因為預設的每條日志消息的大小最大為4K,而currentBuffer_的大小為4M,除非連續寫入8M以上的日志消息,而後端來不及處理這些消息,才會發生這種情況)。目前端向buffers_中移入緩沖區後,就會喚醒條件變量。
接着來看看後端,通過threadFunc函數可知,後端線程會循環去檢查buffers_,如果buffers為空,那麼後端線程就會休眠最多為flushInterval指定的秒數(預設為3秒),如果在此期間buffers中有了資料,後端線程就會被喚醒,否則就一直休眠直到逾時,不管是哪種喚醒,都會将currentBuffer移入buffers中,這是因為後端線程每次操作都是準備将所有日志消息進行輸出,而currentBuffer中大多數情況下都存有日志消息,是以即使其未滿也會被放入buffers中,然後用newBuffer1來補充currentBuffer。
接下來就需要注意buffersToWrite這個vector,和buffers是相同的類型,buffersToWrite就是後端緩沖區隊列,負責将前端buffers中的資料拿過來,然後把這些資料寫出到磁盤。是以,當currentBuffer被移入buffersToWrite後,就會立刻調用swap函數交換buffersToWrite和buffers,這一部交換了這兩個vector中的記憶體指針,相當于二者交換了各自的内容,buffers變成了空的,而前面所有存有日志消息的緩沖區,則全部到了buffersToWrite中。
然後,如果此時預備緩沖區為空,說明已經被使用過,就會用newBuffer2來補充它,至此,互斥鎖釋放。這裡互斥鎖的釋放位置是個值得思考的地方,考慮到并發效率,互斥鎖持有的臨界區大小不應太大(不應簡單的去鎖住每一輪循環),在buffersToWrite獲得了buffers的資料之後,其它線程就可以正常的調用append來添加日志消息了,因為此時buffers重置為空,并且buffersToWrite是局部變量,二者互不影響。
資源回收
接着就是很自然的步驟了:将buffersToWrite中所有的緩沖區内容寫到本地磁盤中,這一點後面再分析。
在寫出結束後,buffersToWrite中緩沖區的内容就已經沒價值了,不過廢物依然可以回收:由于前面newBuffer1和newBuffer2都有可能被使用過而為空,是以可以将buffersToWrite中的元素用來填充newBuffer1和newBuffer2。
實際上,在正常情況下(指的是日志消息産生速度不會連續爆掉兩塊currentBuffer),currentBuffer、nextBuffer、newBuffer1和newBuffer2是不需要二次配置設定空間的,因為它們之間通過buffers和buffersToWrite恰好可以構成一個資源使用環:前端将currentBuffer移入buffers後用nextBuffer填補currentBuffer,後端線程将新的currentBuffer再次移入buffers,然後用newBuffer1和newBuffer2去填充currentBuffer和nextBuffer,最後又從buffersToWrite中擷取元素來填充newBuffer1和newBuffer2,可見,資源的消耗端在currentBuffer和nextBuffer,而資源的補充端在newBuffer1和newBuffer2,如果這個過程是平衡的,那麼這4個緩沖區都無需再配置設定新的空間,然後,這一點并不能得到保證,如果預備緩沖區數量越多,越能保證這一點,不過帶來的就是空間上的消耗了。
後端與日志檔案
後端線程将緩沖區内容寫出到日志檔案通過調用LogFile類的append函數實作,但是muduo中與磁盤檔案互動最緊密的并不是LogFile類,而是AppendFile類,該類含有一個檔案指針指向外部檔案,其最主要的函數就是append函數,定義如下:
class AppendFile : noncopyable
{
public:
explicit AppendFile(StringArg filename);
~AppendFile();
void append(const char* logline, size_t len);
void flush();
off_t writtenBytes() const { return writtenBytes_; }
private:
size_t write(const char* logline, size_t len);
FILE* fp_;
char buffer_[64*1024];//緩沖區大小為64K,預設的是4K
off_t writtenBytes_;//辨別目前檔案一共寫入了多少位元組的資料,如果超過了rollsize,LogFile就會進行rollFile,建立新的日志檔案,而這個檔案就不會再寫入了
};
void FileUtil::AppendFile::append(const char* logline, const size_t len)
{
size_t n = write(logline, len); //寫出日志消息
size_t remain = len - n; //計算未寫出的部分
while (remain > 0)//循環直到全部寫出
{
size_t x = write(logline + n, remain); //實際調用fwrite_unlock
if (x == 0)
{
int err = ferror(fp_);
if (err)
{
fprintf(stderr, "AppendFile::append() failed %s\n", strerror_tl(err)); //stderr不帶緩沖,會立刻輸出
}
break;
}
n += x;
remain = len - n; // remain -= x
}
writtenBytes_ += len;
}
可見,AppendFile類的append函數進行了IO操作,writtenBytes會記錄下寫出到fp_對應的檔案的位元組數。
LogFile類中通過unique_ptr包裝了一個AppendFile類執行個體file_,在後端線程寫出時所調用的LogFile類的append函數中,就會通過該執行個體調用AppendFile類的append函數來将後端緩沖區中的内容全部寫出到日志檔案中,如下所示:
void LogFile::append_unlocked(const char* logline, int len)
{
file_->append(logline, len);//将緩沖區内容寫出到日志檔案中
if (file_->writtenBytes() > rollSize_)//如果寫出的位元組數大于了rollsize,就通過rollFile建立一個檔案
{
rollFile();
}
else
{
++count_;
if (count_ >= checkEveryN_) //每調用一次append計數一次,每調用1024次檢查是否需要隔天rollfile或者flush緩沖區
{
count_ = 0;
time_t now = ::time(NULL);
time_t thisPeriod_ = now / kRollPerSeconds_ * kRollPerSeconds_;
if (thisPeriod_ != startOfPeriod_)
{
rollFile();
}
else if (now - lastFlush_ > flushInterval_) //外部檔案流是全緩沖的,是以fwrite并不能立刻将資料寫出到外部檔案中,是以需要設定一個flush間隔,每隔一段時間将資料flush到外部檔案中
{
lastFlush_ = now;
file_->flush();
}
}
}
}
在後端與日志檔案的互動中,除了寫出資料到日志檔案,還進行了兩個重要的操作:滾動日志、自動flush緩沖區。
滾動日志
日志滾動通過rollFile函數實作,如下所示:
bool LogFile::rollFile()
{
time_t now = 0;
string filename = getLogFileName(basename_, &now);//得到輸出日志的檔案名
time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;//計算現在是第幾天 now/kRollPerSeconds求出現在是第幾天,再乘以秒數相當于是目前天數0點對應的秒數
if (now > lastRoll_)
{
rollcnt++;
lastRoll_ = now;//更新lastRoll
lastFlush_ = now;//更新lastFlush
startOfPeriod_ = start;
file_.reset(new FileUtil::AppendFile(filename));//讓file_指向一個名為filename的檔案,相當于建立了一個檔案
return true;
}
return false;
}
可以看到,rollFile的作用,就是建立一個新檔案,然後讓file_去指向這個新檔案,新檔案的命名方式為:basename + time + hostname + pid + ".log",在此之後所有日志消息都将寫到新檔案中。
回到LogFile的append函數中,可以看到rollFile發生在兩種情況下:1.當寫出到日志檔案的位元組數達到滾動門檻值,這個門檻值由AsyncLogging構造時指定,并用來構造LogFile;2.每到新的一天就滾動一次。
需要注意的是第2點,并不是到了新的一天的第一條日志消息就會導緻rollFile,而是每調用1024次append函數時會去檢查是否到了新的一天。可見這種方式還是有點問題的,因為可能存在到了新的一天但是沒有達到1024次調用的情況,不過如果連1024次都沒有達到,說明日志消息很少,也沒有什麼必要建立一個新的日志檔案。此外,如果每次調用append都去判斷是否是新的一天,那麼每次都需要通過gmtime、gettimeofday這類的函數去擷取時間,這樣一來可能就顯得得不償失了。(在muduo中,由于是通過gmtime來擷取時間的,是以會在0時區0時,即中原標準時間8時才算是”新的一天“)。
自動flush緩沖區
為什麼需要flush緩沖區?這是因為通過與日志檔案互動的檔案流是全緩沖的,隻有當檔案緩沖區滿或者flush時才會将緩沖區中的内容寫到檔案中。而對于日志消息這種需要頻繁寫出的情況,如果不調用flush,那麼就隻有緩沖區滿了才會将資料寫出到檔案中,如果程序突然崩潰,緩沖區中還未寫出的資料就丢失了,而如果調用flush的次數過多,無疑又會影響效率。
是以,muduo通過flushInterval變量來設定flush的間隔,預設為3s,即至少過3s才會自動flush,之是以說是”至少“,是因為判斷間隔是否達到3秒,也需要調用時間擷取函數去擷取時間,如果每一次append都來判斷一次,那麼也是得不償失的,是以,是否需要flush也是每append1024次再來進行判斷。
開啟異步日志功能
通過前文可以知道,每一條日志消息實際上都是基于Logger類實作的,是以,要想實作異步日志,就需要将日志消息成功存入”前端緩沖區“,而這一點,隻需要将Logger的g_output設定為AsyncLogging的append函數即可,如下所示:
muduo::AsyncLogging* g_asyncLog = NULL;
void asyncOutput(const char* msg, int len)
{
g_asyncLog->append(msg, len);
}
muduo::Logger::setOutput(asyncOutput);
這樣就可以将每條日志消息成功存儲前端緩沖區,接着還需要開啟後端線程,調用AsyncLogging類的start函數即可。
總結
異步日志的實作,在Logger類的基礎上,還需要AsyncLogging、LogFile、AppendFile類。
其中AppendFile類用于将緩沖區資料寫出到日志檔案;
LogFile中包含了AppendFile的執行個體,并且實作了滾動檔案和自動flush緩沖區的功能;
AsyncLogging包含了異步日志的前端和後端,前端與Logger相連接配接,通過Logger來獲得每一條日志消息并進行存儲,後端線程建立LogFile局部執行個體,從前端緩沖區中得到日志消息後通過LogFile局部執行個體将日志消息寫出到日志檔案中。