天天看點

Logstash導入csv檔案到ElasticSearch需求Logstash原理資料導入步驟詳解總結

需求

計劃寫一系列ElasticSearch如何快速搭建小型搜尋系統的文章,需要一些實際資料來展示ElasticSearch的搜尋原理,選擇使用kaggle公開資料集Movies Dataset,由于是用來展示ElasticSearch的搜尋原理,隻保留id,name兩列,使用Logstash将資料導入ElasticSearch。

Logstash原理

Logstash是一個開源的資料收集引擎,是ELK技術棧中的L。可以完成資料的ETL,友善的将資料導入ElasticSearch等。

Logstash資料處理流程有三個主要Stage,inputs → filters → outputs.

  • Inputs從文本,資料庫等讀取資料進來,每條記錄為一個event
  • filter對資料做處理轉換等操作,如分隔,小寫等。
  • output将event寫入資料持久化層。

資料導入步驟詳解

配置Logstash的Pipeline

Logstash的Pipeline通過配置檔案來配置,movie-pipeline.yml

input {
  file {
    path => ["/Users/dxi/data/movies_metadata.csv"]  
    start_position => "beginning"
  }
}

filter {
  csv {
    separator => ","
    columns => ["id", "title"]
    skip_header => "true"
  }
  mutate {
    copy => {"id" => "[@metadata][_id]"}
    remove_field => ["@version"]
  }
}

output {
  elasticsearch {
      index => "movies_metadata"
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      action => "update"
      hosts => "http://localhost:9200"
      manage_template => true
      template => "/Users/dxi/Documents/projects/elasticsearch/logstash/movie-template.json"
      template_name => "movie-template"
      template_overwrite => true
  }
}
           

定義資料在ElasticSearch中的資料類型

資料進入到ElasticSearch中需要明确資料類型,資料Analyzer等,需要定義ES index template

{
    "template": "movies*",
    "order": 0,
    "settings": {
      "index.max_result_window": "50000",
      "analysis": {
        "filter": {
          "word_joiner": {
            "type": "word_delimiter",
            "generate_word_parts": true,
            "generate_number_parts": true,
            "split_on_numerics": false,
            "split_on_case_change": false,
            "preserve_original": true,
            "catenate_all": true,
            "catenate_numbers": false,
            "catenate_words": false
          },
          "autocomplete": {
            "type": "edge_ngram",
            "min_gram": 2,
            "max_gram": 18,
            "token_chars": [
              "letter",
              "digit"
            ]
          }
        },
        "analyzer": {
          "title_analyzer": {
            "type": "custom",
            "tokenizer": "whitespace",
            "filter": [
              "lowercase",
              "word_joiner",
              "autocomplete"
            ]
          },
          "title_search_analyzer": {
            "type": "custom",
            "tokenizer": "whitespace",
            "filter": [
              "lowercase",
              "word_joiner"
            ]
          }
        }
      }
    },
    "mappings": {
      "properties": {
        "title": {
          "type": "text",
          "analyzer": "title_analyzer",
          "search_analyzer": "title_search_analyzer",
          "fields": {
            "raw": {
              "type": "keyword"
            }
          }
        },
        "release_date": {
          "type": "date"
        }
      }
    }
  }
           

資料導入

運作Logstash

logstash -f movie-pipeline.yml
           

Chrome中安裝ElasticSearch Head插件,檢視資料是否已經導入。

Logstash導入csv檔案到ElasticSearch需求Logstash原理資料導入步驟詳解總結

Logstash如何保證不重複導入

file input 插件會将檔案狀态寫入到sincedb檔案中,預設sincedb是存在data目錄下。

Tiger:file dxi$ pwd
/usr/local/Cellar/logstash-full/7.13.2/libexec/data/plugins/inputs/file
Tiger:file dxi$ cat .sincedb_91f4e9a506f303fbd10bfcf80a2a84b4
21022558 1 4 1138439 1624507508.353749 /Users/dxi/data/movies_metadata.csv
           

sincedb中記錄了檔案的inode,時間戳,路徑等。若Logstash重新啟動首先會讀寫sincedb檔案,確定從上次狀态記錄繼續操作,而不會出現重複導入。

如果你确實需要重新導入,則删掉sincedb檔案即可。

總結

Logstash已經有很多插件供使用,可以快速實作資料的ETL,不需要額外寫代碼。如果ETL邏輯特别複雜,filter支援用Ruby實作更加靈活的資料處理操作,Logstash是用JRuby寫的,是以不需額外配置開發環境。

繼續閱讀