天天看點

Aliyun LOG Java Producer 快速入門背景使用步驟應用示例技術支援

背景

Aliyun LOG Java Producer

是為運作在大資料、高并發場景下的 Java 應用量身打造的高性能寫 LogHub 類庫。相對于原始的 API 或 SDK,使用 producer 寫資料能為您帶來諸多優勢,包括高性能、計算與 I/O 邏輯分離、資源可控制等。想要更深入地了解 producer 的功能和原理可參考文章

日志上雲利器 - Aliyun LOG Java Producer

,本文将聚焦于 producer 的使用上。

使用步驟

使用 producer 可歸結為如下圖所示的三個步驟,下面分别為您介紹。

Aliyun LOG Java Producer 快速入門背景使用步驟應用示例技術支援

建立 producer

Producer 的建立過程主要涉及到以下對象。

ProjectConfig

ProjectConfig 包含目标 project 的服務入口資訊以及表征調用者身份的通路憑證。

服務入口

最終通路位址會由 project 和 endpoint 拼出,關于如何确定 project 對應的 endpoint 可參考文章

通路憑證

Producer 支援使用者配置 AK 或 STS token。如果使用 STS token,需要定期建立新的 ProjectConfig 然後将其 put 到 ProjectConfigs 裡。

ProjectConfigs

如果您需要向不同的 project 中寫入資料,可以建立多個 ProjectConfig 對象然後将它們 put 到 ProjectConfigs 裡。ProjectConfigs 内部通過 map 維護不同 project 的配置,該 map 的 key 為 project,value 為該 project 對應的 Client。

ProducerConfig

ProducerConfig 用于配置發送政策,您可以根據不同的業務場景為參數指定不同的值,各參數含義如下表所示。

參數 類型 描述
totalSizeInBytes 整型 單個 producer 執行個體能緩存的日志大小上限,預設為 100MB。
maxBlockMs

如果 producer 可用空間不足,調用者在 send 方法上的最大阻塞時間,預設為 60 秒。

如果超過這個時間後所需空間仍無法得到滿足,send 方法會抛出

TimeoutException

如果将該值設為0,當所需空間無法得到滿足時,send 方法會立即抛出 TimeoutException。

如果您希望 send 方法一直阻塞直到所需空間得到滿足,可将該值設為負數。

ioThreadCount 執行日志發送任務的線程池大小,預設為可用處理器個數。
batchSizeThresholdInBytes 當一個 ProducerBatch 中緩存的日志大小大于等于 batchSizeThresholdInBytes 時,該 batch 将被發送,預設為 512 KB,最大可設定成 5MB。
batchCountThreshold 當一個 ProducerBatch 中緩存的日志條數大于等于 batchCountThreshold 時,該 batch 将被發送,預設為 4096,最大可設定成 40960。
lingerMs 一個 ProducerBatch 從建立到可發送的逗留時間,預設為 2 秒,最小可設定成 100 毫秒。
retries

如果某個 ProducerBatch 首次發送失敗,能夠對其重試的次數,預設為 10 次。

如果 retries 小于等于 0,該 ProducerBatch 首次發送失敗後将直接進入失敗隊列。

maxReservedAttempts 每個 ProducerBatch 每次被嘗試發送都對應着一個 Attempt

,此參數用來控制傳回給使用者的 attempt 個數,預設隻保留最近的 11 次 attempt 資訊。

該參數越大能讓您追溯更多的資訊,但同時也會消耗更多的記憶體。

baseRetryBackoffMs

首次重試的退避時間,預設為 100 毫秒。

Producer 采樣指數退避算法,第 N 次重試的計劃等待時間為 baseRetryBackoffMs * 2^(N-1)。

maxRetryBackoffMs 重試的最大退避時間,預設為 50 秒。
adjustShardHash 布爾 如果調用 send 方法時指定了 shardHash,該參數用于控制是否需要對其進行調整,預設為 true。
buckets

當且僅當 adjustShardHash 為 true 時,該參數才生效。此時,producer 會自動将 shardHash 重新分組,分組數量為 buckets。

如果兩條資料的 shardHash 不同,它們是無法合并到一起發送的,會降低 producer 吞吐量。将 shardHash 重新分組後,能讓資料有更多地機會被批量發送。

該參數的取值範圍是 [1, 256],且必須是 2 的整數次幂,預設為 64。

LogProducer

LogProducer 是接口 Producer 的實作類,它接收唯一的參數 producerConfig。當您準備好 producerConfig 後,可以按照下列方式建立 producer 執行個體。

Producer producer = new LogProducer(producerConfig);           

建立 producer 的同時會建立一系列線程,是一個相對昂貴的操作,是以建議一個應用共用一個 producer 執行個體。一個 producer 執行個體包含的線程如下表所示,其中 N 為該 producer 執行個體在目前程序中的編号,從 0 開始。

