作者:張劉毅
背景說明
線上業務反應使用 Flink 消費上遊 kafka topic 裡的軌迹資料出現 backpressure,資料積壓嚴重。單次 bulk 的寫入量為:3000/50mb/30s,并行度為 48。針對該問題,為了避免影響線上業務申請了一個與線上叢集配置相同的 ES 叢集。本着複現問題進行優化就能解決的思路進行調優測試。
測試環境
- Elasticsearch 2.3.3
- Flink 1.6.3
- flink-connector-elasticsearch 2_2.11
- 八台 SSD,56 核 :3 主 5 從
Rally 分布式壓測 ES 叢集

從壓測結果來看,叢集層面的平均寫入性能大概在每秒 10 w+ 的 doc。
Flink 寫入測試
配置檔案
config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
執行代碼片段
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
initEnv(env);
Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
//從kafka中擷取軌迹資料
FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
//從checkpoint最新處消費
flinkKafkaConsumer010.setStartFromLatest();
DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
10//Sink2ES
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
.addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
env.execute("flinktest");
運作時配置
任務容器數為 24 個 container,一共 48 個并發。savepoint 為 15 分鐘:
- 運作現象
(1)source 和 Map 算子均出現較高的反壓
(2)ES 叢集層面,目标索引寫入速度寫入陡降
平均 QPS 為:12 k 左右。
(3)對比取消 sink 算子後的 QPS
streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");
平均QPS為:116 k 左右。
有無sink參照實驗的結論:
取消 sink 2 ES 的操作後,QPS 達到 110 k,是之前 QPS 的十倍。由此可以基本判定: ES 叢集寫性能導緻的上遊反壓
優化方向
索引字段類型調整
bulk 失敗的原因是由于叢集 dynamic mapping 自動監測,部分字段格式被識别為日期格式而遇到空字元串無法解析報錯。
解決方案:關閉索引自動檢測。
效果: ES 叢集寫入性能明顯提高但 Flink operator 依然存在反壓:
降低副本數
curl -XPUT{叢集位址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'
提高 refresh_interval
針對這種 ToB、日志型、實時性要求不高的場景,我們不需要查詢的實時性,通過加大甚至關閉 refresh_interval 的參數提高寫入性能。
curl -XPUT{叢集位址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": { "index": {"refresh_interval" : -1 } } }'
檢查叢集各個節點 CPU 核數
在 Flink 執行時,通過 Grafana 觀測各個節點 CPU 使用率以及通過 Linux 指令檢視各個節點 CPU 核數。發現 CPU 使用率高的節點 CPU 核數比其餘節點少。為了排除這個短闆效應,我們将在這個節點中的索引 shard 移動到 CPU 核數多的節點。
curl -XPOST {叢集位址}/_cluster/reroute -d'{"commands":[{"move":{"index":"{索引名稱}","shard":5,"from_node":"源node名稱","to_node":"目标node名稱"}}]}' -H "Content-Type:application/json"
以上優化的效果:
經過以上的優化,我們發現寫入性能提升有限。是以,需要深入檢視寫入的瓶頸點。
在 CPU 使用率高的節點使用 Arthas 觀察線程
列印阻塞的線程堆棧
"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.LinkedTransferQueue@369223fa
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
從上面的線程堆棧我們可以看出線程處于等待狀态。
關于這個問題的讨論詳情檢視
https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,這個 issue 讨論大緻意思是:節點數不夠,需要增加節點。于是我們又增加節點并通過設定索引級别的 total_shards_per_node 參數将索引 shard 的寫入平均到各個節點上。
線程隊列優化
ES 是将不同種類的操作(index、search…)交由不同的線程池執行,主要的線程池有三:index、search 和 bulk thread_pool。線程池隊列長度配置按照官網預設值,我覺得增加隊列長度而叢集本身沒有很高的處理能力線程還是會 await(事實上實驗結果也是如此在此不必贅述),因為實驗節點機器是 56 核,對照官網:
是以修改 size 數值為 56。
經過以上的優化,我們發現在 kafka 中的 topic 積壓有明顯變少的趨勢:
index buffer size 的優化
參照官網:
indices.memory.index_buffer_size : 10%
translog 優化:
索引寫入 ES 的基本流程是:
- 資料寫入 buffer 緩沖和 translog;
- 每秒 buffer 的資料生成 segment 并進入記憶體,此時 segment 被打開并供 search 使用查詢;
- buffer 清空并重複上述步驟 ;
- buffer 不斷添加、清空 translog 不斷累加,當達到某些條件觸發 commit 操作,刷到磁盤;
ES 預設的刷盤操作為 Request 但容易部分操作比較耗時,在日志型叢集、允許資料在刷盤過程中少量丢失可以改成異步 async。
另外一次 commit 操作是在 translog 達到某個門檻值執行的,可以把大小(flush_threshold_size )調大,重新整理間隔調大。
index.translog.durability : async
index.translog.flush_threshold_size : 1gb
index.translog.sync_interval : 30s
效果:
- Flink 反壓從打滿 100% 降到 40%(output buffer usage):
kafka 消費組裡的積壓明顯減少:
總結
當 ES 寫入性能遇到瓶頸時,我總結的思路應該是這樣:
- 看日志,是否有字段類型不比對,是否有髒資料。
- 看 CPU 使用情況,叢集是否異構
- 用戶端是怎樣的配置?使用的 bulk 還是單條插入
- 檢視線程堆棧,檢視耗時最久的方法調用
- 确定叢集類型:ToB 還是 ToC,是否允許有少量資料丢失?
- 針對 ToB 等實時性不高的叢集減少副本增加重新整理時間
- index buffer 優化 translog 優化,滾動重新開機叢集
▼ Apache Flink 社群推薦 ▼
Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 大會議程重磅釋出,參與
問卷調研就有機會免費擷取門票!
https://developer.aliyun.com/special/ffa2019