在kafkaServer.scala中的start方法中,有一個這樣的調用:
這塊就是啟動了日志相關的定時任務,具體都有哪些内容?我們跟進去看一下:
可以看到,這塊主要使用了一個定時任務線程池,來處理任務的定時執行。具體包括兩塊,一部分是清理日志,另一部分是将日志寫入檔案。
首先是cleanupLogs,這塊涉及到配置,log.retention.check.interval.ms,也就是多長時間執行一次日志清理。我們看下具體的方法:
這塊還涉及到另一個配置:cleanup.policy,也就是清理的政策,目前有幾種,一種是compact,也就是日志壓縮,不會清理掉日志檔案;還有一種就是delete,也就是删除。這塊主要有兩個方法,我們分别看下:
這塊又涉及到一個配置:retention.ms,這個參數表示日志儲存的時間。如果小于0,表示永不失效,也就沒有了删除這一說。
當然,如果檔案的修改時間跟目前時間差,大于設定的日志儲存時間,就要執行删除動作了。具體的删除方法為:
這塊的邏輯是:根據傳入的predicate來判斷哪些日志符合被删除的要求,放入到deletable中,最後周遊deletable,進行删除操作。
這塊是一個異步删除檔案的過程,包含一個配置:file.delete.delay.ms。表示每隔多久删除一次日志檔案。删除的過程是先把日志的字尾改為.delete,然後定時删除。
這塊代碼比較清晰,如果日志大小大于retention.bytes,那麼就會被标記為待删除,然後調用的方法是一樣的,也是deleteOldSegments。就不贅述了。
這塊有兩個定時任務。
涉及到兩個配置:
log.flush.scheduler.interval.ms:檢查是否需要固化到硬碟的時間間隔
log.flush.offset.checkpoint.interval.ms:控制上次固化硬碟的時間點,以便于資料恢複一般不需要去修改
我們分别看下兩個任務做了啥。
這個方法的目的是把日志重新整理到硬碟中,保證資料不丢。
這塊設計到一個配置:flush.ms。當日志的重新整理時間與目前時間差,大于配置的值時,就會執行flush操作。
找到目前segment的最後一個offset,即logEndOffset,然後調用flush方法,重新整理到日志檔案中。首先判斷,目前offset是否小于recoveryPoint,也就是第一個需要重新整理到硬碟的offset,如果小于的話,直接傳回,否則繼續flush操作。
将日志中從recoveryPoint到offset的所有日志,重新整理到日志檔案中,調用segment.flush()方法上。重新整理log檔案和index檔案。
這塊主要是用于寫一些恢複點的資料到檔案中去,檔案名是recovery-point-offset-checkpoint,裡面的内容是:
第一行是目前的版本version
第二行是所有偏移量的數字和,每個topic和partition的組合的數量
之後會周遊所有的topic和partition組合,每行展示的内容是:topic partition offset
但是這塊的寫檔案不是直接向目标檔案寫入,而是先寫一個臨時檔案,然後再将臨時檔案移動到目标檔案中。
以上就是kafka中日志處理的一些源碼,我們總結一下,其中涉及到的配置項有:
log.retention.check.interval.ms
cleanup.policy
retention.ms
file.delete.delay.ms
retention.bytes
log.flush.scheduler.interval.ms
log.flush.offset.checkpoint.interval.ms
flush.ms
可能還有其他的一些配置,這塊沒有涉及到。當然,這些參數如何配置,才能使性能達到最優,也需要不斷地進行測試和探索,目前隻能依靠預設的參數來進行配置,這顯然是不夠的。