天天看點

日志上雲利器 - Aliyun LOG Java Producer背景功能特點功能優勢快速入門原理剖析總結技術支援

背景

日志無處不在,它作為記錄世間萬物變化的載體,在運維、研發、營運、安全、BI、審計等領域有着廣泛的應用場景。

日志上雲利器 - Aliyun LOG Java Producer背景功能特點功能優勢快速入門原理剖析總結技術支援
阿裡雲日志服務

是日志類資料的一站式服務平台,其核心元件 LogHub 憑借着高吞吐、低延遲、可自動伸縮等特性,逐漸成為大資料處理領域特别是實時資料處理場景下的基礎設施。那些運作在

Flink

Spark Storm

等大資料計算引擎中的任務往往會将資料處理結果或中間結果實時寫入 LogHub,下遊系統基于 LogHub 中的資料提供查詢分析、監控告警、機器學習、疊代計算等能力。下圖展示了面向 LogHub 的大資料處理系統架構圖。

日志上雲利器 - Aliyun LOG Java Producer背景功能特點功能優勢快速入門原理剖析總結技術支援

要讓整個系統穩定地運作,提供便捷高效的資料寫入手段是前提。直接使用 API 或 SDK 往往無法滿足大資料場景下對資料寫入能力的要求,在這樣的背景下

Aliyun LOG Java Producer

應運而生。

功能特點

Aliyun LOG Java Producer 是一個易于使用且高度可配置的 Java 類庫,它有如下功能特點:

  1. 線程安全 - producer 接口暴露的所有方法都是線程安全的。
  2. 異步發送 - 調用 producer 的發送接口通常能夠立即傳回。Producer 内部會緩存并合并發送資料,然後批量發送以提高吞吐量。
  3. 自動重試 - 對可重試的異常,producer 會根據使用者配置的最大重試次數和重試退避時間進行重試。
  4. 行為追溯 - 使用者通過 callback 或 future 不僅能擷取目前資料是否發送成功的資訊,還可以獲得該資料每次被嘗試發送的資訊,有利于問題追溯和行為決策。
  5. 上下文還原 - 同一個 producer 執行個體産生的日志在同一上下文中,在服務端可以檢視某條日志前後相關的日志。
  6. 優雅關閉 - 保證 close 方法退時,producer 緩存的所有資料都能被處理,使用者也能得到相應的通知。

功能優勢

使用 producer 相對于直接通過 API 或 SDK 向 LogHub 寫資料會有如下優勢。

高性能

在海量資料、資源有限的前提下,寫入端要達到目标吞吐量需要實作複雜的控制邏輯,包括多線程、緩存政策、批量發送等,另外還要充分考慮失敗重試的場景。Producer 實作了上述功能,在為您帶來性能優勢的同時簡化了程式開發步驟。

異步非阻塞

在可用記憶體充足的前提下,producer 會對發往 LogHub 的資料進行緩存,是以使用者調用 send 方法時能夠立即傳回,不會阻塞,達到計算與 I/O 邏輯分離的目的。稍後,使用者可以通過傳回的 future 對象或傳入的 callback 獲得資料發送的結果。

資源可控制

可以通過參數控制 producer 用于緩存待發送資料的記憶體大小,同時還可以配置用于執行資料發送任務的線程數量。這樣做一方面避免了 producer 無限制地消耗資源,另一方面可以讓您根據實際情況平衡資源消耗和寫入吞吐量。

小結

綜上所述,producer 在給您帶來諸多優勢的同時隻暴露了簡單的接口,為您屏蔽了複雜的底層細節;另外,您也無須擔心它會影響到上層業務的正常運作,大大降低了資料接入門檻。

快速入門

Producer 的實作比較複雜,但使用起來卻非常簡單。想了解 producer 的正确打開方式請參考文章

Aliyun LOG Java Producer 快速入門

原理剖析

為了讓您更好地了解 producer 的表現行為,本章将帶您探究它的實作原理,包括資料寫入邏輯、核心元件的實作方式以及如何優雅地關閉 producer 中的各個元件。Producer 的整體架構如下圖所示。

