天天看點

k8s與日志--journalbeat源碼解讀

前言

對于日志系統的重要性不言而喻,參照滬江的一 篇關于日志系統的介紹,基本上日志資料在以下幾方面具有非常重要的作用:

  • 資料查找:通過檢索日志資訊,定位相應的 bug ,找出解決方案
  • 服務診斷:通過對日志資訊進行統計、分析,了解伺服器的負荷和服務運作狀态
  • 資料分析:可以做進一步的資料分析,比如根據請求中的課程 id ,找出 TOP10 使用者感興趣課程

日志+大資料+AI的确有很多想象空間。

而對于收集系統,流行的技術stack有之前的elk,到現在的efk。logstash換成了filebeat。當然日志收集agent,也有flume和fluentd,尤其fluentd屬于cncf組織的産品,在k8s中有着廣泛的應用。但是fluentd是ruby寫的,不利于深入源碼了解。當然今天我們重點講的是另外一個agent--

journalbeat

。望文生義,隸屬于efk stack 中beats系列中的一員,專門用于收集journald日志。

journalbeat源碼解讀

journald日志簡介

長久以來 syslog 是每一個 Unix 系統中的重要部件。在漫長的曆史中在各種 Linux 發行版中都有不同的實作去完成類似的工作,它們采取的是邏輯相近,并使用基本相同的檔案格式。但是 syslog 也存在諸多的問題,随着新裝置的出現以及對安全的重視,這些缺點越發顯得突出,例如日志消息内容無法驗證、資料格式松散、日志檢索低效、有限的中繼資料儲存、無法記錄二進制資料等。

Journald是針對以上需求的解決方案。受udev事件啟發,Journal 條目與環境組塊相似。一個鍵值域,按照換行符分開,使用大寫的變量名。除了支援ASCII 格式的字元串外,還能夠支援二進制資料,如 ATA SMART 健康資訊、SCSI 資料。應用程式和服務可以通過将項目域傳遞給systemd journald服務來生成項目。該服務可以為項目增加一定數量的中繼資料。這些受信任域的值由 Journal 服務來決定且無法由用戶端來僞造。在Journald中,可以把日志資料導出,在異地讀取,并不受處理器架構的影響。這對嵌入式裝置是很有用的功能,友善維護人員分析裝置運作狀況。

大緻總結就是

  • journald日志是新的linux系統的具備的
  • journald差別于傳統的檔案存儲方式,是二進制存儲。需要用journalctl檢視。

docker對于journald的支援

The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.

即docker除了json等日志格式,已經增加了journald驅動。

目前本司使用場景

我們的k8s叢集,所有的docker輸出的日志格式都采用journald,這樣主機centos系統日志和docker的日志都用journalbeat來收集。

journalbeat實作關鍵

journalbeat整個實作過程,基本上兩點:

  • 與其他社群貢獻的beats系列,比如packetbeat,mysqlbeat類似,遵循了beats的架構和約定,journalbeat實作了run和stop等方法即可,然後作為一個用戶端,将收集到的資料,publish到beats中。
  • 讀取journald日志,采用了coreos開源的 go-systemd 庫中sdjournal部分。其實sdjournal是一個利用cgo 對于journald日志c接口的封裝。

源碼解讀

程式入口:

package main

import (
 "log" "github.com/elastic/beats/libbeat/beat" "github.com/mheese/journalbeat/beater"
)

func main() {
 err := beat.Run("journalbeat", "", beater.New)
 if err != nil {
 log.Fatal(err)
 }
}
           

整個journalbeat共實作了3個方法即可。run,stop,和new。

run和stop顧名思義,就是beats控制journalbeat的運作和停止。

而new:

需要按照

// Creator initializes and configures a new Beater instance used to execute // the beat its run-loop. type Creator func(*Beat, *common.Config) (Beater, error)           

實作Creator方法,傳回的Beater執行個體,交由beats控制。

具體實作:

// New creates beater func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
 config := config.DefaultConfig
 var err error
 if err = cfg.Unpack(&config); err != nil {
 return nil, fmt.Errorf("Error reading config file: %v", err)
 }

 jb := &Journalbeat{
 config: config,
 done: make(chan struct{}),
 cursorChan: make(chan string),
 pending: make(chan *eventReference),
 completed: make(chan *eventReference, config.PendingQueue.CompletedQueueSize),
 }

 if err = jb.initJournal(); err != nil {
 logp.Err("Failed to connect to the Systemd Journal: %v", err)
 return nil, err
 }

 jb.client = b.Publisher.Connect()
 return jb, nil
}           

一般的beats中,都會有一些共同屬性。例如下面的done和client屬性。

