天天看點

Ingest pipelines—Elastic Stack 實戰手冊

Ingest pipelines—Elastic Stack 實戰手冊
https://developer.aliyun.com/topic/download?id=1295 · 更多精彩内容,請下載下傳閱讀全本《Elastic Stack實戰手冊》 https://developer.aliyun.com/topic/download?id=1295 https://developer.aliyun.com/topic/es100 · 加入創作人行列,一起交流碰撞,參與技術圈年度盛事吧 https://developer.aliyun.com/topic/es100

創作人:李增勝

Elastic 提供了三種方式進行資料加工處理:Logstash、Beats Processors 以及 Ingest Pipeline,本文着重介紹 Ingest Pipeline,以下比較了 Logstash 與 Ingest Pipeline的一些差別,便于在實際業務場景中選擇:

種類 部署 資料緩沖 資料處理 資料源
Logstash 需要另外部署,增加複雜性 采用隊列機制緩沖資料,多隊列支援 支援大量processors,遠超 ingest 支援外部資料源,如MYSQL、Kafka、Beats等
Ingest pipeline 無需另外部署,易于擴充 無緩沖政策 支援超過30種processors Ingest 也可和 Beats 或者 Logstash 解決特定場景資料源問題

總結:

  • 如果業務場景 Ingest pipeline 已經能處理完成,則無需使用 Logstash ,相反,如果業務處理資料場景要支援外部資料源,則選擇 Logstash
  • 如果業務場景需要緩沖資料,則采用 Logstash 較優
  • 如果資料處理完成後需要輸出到非 Elasticsearch 内部,則采用 Logstash
  • 在簡化配置友善,如果想配置簡單,則選擇 Elasticsearch ingest pipeline 即可

顯然,Ingest pipeline 并非 Logstatsh 的替代品,需要根據自己的業務處理資料的要求和架構設計來選擇對應的技術,并非二選一,也可以同時使用,對處理不同資料采用不同的技術架構。

Kibana Dev Tools 管理 Pipeline

Ingest Pipeline

用于預處理資料,由 Elasticsearch Ingest Node 節點負責運作處理,如需要系統性能提升可單獨部署 Ingest Node 節點

優點:

  • 由 Ingest Node 節點負責處理,職責清晰
  • 更多 Processors 支援,擴充性強
  • 輕量級,覆寫了 Logstash 大多常用場景

Ingest Pipeline 是一系列處理管道,由一系列的 Processors 組成處理,先來看下 pipeline 的處理過程:

Ingest pipelines—Elastic Stack 實戰手冊

在 Kibana 中也可以建立 Ingest pipeline,在稍微章節給出示例。

常用 的 Processors 如下

更多 Pipeline Processors 參考更多; https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html

Trim

去除空格,如果是字元串類型的數組,數組中所有字元串都會被替換空格處理

Split

切分字元串,使用指定切分符,切分字元串為數組結構,隻作用與字元串類型

Rename

重命名字段

Foreach

對一組資料進行相同的預處理,可以使用 Foreach

Lowercase / Uppercase

對字段進行大小寫轉換

Script

使用腳本語言進行資料預處理

Gsub

對字元串進行替換

Append

添加資料到數組

Set

設定字段值

Remove

移除字段

去除字元串中的空格

PUT _ingest/pipeline/trim_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}

POST _ingest/pipeline/trim_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}

#傳回:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222",
            "auto2222"
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:19:13.542743Z"
        }
      }
    }
  ]
}
           

Split / Foreach

切分字元串,使用指定切分符,切分字元串為數組結構,隻作用于字元串類型

PUT _ingest/pipeline/split_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}

#測試
POST _ingest/pipeline/split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}
#傳回,可以看到 message 按照空格切分為了多個字元串數組
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            [
              "car222",
              "aaa"
            ],
            [
              "",
              "auto2222",
              "aaaa",
              "bbb"
            ]
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:28:20.762312Z"
        }
      }
    }
  ]
}

           

重命名一個字段, rename 往往和 reindex 結合使用

POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 國蘋果 "}
{"index":{"_id":2}}
{"message":"山東 蘋果 "}


#定義 rename_pipeline
PUT _ingest/pipeline/rename_pipeline
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "message_new"
      }
    }
  ]
}

#重建 index
POST _reindex
{
  "source": {
    "index": "goods_info_comment_message"
  },
  "dest": {
    "index": "goods_info_comment_message_new",
    "pipeline": "rename_pipeline"
  }
}

#查詢 mapping
GET goods_info_comment_message_new/_mapping

#傳回
{
  "goods_info_comment_message_new" : {
    "mappings" : {
      "properties" : {
        "message_new" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
           

将字元串修改為大寫或者小寫

PUT _ingest/pipeline/lowercase_pipeline
{
  "description": "lowercase processor",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    }
  ]
}

#測試,部分字元大寫
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

#結果,全部輸出為小寫
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222 aaa",
            " auto2222 aaaa bbb"
          ]
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:12:10.041308Z"
        }
      }
    }
  ]
}
           

移除已經存在的字段

#定義remove pipelint
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove processor",
  "processors": [
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}

#測試
POST _ingest/pipeline/remove_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}
#傳回,可以看到message字段已經被移除
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : { },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:15:27.811516Z"
        }
      }
    }
  ]
}
           

給已有字段進行指派

PUT _ingest/pipeline/set_pipeline
{
  "description": "set processor",
  "processors": [
    {
      "set": {
        "field": "message",
        "value": "this is a new message"
      }
    }
  ]
}


POST _ingest/pipeline/set_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}

#傳回
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "this is a new message"
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:21:28.928512Z"
        }
      }
    }
  ]
}           

下面介紹如何在 kibana 中通過界面來建立 Pipeline,打開 Kibana 首頁:

Ingest pipelines—Elastic Stack 實戰手冊

選擇 Ingest Node Pipelines,右邊會展示已有的 Pipeline 清單

Ingest pipelines—Elastic Stack 實戰手冊

選擇新建立 Ppipeline

Ingest pipelines—Elastic Stack 實戰手冊
Ingest pipelines—Elastic Stack 實戰手冊

我們選擇建立一個 lowercase processor

Ingest pipelines—Elastic Stack 實戰手冊

點選 Add documents 進行相關測試

Ingest pipelines—Elastic Stack 實戰手冊

添加測試文檔:

[
  {
    "_index": "index_lowercase",
    "_id": "1",
    "_source": {
      "message": "This is a Test"
    }
  }
]
           
Ingest pipelines—Elastic Stack 實戰手冊

可以看到,測試成功,字元串全部變為了小寫

Ingest pipelines—Elastic Stack 實戰手冊

繼續閱讀