天天看点

极速导入:Logstash并发处理,每次批量导入20万条数据

作者:修身服务站

要实现多表导入,可以使用多个命令窗口来同时执行多个 Logstash 实例,每个实例负责导入不同的表。下面是一个基本的示例:

  1. 创建多个配置文件:根据你的需求,为每个表创建一个独立的配置文件,例如 config_table1.conf、config_table2.conf 等。每个配置文件应包含适当的输入插件和输出插件配置,以及与特定表相关的过滤器配置。
  2. 打开多个命令窗口:为每个表导入打开一个独立的命令窗口。
  3. 在每个命令窗口中运行 Logstash:在每个命令窗口中,使用不同的配置文件运行 Logstash。

例如,在第一个命令窗口中运行

logstash -f /path/to/config_table1.conf

在第二个命令窗口中运行:

logstash -f /path/to/config_table2.conf

确保根据实际的配置文件路径进行相应的调整。

4、Logstash 导入数据:当 Logstash 运行时,它将根据配置文件中的配置从相应的输入源读取数据,并将其导入到相应的输出目标中。每个命令窗口将独立处理不同的表数据导入。

这种方式可以并行处理多个表的导入,提高导入效率。请根据实际需求和系统资源进行适当的调整和优化。

导入ES配置

input {
    stdin {
    }
	jdbc {
      # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/bigdata?characterEncoding=utf-8&generateSimpleParameterMetadata=true&useSSL=false&allowMultiQueries=true&serverTimezone=GMT%2B8"
      # 你的账户密码
      jdbc_user => "root"
      jdbc_password => "123456"
      # 连接数据库的驱动包,建议使用绝对地址  就是第1步中的驱动jar包
      jdbc_driver_library => "D:\install\logstash-7.14.0\big_mysql\mysql-connector-java-8.0.22.jar"
      # mysql驱动
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      # 是否分页
      jdbc_paging_enabled => "true"
      # 一页50000条
      jdbc_page_size => "200000"
      # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
      lowercase_column_names => false
      # 处理中文乱码问题
      codec => plain { charset => "UTF-8"}
      # 记录上一次运行记录
      record_last_run => true     
      # 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这个就是增量数据同步的关键  
      last_run_metadata_path => "D:\install\logstash-7.14.0\big_mysql\hosp_assay_req_last_value.txt"
      jdbc_default_timezone => "Asia/Shanghai"
      # 从数据库查询的sql存放路径
      statement_filepath => "D:\install\logstash-7.14.0\big_mysql\hosp_assay_req.sql"
      # 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
      clean_run => false
      # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 比如我这个是2分钟一次
      schedule => "* * * * *"
      type => "hosp_assay_req"
    }
}

filter {
}
 
 
output {
    elasticsearch {
      # 要导入到的Elasticsearch所在的主机
      hosts => "172.160.10.189:9200"
      # 要导入到的Elasticsearch的索引的名称
      index => "hosp_assay_req"
      # 主键名称(类似数据库主键)这里的这个id就是数据字段id  名字一样就能对应上
      document_id => "%{id}"
    }	
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}           

内存足够大,可以继续放大 不限于 20w 数据。

日志文件占用处理方式:

logstash -f /path/to/config_file.conf --quiet

导入是Data占用处理方式

logstash -f /path/to/config_file.conf --path.data=/path/to/data1

解决多进程批量导入历史数据

logstash -f /path/to/config_file.conf --quiet --path.data=/path/to/data1

继续阅读