分析痛點
筆者線上有一個 Flink 任務消費 Kafka 資料,将資料轉換後,在 Flink 的 Sink 算子内部調用第三方 api 将資料上報到第三方的資料分析平台。這裡使用批量同步 api,即:每 50 條資料請求一次第三方接口,可以通過批量 api 來提高請求效率。由于調用的外網接口,是以每次調用 api 比較耗時。假如批次大小為 50,且請求接口的平均響應時間為 50ms,使用同步 api,是以第一次請求響應以後才會發起第二次請求。請求示意圖如下所示:
平均下來,每 50 ms 向第三方伺服器發送 50 條資料,也就是每個并行度 1 秒鐘處理 1000 條資料。假設目前業務資料量為每秒 10 萬條資料,那麼 Flink Sink 算子的并行度需要設定為 100 才能正常處理線上資料。從 Flink 資源配置設定來講,100 個并行度需要申請 100 顆 CPU,是以目前 Flink 任務需要占用叢集中 100 顆 CPU 以及不少的記憶體資源。請問此時 Flink Sink 算子的 CPU 或者記憶體壓力大嗎?
上述請求示意圖可以看出 Flink 任務送出請求到響應這 50ms 期間,Flink Sink 算子隻是在 wait,并沒有實質性的工作。是以,CPU 使用率肯定很低,目前任務的瓶頸明顯在網絡 IO。最後結論是 Flink 任務申請了 100 顆 CPU,導緻 yarn 或其他資源排程架構沒有資源了,但是這 100 顆 CPU 的使用率并不高,這裡能不能優化通過提高 CPU 的使用率,進而少申請一些 CPU 呢?
同步批量請求優化為異步請求
首先可以想到的是将同步請求改為異步請求,使得任務不會阻塞在網絡請求這一環節,請求示意圖如下所示。
異步請求相比同步請求而言,優化點在于每次送出請求時,不需要等待請求響應後再發送下一次請求,而是當下一批次的 50 條資料準備好之後,直接向第三方伺服器發送請求。每次發送請求後,Flink Sink 算子的用戶端需要注冊監聽器來等待響應,當響應失敗時需要做重試或者復原政策。
通過異步請求的方式,可以優化網絡瓶頸,假如 Flink Sink 算子的單個并行度平均 10ms 接收到 50 條資料,那麼使用異步 api 的方式平均 1 秒可以處理 5000 條資料,整個 Flink 任務的性能提高了 5 倍。對于每秒 10 萬資料量的業務,這裡僅需要申請 20 顆 CPU 資源即可。關于異步 api 的具體使用,可以根據場景具體設計,這裡不詳細讨論。
多線程 Client 模式
對于一些不支援異步 api 的場景,可能并不能使用上述優化方案,同樣,為了提高 CPU 使用率,可以在 Flink Sink 端使用多線程的方案。如下圖所示,可以在 Flink Sink 端開啟 5 個請求第三方伺服器的 Client 線程:Client1、Client2、Client3、Client4、Client5。
這五個線程内分别使用同步批量請求的 Client,單個 Client 還是保持 50 條記錄為一個批次,即 50 條記錄請求一次第三方 api。請求第三方 api 耗時主要在于網絡 IO(性能瓶頸在于網絡請求延遲),是以如果變成 5 個 Client 線程,每個 Client 的單次請求平均耗時還能保持在 50ms,除非網絡請求已經達到了帶寬上限或整個任務又遇到其他瓶頸。是以,多線程模式下使用同步批量 api 也能将請求效率提升 5 倍。
說明:多線程的方案,不僅限于請求第三方接口,對于非 CPU 密集型的任務也可以使用該方案,在降低 CPU 數量的同時,單個 CPU 承擔多個線程的工作,進而提高 CPU 使用率。例如:請求 HBase 的任務或磁盤 IO 是瓶頸的任務,可以降低任務的并行度,使得每個并行度内處理多個線程。
Flink 算子内多線程實作
Sink 算子的單個并行度内現在有 5 個 Client 用于消費資料,但 Sink 算子的資料都來自于上遊算子。如下圖所示,一個簡單的實作方式是 Sink 算子接收到上遊資料後通過輪循或随機的政策将資料分發給 5 個 Client 線程。
但是輪循或者随機政策會存在問題,假如 5 個 Client 中 Client3 線程消費較慢,會導緻給 Client3 分發資料時被阻塞,進而使得其他正常消費的線程 Client1、2、4、5 也被分發不到資料。
為了解決上述問題,可以在 Sink 算子内申請一個資料緩沖隊列,隊列有先進先出(FIFO)的特性。Sink 算子接收到的資料直接插入到隊列尾部,五個 Client 線程不斷地從隊首取資料并消費,即:Sink 算子先接收的資料 Client 先消費,後接收的資料 Client 後消費。
- 若隊列一直是滿的,說明 Client 線程消費較慢、Sink 算子上遊生産資料較快。
- 若隊列一直為空,說明 Client 線程消費較快、Sink 算子的上遊生産資料較慢。
五個線程共用同一個隊列完美地解決了單個線程消費慢的問題,當 Client3 線程阻塞時,不影響其他線程從隊列中消費資料。這裡使用隊列還起到了削峰填谷的作用。
代碼實作
原理明白了,具體代碼如下所示,首先是消費資料的 Client 線程代碼,代碼邏輯很簡單,一直從 bufferQueue 中 poll 資料,取出資料後,執行相應的消費邏輯即可,在本案例中消費邏輯便是 Client 積攢批次并調用第三方 api。
public class MultiThreadConsumerClient implements Runnable {
private LinkedBlockingQueue<String> bufferQueue;
public MultiThreadConsumerClient(LinkedBlockingQueue<String> bufferQueue) {
this.bufferQueue = bufferQueue;
}
@Override
public void run() {
String entity;
while (true){
// 從 bufferQueue 的隊首消費資料
entity = bufferQueue.poll();
// 執行 client 消費資料的邏輯
doSomething(entity);
}
}
// client 消費資料的邏輯
private void doSomething(String entity) {
// client 積攢批次并調用第三方 api
}
}
Sink 算子代碼如下所示,在 open 方法中需要初始化線程池、資料緩沖隊列并建立開啟消費者線程,在 invoke 方法中隻需要往 bufferQueue 的隊尾添加資料即可。
public class MultiThreadConsumerSink extends RichSinkFunction<String> {
// Client 線程的預設數量
private final int DEFAULT_CLIENT_THREAD_NUM = 5;
// 資料緩沖隊列的預設容量
private final int DEFAULT_QUEUE_CAPACITY = 5000;
private LinkedBlockingQueue<String> bufferQueue;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// new 一個容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// new 一個容量為 DEFAULT_QUEUE_CAPACITY 的資料緩沖隊列
this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);
// 建立并開啟消費者線程
MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);
for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {
threadPoolExecutor.execute(consumerClient);
}
}
@Override
public void invoke(String value, Context context) throws Exception {
// 往 bufferQueue 的隊尾添加資料
bufferQueue.put(value);
}
}
代碼邏輯相對比較簡單,請問上述 Sink 能保證 Exactly Once 嗎?
答:不能保證 Exactly Once,Flink 要想端對端保證 Exactly Once,必須要求外部元件支援事務,這裡第三方接口明顯不支援事務。
那麼上述 Sink 能保證 At Lease Once 嗎?言外之意,上述 Sink 會丢資料嗎?
答:會丢資料。因為上述案例中使用的批量 api 來消費資料,假如批量 api 是每積攢 50 條資料請求一次第三方接口,當做 Checkpoint 時可能隻積攢了 30 條資料,是以做 Checkpoint 時記憶體中可能還有資料未發送到外部系統。而且資料緩沖隊列中可能還有緩存的資料,是以上述 Sink 在做 Checkpoint 時會出現 Checkpoint 之前的資料未完全消費的情況。
例如,Flink 任務消費的 Kafka 資料,當做 Checkpoint 時,Flink 任務消費到 offset 為 10000 的位置,但實際上 offset 10000 之前的一小部分資料可能還在資料緩沖隊列中尚未完全消費,或者因為沒積攢夠一定批次是以資料緩存在 client 中,并未請求到第三方。當任務失敗後,Flink 任務從 Checkpoint 處恢複,會從 offset 為 10000 的位置開始消費,此時 offset 10000 之前的一小部分緩存在記憶體緩沖隊列中的資料不會再被消費,于是就出現了丢資料情況。
處理丢資料情況
如何保證資料不丢失呢?很簡單,可以在 Checkpoint 時強制将資料緩沖區的資料全部消費完,并對 client 執行 flush 操作,保證 client 端不會緩存資料。
實作思路:Sink 算子可以實作 CheckpointedFunction 接口,當做 Checkpoint 時,會調用 snapshotState 方法,方法内可以觸發 client 的 flush 操作。但 client 在 MultiThreadConsumerClient 對應的五個線程中,需要考慮線程同步的問題,即:Sink 算子的 snapshotState 方法中做一個操作,要使得五個 Client 線程感覺到目前正在執行 Checkpoint,此時應該把資料緩沖區的資料全部消費完,并對 client 執行過 flush 操作。
如何實作呢?需要借助 CyclicBarrier。CyclicBarrier 會讓所有線程都等待某個操作完成後才會繼續下一步行動。在這裡可以使用 CyclicBarrier,讓 Checkpoint 等待所有的 client 将資料緩沖區的資料全部消費完并對 client 執行過 flush 操作,言外之意,offset 10000 之前的資料必須全部消費完成才允許 Checkpoint 執行完成。這樣就可以保證 Checkpoint 時不會有資料被緩存在記憶體,可以保證資料源 offset 10000 之前的資料都消費完成。
MultiThreadConsumerSink 具體代碼如下所示:
public class MultiThreadConsumerSink extends RichSinkFunction<String> {
// Client 線程的預設數量
private final int DEFAULT_CLIENT_THREAD_NUM = 5;
// 資料緩沖隊列的預設容量
private final int DEFAULT_QUEUE_CAPACITY = 5000;
private LinkedBlockingQueue<String> bufferQueue;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// new 一個容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// new 一個容量為 DEFAULT_QUEUE_CAPACITY 的資料緩沖隊列
this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);
// 建立并開啟消費者線程
MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);
for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {
threadPoolExecutor.execute(consumerClient);
}
}
@Override
public void invoke(String value, Context context) throws Exception {
// 往 bufferQueue 的隊尾添加資料
bufferQueue.put(value);
}
}
MultiThreadConsumerSink 實作了 CheckpointedFunction 接口,在 open 方法中增加了 CyclicBarrier 的初始化,CyclicBarrier 預期容量設定為 client 線程數加一,表示當 client 線程數加一個線程都執行了 await 操作時,所有的線程的 await 方法才會執行完成。這裡為什麼要加一呢?因為除了 client 線程外, snapshotState 方法中也需要執行過 await。
當做 Checkpoint 時 snapshotState 方法中執行 clientBarrier.await(),等待所有的 client 線程将緩沖區資料消費完。snapshotState 方法執行過程中 invoke 方法不會被執行,即:Checkpoint 過程中資料緩沖隊列不會增加資料,是以 client 線程很快就可以将緩沖隊列中的資料消費完。
MultiThreadConsumerClient 具體代碼如下所示:
public class MultiThreadConsumerSink extends RichSinkFunction<String> implements CheckpointedFunction {
private Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerSink.class);
// Client 線程的預設數量
private final int DEFAULT_CLIENT_THREAD_NUM = 5;
// 資料緩沖隊列的預設容量
private final int DEFAULT_QUEUE_CAPACITY = 5000;
private LinkedBlockingQueue<String> bufferQueue;
private CyclicBarrier clientBarrier;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// new 一個容量為 DEFAULT_CLIENT_THREAD_NUM 的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,
0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
// new 一個容量為 DEFAULT_QUEUE_CAPACITY 的資料緩沖隊列
this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);
// barrier 需要攔截 (DEFAULT_CLIENT_THREAD_NUM + 1) 個線程
this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);
// 建立并開啟消費者線程
MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue, clientBarrier);
for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {
threadPoolExecutor.execute(consumerClient);
}
}
@Override
public void invoke(String value, Context context) throws Exception {
// 往 bufferQueue 的隊尾添加資料
bufferQueue.put(value);
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
LOG.info("snapshotState : 所有的 client 準備 flush !!!");
// barrier 開始等待
clientBarrier.await();
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
}
}
從資料緩沖隊列中 poll 資料時,增加了 timeout 時間為 50ms。如果從隊列中拿到資料,則執行消費資料的邏輯,若拿不到資料說明資料緩沖隊列中資料消費完了。此時需要判斷是否有等待的 CyclicBarrier,如果有等待的 CyclicBarrier 說明此時正在執行 Checkpoint,是以 client 需要執行 flush 操作。flush 完成後,Client 線程執行 barrier.await() 操作。當所有的 Client 線程都執行到 await 時,所有的 barrier.await() 都會被執行完。此時 Sink 算子的 snapshotState 方法就會執行完。通過這種政策可以保證 Checkpoint 時将資料緩沖區中的資料消費完,client 執行 flush 操作可以保證 client 端不會緩存資料。
總結
分析到這裡,我們設計的 Sink 終于可以保證不丢失資料了。對 CyclicBarrier 不了解的同學請 Google 或百度查詢。再次強調這裡多線程的方案,不僅限于請求第三方接口,對于非 CPU 密集型的任務都可以使用該方案來提高 CPU 使用率,且該方案不僅限于 Sink 算子,各種算子都适用。本文主要希望幫助大家了解 Flink 中使用多線程的優化及在 Flink 算子中使用多線程如何保證不丢資料。