繼續閑聊日志系統,在之前的博文裡已提到我們在日志收集上的選擇是flume-ng。應用程式将日志打到各自的日志檔案或指定的檔案夾(日志檔案按天滾動),然後利用flume的agent去日志檔案中收集。
flume将一條日志抽象成一個event。這裡我們從日志檔案中收集日志采用的是定制版的SpoolDirectorySource(我們對當日日志檔案追加寫入收集提供了支援)。從日志源中将每條日志轉換成event需要Deserializer(反序列化器)。flume的每一個source對應的deserializer必須實作接口EventDeserializer,該接口定義了readEvent/readEvents方法從各種日志源讀取Event。
flume主要支援兩種反序列化器:
(1)AvroEventDeserializer:解析Avro容器檔案的反序列化器。對Avro檔案的每條記錄生成一個flume
Event,并将基于avro編碼的二進制記錄存入event body中。
(2)LineDeserializer:它是基于日志檔案的反序列化器,以“\n”行結束符将每行區分為一條日志記錄。
大部分情況下SpoolDictionarySource配合LineDeserializer工作起來都沒問題。但當日志記錄本身被分割成多行時,比如異常日志的堆棧或日志中包含“\n”換行符時,問題就來了:原先的按行界定日志記錄的方式不能滿足這種要求。形如這樣的格式:
當然你也可以對日志内容進行特殊處理,讓一條日志的所有内容以一行輸出,但這樣需要對日志架構進行定制,有時這并不受你控制。是以這裡最好的選擇是定制日志收集器。
我們先來了解一下Flume源碼中LineDeserializer的核心實作:
首先,建構一個StringBuilder,然後以字元為機關挨個讀取,如果讀取到換行符“\n”,則表示讀取本條日志結束,跳出循環;否則将該字元串追加到StringBuilder中。與此同時會給讀取的字元個數計數:如果讀取的字元個數大于預先配置的一行日志的最大字元串長度,也會跳出循環。
這裡的主要問題出在以換行符“\n”作為日志結尾的分隔符邏輯上。當我們記錄異常日志時,我們需要重新找到一種界定日志記錄結尾的方式。
考慮到我們采用[]作為日志的tag界定符,每條日志幾乎都是以“[”打頭。是以,我們采取的做法是:判斷讀取到換行符“\n”後再預讀下一位,如果下一位是“[”,則認為這是一條普通不換行的日志,此時再回退一個字元(因為剛剛預讀了一個字元,需要讓指針後退回原來的位置),然後跳出循環;而如果下一位不是“[”,則認為它是一個異常日志或者多行日志。則繼續往後讀取字元,當遇到換行符時,再次重複以上判斷。當然如果你的日志格式是以某個固定的格式打頭,首字母固定的話,才可以用這種方式,否則你很可能要配置日志的apender,使其以某個特定的符号作為日志的結尾來判斷了。另外,有時也可以基于正則來比對。
為了提升擴充性,我們提供對預讀的下一個字元進行配置,并将其命名為:newLineStartPrefix。我們建立一個反序列化類:MultiLineDeserializer。該類的大部分邏輯都跟LineDeserializer相同,主要需要重新實作上面的readLine方法,實作如下:
這裡有個小插曲,由于之前已定制了source/sink的緣故。原以為deserializer也可以用同樣的方式進行定制。并在agent的deserializer配置中指定定制過的deserializer的完全限定名。但經過驗證後發現,這條路走不通,會報錯(貌似從flume官網上也找不到對deserializer定制的介紹)。是以,隻能在源碼上進行擴充,然後編譯源碼,重新生成jar。
從源碼裡你會發現為什麼在第三方包内擴充deserializer是行不通的。從github上clone下源碼,進入flume-ng-core
module的如下類:org.apache.flume.serialization.EventDeserializerType,你就會一目了然:
你必須顯式在這裡定義deserializer的枚舉,然後指定其builder的Class執行個體,并在agent裡的deserializer配置項中填寫你這裡的枚舉名稱才行。我們隻需在子package:serialization中建立MultiLineDeserializer類,然後重新實作邏輯、編譯、打包flume-ng-core
Module生成新的jar即可。flume将其源碼中的每個Module生成的jar都放在其二進制包的lib檔案夾下。你隻需将重新打包好的flume-ng-core
jar替換原來的,重新開機agent即可看到效果。
這裡還有個需要注意的地方:LineDeserializer有一個參數(maxLineLength)用于定義一個日志行的最長字元數。如果某條日志超過這個長度,将不再讀取。而一條日志占據多行情況下,該值需要适當增大,因為像異常日志的堆棧長度明顯比普通日志長不少,這裡你可以設定為8192。
原文釋出時間為:2015-06-22
本文作者:vinoYang