天天看點

如何提高ElasticSearch 索引速度

我google了下,大緻給出的答案如下:

使用bulk api

初次索引的時候,把 replica 設定為 0

增大 threadpool.index.queue_size

增大 indices.memory.index_buffer_size

增大 index.translog.flush_threshold_ops

增大 index.translog.sync_interval

增大 index.engine.robin.refresh_interval 

這篇文章會講述上面幾個參數的原理,以及一些其他的思路。這些參數大體上是朝着兩個方向優化的:

減少磁盤寫入

增大建構索引處理資源

一般而言,通過第二種方式的需要慎用,會對叢集查詢功能造成比較大的影響。這裡還有兩種形态的解決方案:

關閉一些特定場景并不需要的功能,比如translog或者version等

将部分計算挪到其他并行計算架構上,比如資料的分片計算等,都可以放到spark上事先算好

其中

5,6 屬于 translog 相關。

4 則和lucene相關

3 則因為es裡大量采用線程池,建構索引的時候,是有單獨的線程池做處理的

7 的話個人認為影響不大

2 的話,能夠使用上的場景有限。個人認為replica這塊可以使用kafka的isr機制。所有資料還是都從primary寫和讀。replica盡量隻作為備份資料。

為什麼要有translog? 因為translog順序寫日志比建構索引更高效。我們不可能每加一條記錄就commit一次,這樣會有大量的檔案和磁盤io産生。但是我們又想避免程式挂掉或者硬體故障而出現資料丢失,是以有了translog,通常這種日志我們叫做write ahead log。

為了保證資料的完整性,es預設是每次request結束後都會進行一次sync操作。具體可以檢視如下方法:

該方法會調用indexshard.sync 方法進行檔案落地。

你也可以通過設定index.translog.durability=async 來完成異步落地。這裡的異步其實可能會有一點點誤導。前面是每次request結束後都會進行sync,這裡的sync僅僅是将translog落地。而無論你是否設定了async,都會執行如下操作:

根據條件,主要是每隔sync_interval(5s) ,如果flush_threshold_ops(integer.max_value),flush_threshold_size(512m),flush_threshold_period(30m)  滿足對應的條件,則進行flush操作,這裡除了對translog進行commit以外,也對索引進行了commit。 

是以如果你是海量的日志,可以容忍發生故障時丢失一定的資料,那麼完全可以設定,index.translog.durability=async,并且将前面提到的flush*相關的參數調大。

而極端情況,你還可以有兩個選擇:

設定index.translog.durability=async,接着設定index.translog.disable_flush=true進行禁用定時flush。然後你可以通過應用程式自己手動來控制flush。

通過改寫es 去掉translog日志相關的功能

當然,如果去掉translog日志有兩個風險點:

get最新資料會有點問題。因為根據id get最新資料是從translog裡拿的。

我們知道es通過shard replication 保證node節點出現故障時出現資料的完整性。在relocating的時候,replica 從primary 進行recover時,primary會先snapshot lucene,然後拷貝資料到replica,最後通過回放translog 保證資料的一緻性。

 version可以讓es實作并發修改,但是帶來的性能影響也是極大的,這裡主要有兩塊:

需要通路索引裡的版本号,觸發磁盤讀寫

鎖機制

目前而言,似乎沒有辦法直接關閉version機制。你可以使用自增長id并且在建構索引時,index 類型設定為create。這樣可以跳過版本檢查。

這個場景主要應用于不可變日志導入,随着es被越來越多的用來做日志分析,日志沒有主鍵id,是以使用自增id是合适的,并且不會進行更新,使用一個固定的版本号也是合适的。而不可變日志往往是追求吞吐量。

當然,如果有必要,我們也可以通過改寫es相關代碼,禁用版本管理。

es是對索引進行了分片(shard),然後資料被分發到不同的shard。這樣 查詢和建構索引其實都存在一個問題:

如果是建構索引,則需要對資料分揀,然後根據shard分布分發到不同的node節點上。如果是查詢,則對外提供的node需要收集各個shard的資料做merge

這都會對對外提供的節點造成較大的壓力,進而影響整個bulk/query 的速度。

一個可行的方案是,直接面向客戶提供建構索引和查詢api的node節點都采用client模式,不存儲資料,可以達到一定的優化效果。

另外一個較為麻煩但似乎會更優的解決方案是,如果你使用類似spark streaming這種流式處理程式,在最後往es輸出的時候,可以做如下幾件事情:

擷取所有primary shard的資訊,并且給所有shard帶上一個順序的數字序号,得到partition(順序序号) -> shardid的映射關系

對資料進行repartition,分區後每個partition對應一個shard的資料

周遊這些partions,寫入es。方法為直接通過rpc 方式,類似transportservice.sendrequest 将資料批量發送到對應包含有對應shardid的node節點上。

這樣有三點好處:

所有的資料都被直接分到各個node上直接處理。避免所有的資料先集中到一台伺服器

避免二次分發,減少一次網絡io

防止最先處理資料的node壓力太大而導緻木桶短闆效應

因為我正好要做日志分析類的應用,追求高吞吐量,這樣上面的三個優化其實都可以做了。一個典型隻增不更新的日志入庫操作,可以采用如下方案:

對接spark streaming,在spark裡對資料做好分片,直接推送到es的各個節點

禁止自動flush操作,每個batch 結束後手動flush。

避免使用version

我們可以預期es會産生多少個新的segment檔案,通過控制batch的周期和大小,預判出es segment索引檔案的生成大小和merge情況。最大可能減少es的一些額外消耗

總結

大體是下面這三個點讓es比原生的lucene吞吐量下降了不少:

為了資料完整性  es額外添加了wal(tanslog)

為了能夠并發修改  添加了版本機制

對外提供服務的node節點存在瓶頸

es的線性擴充問題主要受限于第三點,具體描述就是:

如果是建構索引,接受到請求的node節點需要對資料分揀,然後根據shard分布分發到不同的node節點上。如果是查詢,則對外提供的node需要收集各個shard的資料做merge

另外,索引的讀寫并不需要向master彙報。