天天看點

Flink 實時寫入資料到 ElasticSearch 性能調優

作者:張劉毅

背景說明

線上業務反應使用 Flink 消費上遊 kafka topic 裡的軌迹資料出現 backpressure,資料積壓嚴重。單次 bulk 的寫入量為:3000/50mb/30s,并行度為 48。針對該問題,為了避免影響線上業務申請了一個與線上叢集配置相同的 ES 叢集。本着複現問題進行優化就能解決的思路進行調優測試。

測試環境

  • Elasticsearch 2.3.3
  • Flink 1.6.3
  • flink-connector-elasticsearch 2_2.11
  • 八台 SSD,56 核 :3 主 5 從

Rally 分布式壓測 ES 叢集

Flink 實時寫入資料到 ElasticSearch 性能調優

從壓測結果來看,叢集層面的平均寫入性能大概在每秒 10 w+ 的 doc。

Flink 寫入測試

配置檔案

config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));           

執行代碼片段

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 initEnv(env);
 Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
 //從kafka中擷取軌迹資料
 FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
     new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
 //從checkpoint最新處消費
 flinkKafkaConsumer010.setStartFromLatest();
 DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
10//Sink2ES
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
    .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
env.execute("flinktest");           

運作時配置

Flink 實時寫入資料到 ElasticSearch 性能調優

任務容器數為 24 個 container,一共 48 個并發。savepoint 為 15 分鐘:

Flink 實時寫入資料到 ElasticSearch 性能調優
  • 運作現象

(1)source 和 Map 算子均出現較高的反壓

Flink 實時寫入資料到 ElasticSearch 性能調優

(2)ES 叢集層面,目标索引寫入速度寫入陡降

平均 QPS 為:12 k 左右。

(3)對比取消 sink 算子後的 QPS

streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");           
Flink 實時寫入資料到 ElasticSearch 性能調優

平均QPS為:116 k 左右。

有無sink參照實驗的結論:

取消 sink 2 ES 的操作後,QPS 達到 110 k,是之前 QPS 的十倍。由此可以基本判定: ES 叢集寫性能導緻的上遊反壓

優化方向

索引字段類型調整

Flink 實時寫入資料到 ElasticSearch 性能調優

bulk 失敗的原因是由于叢集 dynamic mapping 自動監測,部分字段格式被識别為日期格式而遇到空字元串無法解析報錯。

解決方案:關閉索引自動檢測。

Flink 實時寫入資料到 ElasticSearch 性能調優

效果: ES 叢集寫入性能明顯提高但 Flink operator 依然存在反壓:

Flink 實時寫入資料到 ElasticSearch 性能調優

降低副本數

curl -XPUT{叢集位址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'           

提高 refresh_interval

針對這種 ToB、日志型、實時性要求不高的場景,我們不需要查詢的實時性,通過加大甚至關閉 refresh_interval 的參數提高寫入性能。

curl -XPUT{叢集位址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": {  "index": {"refresh_interval" : -1   }   }  }'           

檢查叢集各個節點 CPU 核數

在 Flink 執行時,通過 Grafana 觀測各個節點 CPU 使用率以及通過 Linux 指令檢視各個節點 CPU 核數。發現 CPU 使用率高的節點 CPU 核數比其餘節點少。為了排除這個短闆效應,我們将在這個節點中的索引 shard 移動到 CPU 核數多的節點。

curl -XPOST {叢集位址}/_cluster/reroute  -d'{"commands":[{"move":{"index":"{索引名稱}","shard":5,"from_node":"源node名稱","to_node":"目标node名稱"}}]}' -H "Content-Type:application/json"           

以上優化的效果:

Flink 實時寫入資料到 ElasticSearch 性能調優

經過以上的優化,我們發現寫入性能提升有限。是以,需要深入檢視寫入的瓶頸點。

在 CPU 使用率高的節點使用 Arthas 觀察線程

Flink 實時寫入資料到 ElasticSearch 性能調優

列印阻塞的線程堆棧

"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
   at sun.misc.Unsafe.park(Native Method)
     -  waiting on java.util.concurrent.LinkedTransferQueue@369223fa
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
    at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
    at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
    at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)           

從上面的線程堆棧我們可以看出線程處于等待狀态。

關于這個問題的讨論詳情檢視

https://discuss.elastic.co/t/thread-selection-and-locking/26051/3

,這個 issue 讨論大緻意思是:節點數不夠,需要增加節點。于是我們又增加節點并通過設定索引級别的 total_shards_per_node 參數将索引 shard 的寫入平均到各個節點上。

線程隊列優化

ES 是将不同種類的操作(index、search…)交由不同的線程池執行,主要的線程池有三:index、search 和 bulk thread_pool。線程池隊列長度配置按照官網預設值,我覺得增加隊列長度而叢集本身沒有很高的處理能力線程還是會 await(事實上實驗結果也是如此在此不必贅述),因為實驗節點機器是 56 核,對照官網:

Flink 實時寫入資料到 ElasticSearch 性能調優

是以修改 size 數值為 56。

Flink 實時寫入資料到 ElasticSearch 性能調優

經過以上的優化,我們發現在 kafka 中的 topic 積壓有明顯變少的趨勢:

Flink 實時寫入資料到 ElasticSearch 性能調優

index buffer size 的優化

參照官網:

Flink 實時寫入資料到 ElasticSearch 性能調優
indices.memory.index_buffer_size : 10%           

translog 優化:

索引寫入 ES 的基本流程是:

  • 資料寫入 buffer 緩沖和 translog;
  • 每秒 buffer 的資料生成 segment 并進入記憶體,此時 segment 被打開并供 search 使用查詢;
  • buffer 清空并重複上述步驟 ;
  • buffer 不斷添加、清空 translog 不斷累加,當達到某些條件觸發 commit 操作,刷到磁盤;

ES 預設的刷盤操作為 Request 但容易部分操作比較耗時,在日志型叢集、允許資料在刷盤過程中少量丢失可以改成異步 async。

另外一次 commit 操作是在 translog 達到某個門檻值執行的,可以把大小(flush_threshold_size )調大,重新整理間隔調大。

index.translog.durability : async
index.translog.flush_threshold_size : 1gb
index.translog.sync_interval : 30s           

效果:

  • Flink 反壓從打滿 100% 降到 40%(output buffer usage):
Flink 實時寫入資料到 ElasticSearch 性能調優

kafka 消費組裡的積壓明顯減少:

Flink 實時寫入資料到 ElasticSearch 性能調優

總結

當 ES 寫入性能遇到瓶頸時,我總結的思路應該是這樣:

  • 看日志,是否有字段類型不比對,是否有髒資料。
  • 看 CPU 使用情況,叢集是否異構
  • 用戶端是怎樣的配置?使用的 bulk 還是單條插入
  • 檢視線程堆棧,檢視耗時最久的方法調用
  • 确定叢集類型:ToB 還是 ToC,是否允許有少量資料丢失?
  • 針對 ToB 等實時性不高的叢集減少副本增加重新整理時間
  • index buffer 優化 translog 優化,滾動重新開機叢集

▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 大會議程重磅釋出,參與

問卷調研

就有機會免費擷取門票!

https://developer.aliyun.com/special/ffa2019