需求
計劃寫一系列ElasticSearch如何快速搭建小型搜尋系統的文章,需要一些實際資料來展示ElasticSearch的搜尋原理,選擇使用kaggle公開資料集Movies Dataset,由于是用來展示ElasticSearch的搜尋原理,隻保留id,name兩列,使用Logstash将資料導入ElasticSearch。
Logstash原理
Logstash是一個開源的資料收集引擎,是ELK技術棧中的L。可以完成資料的ETL,友善的将資料導入ElasticSearch等。
Logstash資料處理流程有三個主要Stage,inputs → filters → outputs.
- Inputs從文本,資料庫等讀取資料進來,每條記錄為一個event
- filter對資料做處理轉換等操作,如分隔,小寫等。
- output将event寫入資料持久化層。
資料導入步驟詳解
配置Logstash的Pipeline
Logstash的Pipeline通過配置檔案來配置,movie-pipeline.yml
input {
file {
path => ["/Users/dxi/data/movies_metadata.csv"]
start_position => "beginning"
}
}
filter {
csv {
separator => ","
columns => ["id", "title"]
skip_header => "true"
}
mutate {
copy => {"id" => "[@metadata][_id]"}
remove_field => ["@version"]
}
}
output {
elasticsearch {
index => "movies_metadata"
document_id => "%{[@metadata][_id]}"
doc_as_upsert => true
action => "update"
hosts => "http://localhost:9200"
manage_template => true
template => "/Users/dxi/Documents/projects/elasticsearch/logstash/movie-template.json"
template_name => "movie-template"
template_overwrite => true
}
}
定義資料在ElasticSearch中的資料類型
資料進入到ElasticSearch中需要明确資料類型,資料Analyzer等,需要定義ES index template
{
"template": "movies*",
"order": 0,
"settings": {
"index.max_result_window": "50000",
"analysis": {
"filter": {
"word_joiner": {
"type": "word_delimiter",
"generate_word_parts": true,
"generate_number_parts": true,
"split_on_numerics": false,
"split_on_case_change": false,
"preserve_original": true,
"catenate_all": true,
"catenate_numbers": false,
"catenate_words": false
},
"autocomplete": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 18,
"token_chars": [
"letter",
"digit"
]
}
},
"analyzer": {
"title_analyzer": {
"type": "custom",
"tokenizer": "whitespace",
"filter": [
"lowercase",
"word_joiner",
"autocomplete"
]
},
"title_search_analyzer": {
"type": "custom",
"tokenizer": "whitespace",
"filter": [
"lowercase",
"word_joiner"
]
}
}
}
},
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "title_analyzer",
"search_analyzer": "title_search_analyzer",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"release_date": {
"type": "date"
}
}
}
}
資料導入
運作Logstash
logstash -f movie-pipeline.yml
Chrome中安裝ElasticSearch Head插件,檢視資料是否已經導入。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNCM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TUzMWMoJzYrh2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzQTNxAjNwQTM0IjNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
Logstash如何保證不重複導入
file input 插件會将檔案狀态寫入到sincedb檔案中,預設sincedb是存在data目錄下。
Tiger:file dxi$ pwd
/usr/local/Cellar/logstash-full/7.13.2/libexec/data/plugins/inputs/file
Tiger:file dxi$ cat .sincedb_91f4e9a506f303fbd10bfcf80a2a84b4
21022558 1 4 1138439 1624507508.353749 /Users/dxi/data/movies_metadata.csv
sincedb中記錄了檔案的inode,時間戳,路徑等。若Logstash重新啟動首先會讀寫sincedb檔案,確定從上次狀态記錄繼續操作,而不會出現重複導入。
如果你确實需要重新導入,則删掉sincedb檔案即可。
總結
Logstash已經有很多插件供使用,可以快速實作資料的ETL,不需要額外寫代碼。如果ETL邏輯特别複雜,filter支援用Ruby實作更加靈活的資料處理操作,Logstash是用JRuby寫的,是以不需額外配置開發環境。