日志上雲利器 - Aliyun LOG Java Producer背景功能特點功能優勢快速入門原理剖析總結技術支援

資料寫入

Producer 的資料寫入邏輯如下:

  1. 使用者調用

    producer.send()

    方法發送資料,資料會被加入到 LogAccumulator 中的某個 ProducerBatch 裡。通常情況下 send 方法會立即傳回,但如果該 producer 執行個體沒有足夠空間容納目前資料,此方法會被阻塞直到下列任意一個條件被滿足。
    1. 之前緩存的資料被 BatchHandler 處理完成後,占用的記憶體被“釋放”,producer 有足夠空間容納目前資料。
    2. 到達使用者指定的最長阻塞時間,此時會抛出異常。
  2. 在調用 send 方法過程中,如果發現目标 ProducerBatch 包含的日志條數到達了 maxBatchCount 或該 ProducerBatch 剩餘的空間無法容納目前資料,則會首先将該 ProducerBatch 投遞到 IOThreadPool 裡,然後再建立一個 ProducerBatch 存放目前資料。為了不阻塞使用者線程,IOThreadPool 選用無界阻塞隊列,因為單個 Producer 執行個體能緩存的日志總大小是有限的,該隊列長度不會無限增長。
  3. Mover 會周遊 LogAccumulator 中的每個 ProducerBatch,把超過了緩存時間的 batch 加入 expiredBatches 裡。同時會記錄未過期 batch 的最近逾時時間,記為 t。
  4. 将從 LogAccumulator 中擷取的 expiredBatches 投遞到 IOThreadPool 裡。
  5. 擷取 RetryQueue 中所有滿足發送條件的 ProducerBatch,如果目前沒有 batch 滿足發送條件則最多等待時間 t。
  6. 将從 RetryQueue 中擷取的 expiredBatches 投遞到 IOThreadPool 裡。(Mover 完成步驟 6 後會再次進入步驟 3)。
  7. IOThreadPool 中的工作線程從阻塞隊列裡或取 ProducerBatch,然後發送給目标 logStore。
  8. 如果資料發送成功,會将該 ProducerBatch 寫入成功隊列。
  9. 如果資料發送失敗,且滿足下列任意一個條件,會将該 ProducerBatch 寫入失敗隊列。
    1. 該錯誤無法重試。
    2. RetryQueue 被關閉。
    3. 達到了指定的重試次數且失敗隊列中的 batch 數不超過待發送 batch 總數的二分之一。
  10. 否則,計算目前 ProducerBatch 的下次計劃發送時間然後将其放入 RetryQueue 中。
  11. 線程 SuccessBatchHandler 從成功隊列裡擷取 batch,執行和該 batch 綁定的所有回調。
  12. 線程 FailureBatchHandler 從失敗隊列裡擷取 batch,執行和該 batch 綁定的所有回調。

核心元件

Producer 包含

LogAccumulator RetryQueue Mover IOThreadPool SendProducerBatchTask BatchHandler

等核心元件。

為了提高吞吐量,一個常見的做法是将若幹個小包合并成大包批量發送,本小節介紹的 LogAccumulator 的主要作用便是合并待發送的資料。由于服務端要求具有相同 project、logstore、topic、source、shardHash 的資料才能組裝成一個大包,LogAccumulator 會根據資料的這些屬性将其緩存到内部 map 的不同位置。這個 map 的 key 為上述五元組,value 為 ProducerBatch。為了保證線程安全同時支援高并發,這裡選用 ConcurrentMap 作為 map 的實作。

LogAccumulator 的另一個作用是控制緩存資料的總大小,這裡選用 Semaphore 實作控制邏輯。Semaphore 是基于 AQS 實作的高性能同步工具,它會首先嘗試通過自旋的方式擷取共享資源,減少線程上下文切換的開銷。

RetryQueue 用于存放發送失敗待重試的 ProducerBatch,每個 batch 有一個字段用于辨別下次計劃發送時間。為了高效地擷取逾時 batch,内部選用 DelayQueue 存放這些 batch。DelayQueue 是一種按時間排序的優先隊列,最先逾時的 batch 會被優先取出,同時它也是線程安全的。

