天天看點

【Kafka源碼】日志處理一、入口方法二、定時任務總方法三、總結

在kafkaServer.scala中的start方法中,有一個這樣的調用:

這塊就是啟動了日志相關的定時任務,具體都有哪些内容?我們跟進去看一下:

可以看到,這塊主要使用了一個定時任務線程池,來處理任務的定時執行。具體包括兩塊,一部分是清理日志,另一部分是将日志寫入檔案。

首先是cleanupLogs,這塊涉及到配置,log.retention.check.interval.ms,也就是多長時間執行一次日志清理。我們看下具體的方法:

這塊還涉及到另一個配置:cleanup.policy,也就是清理的政策,目前有幾種,一種是compact,也就是日志壓縮,不會清理掉日志檔案;還有一種就是delete,也就是删除。這塊主要有兩個方法,我們分别看下:

這塊又涉及到一個配置:retention.ms,這個參數表示日志儲存的時間。如果小于0,表示永不失效,也就沒有了删除這一說。

當然,如果檔案的修改時間跟目前時間差,大于設定的日志儲存時間,就要執行删除動作了。具體的删除方法為:

這塊的邏輯是:根據傳入的predicate來判斷哪些日志符合被删除的要求,放入到deletable中,最後周遊deletable,進行删除操作。

這塊是一個異步删除檔案的過程,包含一個配置:file.delete.delay.ms。表示每隔多久删除一次日志檔案。删除的過程是先把日志的字尾改為.delete,然後定時删除。

這塊代碼比較清晰,如果日志大小大于retention.bytes,那麼就會被标記為待删除,然後調用的方法是一樣的,也是deleteOldSegments。就不贅述了。

這塊有兩個定時任務。

涉及到兩個配置:

log.flush.scheduler.interval.ms:檢查是否需要固化到硬碟的時間間隔

log.flush.offset.checkpoint.interval.ms:控制上次固化硬碟的時間點,以便于資料恢複一般不需要去修改

我們分别看下兩個任務做了啥。

這個方法的目的是把日志重新整理到硬碟中,保證資料不丢。

這塊設計到一個配置:flush.ms。當日志的重新整理時間與目前時間差,大于配置的值時,就會執行flush操作。

找到目前segment的最後一個offset,即logEndOffset,然後調用flush方法,重新整理到日志檔案中。首先判斷,目前offset是否小于recoveryPoint,也就是第一個需要重新整理到硬碟的offset,如果小于的話,直接傳回,否則繼續flush操作。

将日志中從recoveryPoint到offset的所有日志,重新整理到日志檔案中,調用segment.flush()方法上。重新整理log檔案和index檔案。

這塊主要是用于寫一些恢複點的資料到檔案中去,檔案名是recovery-point-offset-checkpoint,裡面的内容是:

第一行是目前的版本version

第二行是所有偏移量的數字和,每個topic和partition的組合的數量

之後會周遊所有的topic和partition組合,每行展示的内容是:topic partition offset

但是這塊的寫檔案不是直接向目标檔案寫入,而是先寫一個臨時檔案,然後再将臨時檔案移動到目标檔案中。

以上就是kafka中日志處理的一些源碼,我們總結一下,其中涉及到的配置項有:

log.retention.check.interval.ms

cleanup.policy

retention.ms

file.delete.delay.ms

retention.bytes

log.flush.scheduler.interval.ms

log.flush.offset.checkpoint.interval.ms

flush.ms

可能還有其他的一些配置,這塊沒有涉及到。當然,這些參數如何配置,才能使性能達到最優,也需要不斷地進行測試和探索,目前隻能依靠預設的參數來進行配置,這顯然是不夠的。