
創作人:李增勝
Elastic 提供了三種方式進行資料加工處理:Logstash、Beats Processors 以及 Ingest Pipeline,本文着重介紹 Ingest Pipeline,以下比較了 Logstash 與 Ingest Pipeline的一些差別,便于在實際業務場景中選擇:
種類 | 部署 | 資料緩沖 | 資料處理 | 資料源 |
---|---|---|---|---|
Logstash | 需要另外部署,增加複雜性 | 采用隊列機制緩沖資料,多隊列支援 | 支援大量processors,遠超 ingest | 支援外部資料源,如MYSQL、Kafka、Beats等 |
Ingest pipeline | 無需另外部署,易于擴充 | 無緩沖政策 | 支援超過30種processors | Ingest 也可和 Beats 或者 Logstash 解決特定場景資料源問題 |
總結:
- 如果業務場景 Ingest pipeline 已經能處理完成,則無需使用 Logstash ,相反,如果業務處理資料場景要支援外部資料源,則選擇 Logstash
- 如果業務場景需要緩沖資料,則采用 Logstash 較優
- 如果資料處理完成後需要輸出到非 Elasticsearch 内部,則采用 Logstash
- 在簡化配置友善,如果想配置簡單,則選擇 Elasticsearch ingest pipeline 即可
顯然,Ingest pipeline 并非 Logstatsh 的替代品,需要根據自己的業務處理資料的要求和架構設計來選擇對應的技術,并非二選一,也可以同時使用,對處理不同資料采用不同的技術架構。
Kibana Dev Tools 管理 Pipeline
Ingest Pipeline
用于預處理資料,由 Elasticsearch Ingest Node 節點負責運作處理,如需要系統性能提升可單獨部署 Ingest Node 節點
優點:
- 由 Ingest Node 節點負責處理,職責清晰
- 更多 Processors 支援,擴充性強
- 輕量級,覆寫了 Logstash 大多常用場景
Ingest Pipeline 是一系列處理管道,由一系列的 Processors 組成處理,先來看下 pipeline 的處理過程:
在 Kibana 中也可以建立 Ingest pipeline,在稍微章節給出示例。
常用 的 Processors 如下
更多 Pipeline Processors 參考更多; https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html
Trim
去除空格,如果是字元串類型的數組,數組中所有字元串都會被替換空格處理
Split
切分字元串,使用指定切分符,切分字元串為數組結構,隻作用與字元串類型
Rename
重命名字段
Foreach
對一組資料進行相同的預處理,可以使用 Foreach
Lowercase / Uppercase
對字段進行大小寫轉換
Script
使用腳本語言進行資料預處理
Gsub
對字元串進行替換
Append
添加資料到數組
Set
設定字段值
Remove
移除字段
去除字元串中的空格
PUT _ingest/pipeline/trim_pipeline
{
"processors": [
{
"foreach": {
"field": "message",
"processor": {
"trim": {
"field": "_ingest._value"
}
}
}
}
]
}
POST _ingest/pipeline/trim_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"car222 ",
" auto2222 "
]
}
}
]
}
#傳回:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : [
"car222",
"auto2222"
]
},
"_ingest" : {
"_value" : null,
"timestamp" : "2021-04-28T13:19:13.542743Z"
}
}
}
]
}
Split / Foreach
切分字元串,使用指定切分符,切分字元串為數組結構,隻作用于字元串類型
PUT _ingest/pipeline/split_pipeline
{
"processors": [
{
"foreach": {
"field": "message",
"processor": {
"split": {
"field": "_ingest._value",
"separator": " "
}
}
}
}
]
}
#測試
POST _ingest/pipeline/split_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"car222 aaa",
" auto2222 aaaa bbb"
]
}
}
]
}
#傳回,可以看到 message 按照空格切分為了多個字元串數組
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : [
[
"car222",
"aaa"
],
[
"",
"auto2222",
"aaaa",
"bbb"
]
]
},
"_ingest" : {
"_value" : null,
"timestamp" : "2021-04-28T13:28:20.762312Z"
}
}
}
]
}
重命名一個字段, rename 往往和 reindex 結合使用
POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 國蘋果 "}
{"index":{"_id":2}}
{"message":"山東 蘋果 "}
#定義 rename_pipeline
PUT _ingest/pipeline/rename_pipeline
{
"processors": [
{
"rename": {
"field": "message",
"target_field": "message_new"
}
}
]
}
#重建 index
POST _reindex
{
"source": {
"index": "goods_info_comment_message"
},
"dest": {
"index": "goods_info_comment_message_new",
"pipeline": "rename_pipeline"
}
}
#查詢 mapping
GET goods_info_comment_message_new/_mapping
#傳回
{
"goods_info_comment_message_new" : {
"mappings" : {
"properties" : {
"message_new" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
将字元串修改為大寫或者小寫
PUT _ingest/pipeline/lowercase_pipeline
{
"description": "lowercase processor",
"processors": [
{
"lowercase": {
"field": "message"
}
}
]
}
#測試,部分字元大寫
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"CAr222 aaa",
" auto2222 aaaa Bbb"
]
}
}
]
}
#結果,全部輸出為小寫
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : [
"car222 aaa",
" auto2222 aaaa bbb"
]
},
"_ingest" : {
"timestamp" : "2021-04-28T15:12:10.041308Z"
}
}
}
]
}
移除已經存在的字段
#定義remove pipelint
PUT _ingest/pipeline/remove_pipeline
{
"description": "remove processor",
"processors": [
{
"remove": {
"field": "message"
}
}
]
}
#測試
POST _ingest/pipeline/remove_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"CAr222 aaa",
" auto2222 aaaa Bbb"
]
}
}
]
}
#傳回,可以看到message字段已經被移除
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : { },
"_ingest" : {
"timestamp" : "2021-04-28T15:15:27.811516Z"
}
}
}
]
}
給已有字段進行指派
PUT _ingest/pipeline/set_pipeline
{
"description": "set processor",
"processors": [
{
"set": {
"field": "message",
"value": "this is a new message"
}
}
]
}
POST _ingest/pipeline/set_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": "this"
}
}
]
}
#傳回
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : "this is a new message"
},
"_ingest" : {
"timestamp" : "2021-04-28T15:21:28.928512Z"
}
}
}
]
}
下面介紹如何在 kibana 中通過界面來建立 Pipeline,打開 Kibana 首頁:
選擇 Ingest Node Pipelines,右邊會展示已有的 Pipeline 清單
選擇新建立 Ppipeline
我們選擇建立一個 lowercase processor
點選 Add documents 進行相關測試
添加測試文檔:
[
{
"_index": "index_lowercase",
"_id": "1",
"_source": {
"message": "This is a Test"
}
}
]
可以看到,測試成功,字元串全部變為了小寫