Mover 是一個獨立的線程,它會循環地将 LogAccumulator 和 RetryQueue 中的逾時 batch 投遞到 IOThreadPool 裡。為了避免空轉占用寶貴的 CPU 資源,當 Mover 發現 LogAccumulator 和 RetryQueue 裡沒有滿足發送條件的 batch 時,會在 RetryQueue 的 expiredBatches 方法上等待使用者配置的資料最長緩存時間 lingerMs。

IOThreadPool 中的工作線程用于真正執行資料發送任務,該線程池的大小可通過參數 ioThreadCount 指定,預設為可用處理器個數乘以 2。

SendProducerBatchTask 封裝了 batch 發送邏輯。為了避免阻塞 IO 線程,不論目前 batch 發送成功與否都會将其投遞到隊列中交由獨立線程去執行回調。另外,如果某個發送失敗的 batch 滿足重試條件,不會在目前 IO 線程中立即重試(立即重試通常也會失敗),而是根據指數退避政策将其投遞到 RetryQueue 中。

Producer 會啟動一個 SuccessBatchHandler 和一個 FailureBatchHandler 分别用來處理發送成功或失敗的 batch。Handler 在執行完 batch 的 callback、設定好 batch 的 future 後便會“釋放”該 batch 占用的記憶體空間,供新的資料使用。分開處理的原因是為了隔離發送成功和發送失敗的 batch,保持 producer 整體的流動性。

優雅關閉

要實作優雅關閉,需要做到以下幾點:

  1. Close 方法在期望時間内傳回時,producer 中的所有線程都應停止,緩存的資料都應得到處理,使用者注冊的 callback 都應被執行,傳回給使用者的 future 都應被設定。
  2. 支援使用者設定 close 方法的最長等待時間,超過這個時間不論線程是否停止,緩存的資料是否完全處理,該方法都應立即傳回。
  3. Close 方法支援被調用多次,在多線程環境下也能按預期工作。
  4. 在 callback 裡調用 close 方法是安全的,不會造成程式死鎖。

為了達到上述目标,producer 的關閉邏輯設計如下:

  1. 關閉 LogAccumulator,這時繼續往 LogAccumulator 中寫資料會抛異常。
  2. 關閉 RetryQueue,這時繼續往 RetryQueue 中投遞 batch 會抛異常。
  3. 關閉 Mover 并等待其完全退出。Mover 檢測到關閉信号後會把 LogAccumulator 和 RetryQueue 中剩餘的 batch 全部取出并投遞到 IOThreadPool 中,不論它們是否滿足發送條件。為了防止資料丢失,Mover 會不斷從 LogAccumulator 和 RetryQueue 中擷取 batch 直到沒有其他線程正在寫入。
  4. 關閉 IOThreadPool 并等待已送出的任務全部執行完畢。這時由于 RetryQueue 已經關閉,發送失敗的 batch 會被直接投遞到失敗隊列中。
  5. 關閉 SuccessBatchHandler 并等待其完全退出(如果檢測到在 callback 裡調用 close 方法會跳過等待過程)。SuccessBatchHandler 檢測到關閉信号後會把成功隊列中的 batch 全部取出并依次處理。
  6. 關閉 FailureBatchHandler 并等待其完全退出(如果檢測到在 callback 裡調用 close 方法會跳過等待過程)。FailureBatchHandler 檢測到關閉信号後會把失敗隊列中的 batch 全部取出并依次處理。

可以看到,這裡按照資料流動方向依次關閉隊列和線程來達到優雅關閉、安全退出的目的。

總結

Aliyun LOG Java Producer 是對老版 producer 的全面更新,解決了上一版存在的多個問題,包括網絡異常情況下 CPU 占用率過高、關閉 producer 可能出現少量資料丢失等問題。另外,在容錯方面也進行了加強,就算使用者使用不合理,在資源、吞吐、隔離上都有較好的保證。

技術支援

日志上雲利器 - Aliyun LOG Java Producer背景功能特點功能優勢快速入門原理剖析總結技術支援