線程名格式 數量
aliyun-log-producer-<N>-mover 1 負責将滿足發送條件的 batch 投遞到發送線程池裡。
aliyun-log-producer-<N>-io-thread- IOThreadPool 中真正用于執行資料發送任務的線程。
aliyun-log-producer-<N>-success-batch-handler 用于處理發送成功的 batch。
aliyun-log-producer-<N>-failure-batch-handler 用于處理發送失敗的 batch。

另外,LogProducer 提供的所有方法都是線程安全的,可以在多線程環境下安全執行。

發送資料

Producer 執行個體建立好後,下一步就是使用其提供的方法發送資料。

參數介紹

Producer 接口提供多種發送方法,方法的各個參數含義如下。

能否為空
project 待發送資料的目标 project。 不能
logStore 待發送資料的目标 logStore。
logItem 待發送資料。
topic 待發送資料的 topic。

如果留白或沒有指定,該字段将被賦予""。

source 待發送資料的 source。 如果留白或沒有指定,該字段将被賦予 producer 所在主控端的 IP。
shardHash 待發送資料的 shardHash,用于将資料寫入 logStore 中的特定 shard。 如果留白或沒有指定,資料将被随機寫入目标 logStore 的某個 shard 中。
callback 用于告知調用者資料發送的結果。

另外,隻有 project、logStore、topic、source、shardHash 這 5 個屬性都相同的資料才有機會和并在一起批量發送。為了讓資料合并功能充分發揮作用,同時也為了節省記憶體,建議您控制這 5 個字段的取值範圍。如果某個字段如 topic 的取值确實非常多,建議您将其加入 logItem 而不是直接使用 topic。

擷取發送結果

由于 producer 提供的所有發送方法都是異步的,需要通過傳回的 future 或者傳入的 callback 擷取發送結果。

Future

Send 方法會傳回一個

ListenableFuture

,它除了可以像普通 future 那樣通過調用 get 方法阻塞獲得發送結果外,還允許你注冊回調方法(回調方法會在完成 future 設定後被調用)。以下代碼片段展示了 ListenableFuture 的使用方法,使用者需要為該 future 注冊一個 FutureCallback 并将其投遞到應用提供的線程池 EXECUTOR_SERVICE 中執行,完整樣例可參考

SampleProducerWithFuture.java
ListenableFuture<Result> f = producer.send("project", "logStore", logItem);
Futures.addCallback(f,
                    new FutureCallback<Result>() {
                        @Override
                        public void onSuccess(@Nullable Result result) {
                        }

                        @Override
                        public void onFailure(Throwable t) {
                        }
                    },
                    EXECUTOR_SERVICE);           

Callback

除了使用 future 外,您還可以通過在調用 send 方法時注冊 callback 擷取資料發送結果,代碼片段如下。(完整樣例可參考

SampleProducerWithCallback.java

producer.send(
        "project",
        "logStore",
        logItem,
        new Callback() {
            @Override
            public void onCompletion(Result result) {
            }
        });           

Callback 由 producer 内部線程負責執行,并且隻有在執行完畢後資料“占用”的空間才會釋放。為了不阻塞 producer 造成整體吞吐量的下降,要避免在 callback 裡執行耗時的操作。另外,在 callback 中調用 send 方法進行重試也是不建議的,您可以上調參數 retries 或者在 ListenableFuture 的callback 中進行重試。

Future vs Callback

那麼想要擷取資料的發送結果到底是選擇 future 還是 callback 呢?如果擷取結果後,應用的處理邏輯比較簡單且不會造成 producer 阻塞,建議直接使用 callback。否則,建議使用 ListenableFuture,在單獨的線程(池)中執行後續業務。

關閉 producer

當您已經沒有資料需要發送或者目前程序準備退出時,需要關閉 producer,目的是讓 producer 中緩存的資料全部得到處理。目前,producer 提供安全關閉和有限關閉兩種模式。

安全關閉

在大多數情況下,建議您使用安全關閉。安全關閉對應的方法是

close()

,它會等到 producer 中緩存的資料全部被處理、線程全部停止、注冊的 callback 全部執行,傳回 future 全部被設定後才會傳回。

雖然要等到資料全部處理完成,但 producer 被關閉後,緩存的 batch 會被立刻處理且不會被重試。是以,如果 callback 不被阻塞,close 方法往往能在很短的時間内傳回。

有限關閉

如果您的 callback 在執行過程中有可能阻塞,但您又希望 close 方法能在短時間内傳回,可以使用有限關閉。有限關閉對應的方法是

close(long timeoutMs)

,如果超過指定的 timeoutMs 後 producer 仍未完全關閉,它會抛出 IllegalStateException,這意味着緩存的資料可能還沒來得及處理就被丢棄,使用者注冊的 callback 也可能不會被執行。

應用示例

為了進一步減少學習成本,我們為您準備了

Aliyun LOG Java Producer 應用示例

。示例中包含了 producer 從建立到關閉的全部流程。

技術支援

Aliyun LOG Java Producer 快速入門背景使用步驟應用示例技術支援