// Journalbeat is the main Journalbeat struct type Journalbeat struct {
 done chan struct{}
 config config.Config
 client publisher.Client

 journal *sdjournal.Journal

 cursorChan chan string
 pending, completed chan *eventReference
 wg sync.WaitGroup
}
           

done是一個控制整個beater啟停的信号量。

而client 是與beats平台通信的client。注意在初始化的時候,

jb.client = b.Publisher.Connect()           

建立連結。

然後在收集到資料,發送的時候,也是通過該client

select {
 case <-jb.done:
 return nil
 default:
 // we need to clone to avoid races since map is a pointer...
 jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
 }           

注意上邊的發送姿勢和對于剛才提到的done信号量使用。

其他方法都是業務相關不再詳細解讀了。

journalbeat如何保證發送失敗的日志重新發送

關于這點,個人感覺是最優雅的部分

所有發送失敗的日志是會在程式結束之前以json格式儲存到檔案,完成持久化。

// on exit fully consume both queues and flush to disk the pending queue defer func() {
 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
 defer wg.Done()
 for evRef := range jb.pending {
 pending[evRef.cursor] = evRef.body
 }
 }()

 go func() {
 defer wg.Done()
 for evRef := range jb.completed {
 completed[evRef.cursor] = evRef.body
 }
 }()
 wg.Wait()

 logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
 if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
 }
 }()           

程式啟動以後首先會讀取之前持久化的發送失敗的日志,重新發送

// load the previously saved queue of unsent events and try to publish them if any
 if err := jb.publishPending(); err != nil {
 logp.Warn("could not read the pending queue: %s", err)
 }           

client publish收集到的日志到beats,設定了publisher.Guaranteed模式,成功和失敗都有回報

jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)           

其中publisher.Signal(&eventSignal{ref, jb.completed})類似于一個回調,凡是成功的都會寫成功的ref到jb.completed中。友善用戶端控制。

維護了兩個chan,一個存放用戶端發送的日志,一個存放服務端接受成功的日志,精确對比,可擷取發送失敗的日志,進入重發動作

journalbeat struct中有下面兩個屬性

pending, completed chan *eventReference           

每次用戶端發送一條日志,都會寫到pending。

case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed):
 if published := <-publishedChan; published {
 jb.pending <- ref // save cursor if jb.config.WriteCursorState {
 jb.cursorChan <- rawEvent.Cursor
 }
 }
 }           

publisher.Signal(&eventSignal{ref, jb.completed}),回調會将成功的寫到completed。

整個程式同時會啟動一個

go jb.managePendingQueueLoop()

協程,專門用來定時重發失敗日志。

// managePendingQueueLoop runs the loop which manages the set of events waiting to be acked func (jb *Journalbeat) managePendingQueueLoop() {
 jb.wg.Add(1)
 defer jb.wg.Done()
 pending := map[string]common.MapStr{}
 completed := map[string]common.MapStr{}

 // diff returns the difference between this map and the other.
 diff := func(this, other map[string]common.MapStr) map[string]common.MapStr {
 result := map[string]common.MapStr{}
 for k, v := range this {
 if _, ok := other[k]; !ok {
 result[k] = v
 }
 }
 return result
 }

 // flush saves the map[string]common.MapStr to the JSON file on disk
 flush := func(source map[string]common.MapStr, dest string) error {
 tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
 if err != nil {
 return err
 }

 if err = json.NewEncoder(tempFile).Encode(source); err != nil {
 _ = tempFile.Close()
 return err
 }

 _ = tempFile.Close()
 return os.Rename(tempFile.Name(), dest)
 }

 // on exit fully consume both queues and flush to disk the pending queue defer func() {
 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
 defer wg.Done()
 for evRef := range jb.pending {
 pending[evRef.cursor] = evRef.body
 }
 }()

 go func() {
 defer wg.Done()
 for evRef := range jb.completed {
 completed[evRef.cursor] = evRef.body
 }
 }()
 wg.Wait()

 logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
 if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
 }
 }()

 // flush the pending queue to disk periodically
 tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
 for {
 select {
 case <-jb.done:
 return case p, ok := <-jb.pending:
 if ok {
 pending[p.cursor] = p.body
 }
 case c, ok := <-jb.completed:
 if ok {
 completed[c.cursor] = c.body
 }
 case <-tick:
 result := diff(pending, completed)
 if err := flush(result, jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err)
 }
 pending = result
 completed = map[string]common.MapStr{}
 }
 }
}           

總結

當然還有一些其他的細節,不再一一講述了。比如定時寫Cursor的功能和日志格式轉換等。具體的大家可以看源碼。主要是講了我認為其優雅的部分和為beats編寫beater的要點。

本文轉自SegmentFault-

k8s與日志--journalbeat源碼解讀

繼續閱讀