天天看點

使用Python腳本進行es資料清理需求背景實作思路具體實作結果驗證注意事項

需求背景

業務系統将各類的報表和統計資料存放于ES中,由于曆史原因,系統每天均以全量方式進行統計,随着時間的推移,ES的資料存儲空間壓力巨大。同時由于沒有規劃好es的索引使用,個别索引甚至出現超過最大文檔數限制的問題,現實情況給運維人員帶來的挑戰是需要以最小的代價來解決這個問題。下面以内網開發、測試環境舉例使用python腳本解決這個問題。

Each Elasticsearch shard is a Lucene index. There is a maximum number of documents you can have in a single Lucene index. As of LUCENE-5843, the limit is 2,147,483,519 (= Integer.MAX_VALUE - 128) documents. You can monitor shard sizes using the _cat/shards API.

實作思路

es本身支援“_delete_by_query”的形式對查詢出來的資料進行删除。首先我們通過”_cat/indices“入口擷取目前es服務上所有的索引資訊。

使用Python腳本進行es資料清理需求背景實作思路具體實作結果驗證注意事項

第一清單示索引目前的健康狀态

第三清單示索引的名稱

第四清單示索引在伺服器上的存儲目錄名

第五、六清單示索引的副本數和分片資訊

第七清單示目前索引的文檔數

最後兩列分别表示目前索引的存儲占用空間,倒數第二列等于倒數第一列乘以副本數

curl -X POST "http://192.168.1.19:9400/fjhb-surveyor-v2/_delete_by_query?pretty" -H 'Content-Type: application/json' -d '
     {"query":{ "range": {
            "createTime": {   
                "lt": 1580400000000,    
                "format": "epoch_millis"
            }
        }
}}'           

具體實作

#!/usr/bin/python
# -*- coding: UTF-8 -*-

###導入必須的子產品
import requests
import time
import datetime
import os

#定義擷取ES資料字典函數,傳回索引名和索引占用存儲空間大小字典
def getData(env):
    header = {"Content-Type": "application/x-www-form-urlencoded",
              "user-agent": "User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36"
}
    data = {}
    with open('result.txt','w+') as f:
        req = requests.get(url=env+'/_cat/indices',headers=header).text
        f.write(req)
        f.seek(0)
        for line in f.readlines():
            data[line.split()[2]] = line.split()[-1]
    return data

#定義unix時間轉換函數,以毫秒形式傳回,傳回值為int類型
def unixTime(day):
    today = datetime.date.today()
    target_day = today + datetime.timedelta(day)
    unixtime = int(time.mktime(target_day.timetuple())) * 1000
    return unixtime

#定義删除es資料函數,調用系統curl指令進行删除,需要傳入環境、需要删除資料的時間範圍(即多少天之前的資料)參數,由于索引數量衆多,我們隻處理超過1G的索引即可
def delData(env,day):
    header = 'Content-Type: application/json'
    for key, value in getData(env).items():
        if 'gb' in value:
            size = float(value.split('gb')[0])
            if size > 1:
                url = env + '/' + key + '/_delete_by_query?pretty'
                command = ("curl -X POST \"%s\" -H '%s' "
                           "-d '{\"query\":{ \"range\": {\"createTime\": {\"lt\": %s,\"format\": \"epoch_millis\"}}}}'" % (
                           url, header, day))
                print(command)
                os.system(command)

if __name__ == '__main__':
    dev = 'http://192.168.1.19:9400'
    test1 = 'http://192.168.1.19:9200'
    test2 = 'http://192.168.1.19:9600'
    day = unixTime(-30)
    delData(dev,day)
    delData(test1,day)
    delData(test2,day)
           

結果驗證

注意事項