天天看點

極速導入:Logstash并發處理,每次批量導入20萬條資料

要實作多表導入,可以使用多個指令視窗來同時執行多個 Logstash 執行個體,每個執行個體負責導入不同的表。下面是一個基本的示例:

  1. 建立多個配置檔案:根據你的需求,為每個表建立一個獨立的配置檔案,例如 config_table1.conf、config_table2.conf 等。每個配置檔案應包含适當的輸入插件和輸出插件配置,以及與特定表相關的過濾器配置。
  2. 打開多個指令視窗:為每個表導入打開一個獨立的指令視窗。
  3. 在每個指令視窗中運作 Logstash:在每個指令視窗中,使用不同的配置檔案運作 Logstash。

例如,在第一個指令視窗中運作

logstash -f /path/to/config_table1.conf

在第二個指令視窗中運作:

logstash -f /path/to/config_table2.conf

確定根據實際的配置檔案路徑進行相應的調整。

4、Logstash 導入資料:當 Logstash 運作時,它将根據配置檔案中的配置從相應的輸入源讀取資料,并将其導入到相應的輸出目标中。每個指令視窗将獨立處理不同的表資料導入。

這種方式可以并行處理多個表的導入,提高導入效率。請根據實際需求和系統資源進行适當的調整和優化。

導入ES配置

input {
    stdin {
    }
	jdbc {
      # 連接配接的資料庫位址和哪一個資料庫,指定編碼格式,禁用SSL協定,設定自動重連
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/bigdata?characterEncoding=utf-8&generateSimpleParameterMetadata=true&useSSL=false&allowMultiQueries=true&serverTimezone=GMT%2B8"
      # 你的賬戶密碼
      jdbc_user => "root"
      jdbc_password => "123456"
      # 連接配接資料庫的驅動包,建議使用絕對位址  就是第1步中的驅動jar包
      jdbc_driver_library => "D:\install\logstash-7.14.0\big_mysql\mysql-connector-java-8.0.22.jar"
      # mysql驅動
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      # 是否分頁
      jdbc_paging_enabled => "true"
      # 一頁50000條
      jdbc_page_size => "200000"
      # 是否将字段名轉換為小寫,預設true(如果有資料序列化、反序列化需求,建議改為false)
      lowercase_column_names => false
      # 進行中文亂碼問題
      codec => plain { charset => "UTF-8"}
      # 記錄上一次運作記錄
      record_last_run => true     
      # 上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定字段的初始值  這個就是增量資料同步的關鍵  
      last_run_metadata_path => "D:\install\logstash-7.14.0\big_mysql\hosp_assay_req_last_value.txt"
      jdbc_default_timezone => "Asia/Shanghai"
      # 從資料庫查詢的sql存放路徑
      statement_filepath => "D:\install\logstash-7.14.0\big_mysql\hosp_assay_req.sql"
      # 是否清除 last_run_metadata_path 的記錄,如果為true那麼每次都相當于從頭開始查詢所有的資料庫記錄
      clean_run => false
      # 這是控制定時的,重複執行導入任務的時間間隔,第一位是分鐘 比如我這個是2分鐘一次
      schedule => "* * * * *"
      type => "hosp_assay_req"
    }
}

filter {
}
 
 
output {
    elasticsearch {
      # 要導入到的Elasticsearch所在的主機
      hosts => "172.160.10.189:9200"
      # 要導入到的Elasticsearch的索引的名稱
      index => "hosp_assay_req"
      # 主鍵名稱(類似資料庫主鍵)這裡的這個id就是資料字段id  名字一樣就能對應上
      document_id => "%{id}"
    }	
    stdout {
        # JSON格式輸出
        codec => json_lines
    }
}           

記憶體足夠大,可以繼續放大 不限于 20w 資料。

日志檔案占用處理方式:

logstash -f /path/to/config_file.conf --quiet

導入是Data占用處理方式

logstash -f /path/to/config_file.conf --path.data=/path/to/data1

解決多程序批量導入曆史資料

logstash -f /path/to/config_file.conf --quiet --path.data=/path/to/data1

繼續閱讀