ELK Notes 19: ES Python API

  • 1 Elasticsearch
  • 1.1 Basic connection, indexing, and querying
  • 1.2 Fetching data with the scroll API
  • 2 Indices
  • 2.1 Basic index creation and deletion
  • 3 Ingest
  • 4 Cluster
  • 5 Nodes
  • 6 Cat
  • 7 Snapshot
  • 8 Tasks
  • 9 Notes

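elasticsearch-py is the official low-level client for Elasticsearch. Its goal is to provide common ground for all Elasticsearch-related Python code; for that reason it tries to be opinion-free and very extendable.

Elastic also provides a higher-level client library, elasticsearch-dsl. It has a more limited scope, but it is a more Pythonic library built on top of elasticsearch-py.

This post uses elasticsearch-py to walk through the common ES Python APIs and the points to watch for when using them.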

1 Elasticsearch

1.1 Basic connection, indexing, and querying

#!/usr/bin/python
# -*- coding:utf-8 -*-
# refer https://elasticsearch-py.readthedocs.io/en/7.7.1/

from elasticsearch import Elasticsearch
import json
import datetime


def es_connect():
    # Single node; the port defaults to 9200
    # es = Elasticsearch(['10.120.75.103'])
    # es = Elasticsearch(['10.120.75.103:9200'])
    # es = Elasticsearch([{'host': '10.120.75.103', 'port': 9200}])
    # Multiple nodes
    # es = Elasticsearch(['10.120.75.103', '10.120.75.107'])
    # es = Elasticsearch(['10.120.75.103:9200', '10.120.75.107:9200'])
    # es = Elasticsearch([{'host': '10.120.75.103', 'port': 9200}, {'host': '10.120.75.107', 'port': 9200}])
    # Node with authentication: add the http_auth option, equivalent to curl -u elastic:elastic 10.120.75.102:9204
    es = Elasticsearch(['10.120.75.102:9204'], http_auth=('elastic', 'elastic'), timeout=30, max_retries=5)
    print('es_connect:')
    print(json.dumps(es.info()))


def es_index():
    es = Elasticsearch(['10.120.75.103:9201'])
    doc = {
        'author': 'kimchy',
        'text': 'Elasticsearch: cool. bonsai cool.',
        'timestamp': datetime.datetime.now(),
    }
    res = es.index(index="test-index", id=1, body=doc)
    print(res['result'])


def es_refresh():
    es = Elasticsearch(['10.120.75.103:9201'])
    res = es.indices.refresh(index="test-index")
    print(res)


def es_get():
    es = Elasticsearch(['10.120.75.103:9201'])
    # GET test-index/_doc/1
    res = es.get(index="test-index", id=1)
    print(res['_source'])


def es_search():
    es = Elasticsearch(['10.120.75.103:9201'])
    res = es.search(index="test-index", body={"query": {"match_all": {}}})
    print("Got %d Hits:" % res['hits']['total']['value'])
    for hit in res['hits']['hits']:
        print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])


if __name__ == '__main__':
    es_connect()
    es_index()
    es_refresh()
    es_get()
    es_search()
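
The `res['hits']['total']['value']` lookup in es_search() matches Elasticsearch 7.x, where hits.total is an object; in 6.x and earlier it is a plain integer. The following is a minimal sketch of a helper that tolerates both shapes. The name `total_hits` and the sample responses are assumptions made for illustration, not part of the client library.

```python
def total_hits(res):
    """Return the hit count from a search response as an int.

    Elasticsearch 7.x reports hits.total as {"value": N, "relation": ...},
    while 6.x and earlier report a plain integer.
    """
    total = res["hits"]["total"]
    if isinstance(total, dict):
        return total["value"]
    return total


if __name__ == "__main__":
    # Hypothetical response fragments, for illustration only.
    print(total_hits({"hits": {"total": {"value": 3, "relation": "eq"}, "hits": []}}))
    print(total_hits({"hits": {"total": 3, "hits": []}}))
```

When the relation field is "gte", the value is a lower bound rather than an exact count, so a caller that needs an exact number may prefer the count API.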

Plain command-line query:
curl -XGET --header 'Content-Type: application/json' -u user:password 10.120.75.103:9201/platform_log/_count -d '{
        "query": {
            "bool": {
                "must": [
                  {"wildcard":{"hostname.keyword":{"value":"dds-tagging-sim-*"}}},
                  {"term":{"level.keyword":{"value":"ERROR"}}},
                  {"range":{"timestamp":{"gte":"now-30m","lte":"now"}}}
                ]
            }
        }
}'      
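
The same query body can be assembled in Python and handed to the client. This is a sketch only: the helper name `build_count_query` and its parameters are assumptions for illustration, and the field names are taken from the curl command above.

```python
import json


def build_count_query(host_pattern, level, since="now-30m", until="now"):
    """Build the bool/must body used in the curl example above."""
    return {
        "query": {
            "bool": {
                "must": [
                    {"wildcard": {"hostname.keyword": {"value": host_pattern}}},
                    {"term": {"level.keyword": {"value": level}}},
                    {"range": {"timestamp": {"gte": since, "lte": until}}},
                ]
            }
        }
    }


if __name__ == "__main__":
    body = build_count_query("dds-tagging-sim-*", "ERROR")
    print(json.dumps(body, indent=2))
    # With a client created as in es_connect(), the body could be sent with:
    # es.count(index="platform_log", body=body)
```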

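1.2 Fetching data with the scroll API

By default Elasticsearch limits from + size to 10,000 results (the index.max_result_window setting), so a plain search cannot return everything when there is a lot of data. The scroll API is the recommended way to pull large result sets.

The following is a scroll API example. In real use, you only need to add your filter fields to must, must_not, should, and filter as required.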

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
import requests
import time
from base64 import b64encode

USER = 'elastic'
PWD = 'elastic'
URL = 'http://127.0.0.1:9200'

data = []

ONE_DAY = 24 * 3600 * 1000 #ms

def get_now_ms():
    ts = int(time.time() * 1000)
    return ts
    

def get_base64_str(str_text):
    str_user_pwd = b64encode(str_text.encode('ascii'))
    return str_user_pwd.decode('ascii')
    
    
def data_scroll(index, scroll_id, scroll_size='1m'):
    # scroll_size is the scroll keep-alive time (e.g. '1m'), not a page size.
    # Loop instead of recursing so large result sets cannot hit the recursion limit.
    headers = {"Content-Type": "application/json",
               'Authorization': 'Basic ' + get_base64_str(USER + ':' + PWD)}
    try:
        while scroll_id:
            payload = {'scroll': scroll_size, 'scroll_id': scroll_id}
            r = requests.post(URL + '/_search/scroll', data=json.dumps(payload),
                              headers=headers, timeout=10)
            body = r.json()
            hits = body['hits']['hits']
            if not hits:
                break
            data.extend(hits)
            scroll_id = body.get('_scroll_id')
    except Exception:
        traceback.print_exc()


def get_data(index_name, time_start, time_end):
    payload = {
        "size": 1000,
        "version": True, "sort": [{"@timestamp": {"order": "asc", "unmapped_type": "boolean"}}],
        "query": {
            "bool": {
                "must": [
                    {
                        "match_all": {}
                    },
                    {
                        "range": {
                            "@timestamp": {
                                "gte": time_start,
                                "lte": time_end,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ],
                "filter": [],
                "should": [],
                "must_not": []
            }
        }
    }
    url = URL+'/{}/_search?scroll=1m'.format(index_name)
    r = requests.post(url=url, data=json.dumps(payload), headers={"Content-Type": "application/json",'Authorization': 'Basic ' + get_base64_str(USER+':'+PWD)}, timeout=10)
    data.extend(r.json()['hits']['hits'])
    data_scroll(index_name, r.json()['_scroll_id'], '1m')
    with open('data.json', 'w') as f:
        f.write(json.dumps(data))
    print(len(data))


if __name__ == '__main__':
    time_end = get_now_ms()
    time_start = time_end - ONE_DAY
    get_data('k8s_test-*', time_start, time_end)      
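
The paging logic of the script above can be separated from the HTTP calls. The sketch below is transport-agnostic: `iter_scroll_hits` and its `fetch_next` callback are hypothetical names introduced here, and the two fake pages stand in for real responses.

```python
def iter_scroll_hits(first_page, fetch_next):
    """Yield every hit from a scroll, one page at a time.

    first_page: response dict of the initial search sent with ?scroll=...
    fetch_next: callable taking a scroll_id and returning the next response dict
    """
    page = first_page
    # An empty hits list marks the end of the scroll.
    while page["hits"]["hits"]:
        for hit in page["hits"]["hits"]:
            yield hit
        page = fetch_next(page["_scroll_id"])


if __name__ == "__main__":
    # Fake two-page scroll standing in for real HTTP calls (illustration only).
    pages = {
        "s1": {"_scroll_id": "s2", "hits": {"hits": [{"_id": "3"}]}},
        "s2": {"_scroll_id": "s2", "hits": {"hits": []}},
    }
    first = {"_scroll_id": "s1", "hits": {"hits": [{"_id": "1"}, {"_id": "2"}]}}
    print([h["_id"] for h in iter_scroll_hits(first, pages.__getitem__)])
```

Because it is a generator, hits can be written out as they arrive instead of being collected in one list. elasticsearch-py also ships elasticsearch.helpers.scan, which wraps a similar loop around the client and may be the simpler choice when the client library is already in use.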

2 Indices

2.1 Basic index creation and deletion

es.indices corresponds to the indices section of the ES API and includes the following APIs:

Create index

Delete index

Get index

Index exists

Close index

Open index

Shrink index

Split index

Clone index

Rollover index

Freeze index

Unfreeze index

Resolve index

Here create, delete, and get are used as examples; the others work similarly:

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
import json
import datetime


def es_create():
    es = Elasticsearch(['127.0.0.1:9200'], timeout=30, max_retries=5)
    ret = None
    if not es.indices.exists(index='test-index'):
        ret = es.indices.create(index='test-index')
    print(json.dumps(ret))


def es_get():
    # get here returns the overall information for the index
    es = Elasticsearch(['127.0.0.1:9200'], timeout=30, max_retries=5)
    ret = None
    ret = es.indices.get(index='test-index')
    print(json.dumps(ret))


def es_delete():
    es = Elasticsearch(['127.0.0.1:9200'], timeout=30, max_retries=5)
    ret = None
    if es.indices.exists(index='test-index'):
        ret = es.indices.delete(index='test-index')
    print(json.dumps(ret))


def es_index():
    es = Elasticsearch(['127.0.0.1:9200'])
    doc = {
        'author': 'kimchy',
        'text': 'Elasticsearch: cool. bonsai cool.',
        'timestamp': datetime.datetime.now(),
    }
    res = es.index(index="test-index", id=1, body=doc)
    print(res['result'])
    ret = es.count(index="test-index")
    print(json.dumps(ret))


if __name__ == '__main__':
    es_create()
    es_get()
    es_index()
    es_delete()      
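
es_create() above creates the index with default settings and a dynamic mapping. The sketch below builds an explicit create-index body instead. The helper `build_index_body` is a hypothetical name, and the field types chosen for the sample document are assumptions.

```python
def build_index_body(shards=1, replicas=1, properties=None):
    """Build a create-index body with settings and an optional mapping."""
    body = {
        "settings": {
            "number_of_shards": shards,
            "number_of_replicas": replicas,
        }
    }
    if properties:
        # Typeless mapping, as used by Elasticsearch 7.x.
        body["mappings"] = {"properties": properties}
    return body


if __name__ == "__main__":
    # Field names mirror the sample document indexed in this post.
    body = build_index_body(
        shards=1,
        replicas=0,
        properties={
            "author": {"type": "keyword"},
            "text": {"type": "text"},
            "timestamp": {"type": "date"},
        },
    )
    print(body)
    # With the 7.x client: es.indices.create(index="test-index", body=body)
```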

3 Ingest

es.ingest includes the following APIs; put, get, and delete are used as examples below:

Put pipeline to add or update a pipeline

Get pipeline to return a specific pipeline

Delete pipeline to delete a pipeline

Simulate pipeline to simulate a call to a pipeline

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
import json
import datetime


def ingest_put():
    es = Elasticsearch(['127.0.0.1:9200'])
    body = {
        "description": "describe pipeline",
        "processors": [
            {
                "set": {
                    "field": "foo",
                    "value": "bar"
                }
            }
        ]
    }
    ret = es.ingest.put_pipeline(id='pipe01', body=body)
    print(json.dumps(ret))


def ingest_get():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.ingest.get_pipeline(id='pipe01')
    print(json.dumps(ret))


def ingest_delete():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.ingest.delete_pipeline(id='pipe01')
    print(json.dumps(ret))


if __name__ == '__main__':
    ingest_put()
    ingest_get()
    ingest_delete()      
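
The simulate API from the list above takes a pipeline definition plus sample documents. The following sketch builds such a request body; `build_simulate_body` is a hypothetical helper, and the sample document is made up for illustration.

```python
def build_simulate_body(processors, sources, description="describe pipeline"):
    """Build a simulate-pipeline body: an inline pipeline plus test documents."""
    return {
        "pipeline": {"description": description, "processors": processors},
        # Each test document is wrapped in {"_source": ...}.
        "docs": [{"_source": src} for src in sources],
    }


if __name__ == "__main__":
    # Same "set" processor as in ingest_put().
    processors = [{"set": {"field": "foo", "value": "bar"}}]
    body = build_simulate_body(processors, [{"message": "hello"}])
    print(body)
    # With the 7.x client: es.ingest.simulate(body=body)
```

Simulating first makes it possible to check what a processor does to a document before the pipeline is stored with put_pipeline.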

4 Cluster

es.cluster includes the following common APIs:

Cluster allocation explain

Cluster get settings

Cluster health

Cluster reroute

Cluster state

Cluster stats

Cluster update settings

pending_tasks

Here allocation_explain, health, get_settings, and put_settings are used as examples.

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
from elasticsearch import RequestError
import json


def cluster_explain():
    es = Elasticsearch(['127.0.0.1:9200'])
    body = {}
    # You can also pass index, primary, and shard; if omitted, the first unassigned shard is explained
    try:
        ret = es.cluster.allocation_explain(body=body)
        print(json.dumps(ret))
    except RequestError as e:
        print(e)


def cluster_health():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.cluster.health()
    print(json.dumps(ret))


def cluster_get_settings():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.cluster.get_settings()
    print(json.dumps(ret))


def cluster_put_settings():
    es = Elasticsearch(['127.0.0.1:9200'])
    body = {"persistent": {"indices.recovery.max_bytes_per_sec": "50mb"}}
    # Cluster settings are updated partially: only the keys you put are changed
    ret = es.cluster.put_settings(body=body)
    print(json.dumps(ret))


if __name__ == '__main__':
    cluster_explain()
    cluster_health()
    cluster_get_settings()
    cluster_put_settings()      
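
The dict returned by es.cluster.health() is usually reduced to a pass/fail check in monitoring scripts. Below is a minimal sketch of such a check; `summarize_health` is a hypothetical helper and the sample response values are invented.

```python
def summarize_health(health):
    """Return (is_green, one-line summary) for a cluster health response."""
    is_green = health["status"] == "green"
    summary = "{}: status={}, nodes={}, unassigned_shards={}".format(
        health["cluster_name"],
        health["status"],
        health["number_of_nodes"],
        health["unassigned_shards"],
    )
    return is_green, summary


if __name__ == "__main__":
    # Hypothetical response fragment; a real one has more fields.
    sample = {
        "cluster_name": "demo",
        "status": "yellow",
        "number_of_nodes": 1,
        "unassigned_shards": 2,
    }
    print(summarize_health(sample))
```

A yellow status on a single-node cluster typically means replica shards have nowhere to go, which is where allocation_explain becomes useful.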

5 Nodes

es.nodes includes the following common APIs:

Nodes feature usage

Nodes hot threads

Nodes info

Nodes reload secure settings

Nodes stats

Here hot_threads and info are used as examples:

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
import json
import datetime


def node_hot_threads():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.nodes.hot_threads()
    # You can also pass node_id to view the threads of one specific node
    print(json.dumps(ret))


def node_info():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.nodes.info(node_id=None)
    print(json.dumps(ret))


if __name__ == '__main__':
    node_hot_threads()
    node_info()      
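
The nodes info response is keyed by node id, which makes it awkward to read when printed raw. The sketch below flattens it into one row per node. `list_nodes` is a hypothetical helper, and the sample ids, names, and roles are invented.

```python
def list_nodes(info):
    """Flatten a nodes-info response into (node_id, name, version, roles) rows."""
    rows = []
    for node_id, node in info["nodes"].items():
        rows.append((
            node_id,
            node["name"],
            node.get("version"),
            sorted(node.get("roles", [])),
        ))
    # Sort by node name for stable output.
    return sorted(rows, key=lambda row: row[1])


if __name__ == "__main__":
    # Hypothetical response fragment; a real one carries many more fields.
    sample = {
        "nodes": {
            "idB": {"name": "node-2", "version": "7.7.1", "roles": ["data"]},
            "idA": {"name": "node-1", "version": "7.7.1", "roles": ["master", "data"]},
        }
    }
    for row in list_nodes(sample):
        print(row)
```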

6 Cat

es.cat includes the following common APIs:

cat aliases

cat allocation

cat anomaly detectors

cat count

cat data frame analytics

cat datafeeds

cat fielddata

cat health

cat indices

cat master

cat nodeattrs

cat nodes

cat pending tasks

cat plugins

cat recovery

cat repositories

cat shards

cat segments

cat snapshots

cat task management

cat templates

cat thread pool

cat trained model

cat transforms

Most of them are used in the same way; here aliases, allocation, and health are used as examples:

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
import json
import datetime


def cat_alias():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.cat.aliases()
    print(json.dumps(ret))


def cat_allocation():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.cat.allocation()
    # Shows per-node disk usage, shard counts, and so on
    print(json.dumps(ret))


def cat_health():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.cat.health()
    print(json.dumps(ret))


if __name__ == '__main__':
    cat_alias()
    cat_allocation()
    cat_health()      
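
The cat APIs return plain text tables by default, so json.dumps() above just prints an escaped string. The sketch below turns a table that includes a header row (requested with v=True) into a list of dicts. `parse_cat_table` is a hypothetical helper; it splits on whitespace, so it assumes no cell contains spaces. The sample table is invented.

```python
def parse_cat_table(text):
    """Parse a whitespace-separated cat table with a header row into dicts."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return []
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


if __name__ == "__main__":
    # Hypothetical output of es.cat.aliases(v=True).
    sample = (
        "alias index      filter routing.index routing.search is_write_index\n"
        "logs  test-index -      -             -              -\n"
    )
    for row in parse_cat_table(sample):
        print(row["alias"], "->", row["index"])
```

Passing format='json' to a cat call, for example es.cat.aliases(format='json'), should make Elasticsearch return structured rows directly, which avoids parsing altogether.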

7 Snapshot

es.snapshot includes the following common APIs:

Put snapshot repository

Verify snapshot repository

Get snapshot repository

Delete snapshot repository

Clean up snapshot repository

Clone snapshot

Create snapshot

Get snapshot

Get snapshot status

Restore snapshot

Delete snapshot

Here cleanup_repository and create are used as examples; for the full procedure see the author's post "ELK Notes 11: Using snapshots".

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
import json
import datetime


def snapshot_cleanup():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.snapshot.cleanup_repository(repository='my_repository')
    print(json.dumps(ret))


def snapshot_create():
    es = Elasticsearch(['127.0.0.1:9200'])
    body = None
    # body can specify indices, include_global_state, and other parameters
    ret = es.snapshot.create(repository='my_repository', snapshot='test_index_20201127', body=body)
    print(json.dumps(ret))


if __name__ == '__main__':
    snapshot_cleanup()
    snapshot_create()      
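
snapshot_create() above passes body=None, which snapshots every index. The sketch below builds a dated snapshot name and a body limited to selected indices. `build_snapshot_request` and its `prefix` parameter are hypothetical; the body keys are the standard create-snapshot options.

```python
import datetime


def build_snapshot_request(prefix, indices, day, include_global_state=False):
    """Return (snapshot_name, body) for a create-snapshot call."""
    name = "{}_{}".format(prefix, day.strftime("%Y%m%d"))
    body = {
        # Comma-separated list of indices to include.
        "indices": ",".join(indices),
        "ignore_unavailable": True,
        "include_global_state": include_global_state,
    }
    return name, body


if __name__ == "__main__":
    name, body = build_snapshot_request(
        "test_index", ["test-index"], datetime.date(2020, 11, 27)
    )
    print(name, body)
    # With the 7.x client:
    # es.snapshot.create(repository="my_repository", snapshot=name, body=body)
```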

8 Tasks

#!/usr/bin/python
# -*- coding:utf-8 -*-

from elasticsearch import Elasticsearch
import json
import datetime


def task_cancel():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.tasks.cancel(task_id='oTUltX4IQMOUUVeiohTt8A:12345')
    print(json.dumps(ret))


def task_get():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.tasks.get()
    print(json.dumps(ret))


def task_list():
    es = Elasticsearch(['127.0.0.1:9200'])
    ret = es.tasks.list()
    print(json.dumps(ret))


if __name__ == '__main__':
    task_get()
    task_list()
    task_cancel()      
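
The task id passed to tasks.cancel() has the form node_id:task_number, and the same ids appear as keys in the tasks.list() response. The sketch below picks out long-running tasks from such a response. `long_running_tasks` is a hypothetical helper and the sample response is invented.

```python
def long_running_tasks(task_list, min_seconds):
    """Return (task_id, action, seconds) for tasks running at least min_seconds."""
    found = []
    for node in task_list.get("nodes", {}).values():
        for task_id, task in node.get("tasks", {}).items():
            seconds = task["running_time_in_nanos"] / 1e9
            if seconds >= min_seconds:
                found.append((task_id, task["action"], seconds))
    # Longest-running first.
    return sorted(found, key=lambda item: item[2], reverse=True)


if __name__ == "__main__":
    # Hypothetical response fragment from es.tasks.list().
    sample = {
        "nodes": {
            "nodeA": {
                "tasks": {
                    "nodeA:1": {"action": "indices:data/read/search",
                                "running_time_in_nanos": 90_000_000_000},
                    "nodeA:2": {"action": "cluster:monitor/tasks/lists",
                                "running_time_in_nanos": 1_000_000},
                }
            }
        }
    }
    for task_id, action, seconds in long_running_tasks(sample, 60):
        print(task_id, action, seconds)
```

The ids it returns could then be passed one at a time to es.tasks.cancel(task_id=...), provided the task is cancellable.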

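9 Notes

  1. References

    elasticsearch-py official documentation
    Python Elasticsearch API

  2. Additional notes

    Some of the API lists above were copied directly from the API lists in the official elasticsearch/reference documentation, so a few entries do not correspond exactly to the Python API. This generally does not affect use of the ES Python API.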