天天看點

mysqlbinlog入elasticsearch

環境準備:

10.99.35.214上安裝elasticsearch-5.4.1.tar.gz、jdk-8u121-linux-x64.tar.gz、kibana-5.1.1-linux-x86_64.tar.gz、nginx-1.12.2.tar.gz

10.99.35.215、10.99.35.216上安裝elasticsearch-5.4.1.tar.gz、jdk-8u121-linux-x64.tar.gz

10.99.35.209上安裝mysql-5.7.17-linux-glibc2.5-x86_64.tar.gz、jdk-8u121-linux-x64.tar.gz、logstash-5.5.1.tar.gz、maxwell-1.10.7.tar.gz、kafka_2.11-0.11.0.1

209上安裝mysql,通過maxwell抽取binlog日志,發送到kafka中,logstash抽取kafka中資料,并傳給214、215、216組成elasticsearch叢集,在214上通過kibana與nginx來展示

1, 開啟binlog

vim /etc/my.cnf

server-id=1

log-bin=master(這一步開啟binlog)

binlog_format=row

2, mysql授權

GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'aspire@123-';

GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';

flushprivileges;

3, 配置Maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.10.7/maxwell-1.10.7.tar.gz

tar xvfz maxwell-1.10.7.tar.gz

4, 開啟maxwell指令行

nohup bin/maxwell --user='maxwell' --password='aspire@123-' --host='10.99.35.209' --producer=kafka --kafka.bootstrap.servers=10.99.35.209:9092 &

解釋:host參數是安裝mysql的那台主機,最後的kafka.bootstrap.servers是安裝kafka叢集的節點主機名和端口号

5, kafka相關配置

說明(我的kafka是安裝在主機名叫10.99.35.209,mysql也安裝在10.99.35.209上,注意kafka裡的配置檔案端口号要和指令行裡給的端口号一緻)

wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz

mv kafka_2.11-0.10.2.1.tgz /usr/local/

cd /usr/local/

tar xvfz kafka_2.11-0.10.2.1.tgz

Kafka使用ZooKeeper,是以需要先啟動一個ZooKeeper伺服器,如果還沒有。可以使用随Kafka一起打包的便捷腳本來擷取一個快速但是比較粗糙的單節點ZooKeeper執行個體

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

Kafka在config目錄下提供了一個基本的配置檔案。為了保證可以遠端通路Kafka,我們需要修改兩處配置。打開config/server.properties檔案,在很靠前的位置有listeners和 advertised.listeners兩處配置的注釋,去掉這兩個注釋,并且根據目前伺服器的IP修改如下:

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://10.99.35.209:9092

目前伺服器IP為10.99.35.209,你需要修改為外網或區域網路可以通路到的伺服器IP。

接下來啟動Kafka服務:

nohup bin/kafka-server-start.sh config/server.properties &

建立一個主題叫maxwell以便于接受資料

bin/kafka-topics.sh --create --zookeeper 10.99.35.209:2181 --replication-factor 1 --partitions 1 --topic maxwell

建立好之後可以通過運作以下指令檢視已建立的topic資訊

bin/kafka-topics.sh --list  --zookeeper 10.99.35.209:2181

檢視topic詳細情況

bin/kafka-topics.sh --describe --zookeeper 10.99.35.209:2181  --topic maxwell

啟動生産者視窗

bin/kafka-console-producer.sh --broker-list 10.99.35.209:9092 --topic maxwell

啟動消費者視窗

bin/kafka-console-consumer.sh --zookeeper 10.99.35.209:2181 --topic maxwell --from-beginning

6, 安裝配置logstash

修改logstash配置檔案

log.level: info

path.logs: /usr/local/logstash-5.1.1/logs

執行個體:

啟動Zookeeper

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

啟動Kafka

bin/kafka-server-start.sh config/server.properties

啟動Maxwell

bin/maxwell --user='maxwell' --password='aspire@123-' --host='10.99.35.209' --producer=kafka --kafka.bootstrap.servers=10.99.35.209:9092

Logstash配置如下:

bin/logstash-plugin install logstash-filter-translate

input {

    kafka {

        bootstrap_servers => "10.99.35.209:9092"

        topics => ["maxwell"]

        codec => json {charset => ["ISO-8859-1"]}

        consumer_threads => 5

        decorate_events => true

    }

}

filter {

  mutate {   

       remove_field => ["database","table","ts","xid","commit","old","kafka"]

       rename => ["[data][id]","id"]

       rename => ["[data][first_name]","first_name"]

       rename => ["[data][last_name]","last_name"]

       rename => ["[data][age]","age"]

       rename => ["[data][about]","about"]

       rename => ["[data][interests]","interests"]

   }

   translate {

       field => "type"

       destination => "op_type"

       dictionary => [

         "insert", "index",

         "update", "update",

         "delete", "delete"

       ]

output {

    elasticsearch {

        hosts => ["10.99.35.214:9200"]

        index => "megacorp"       

        document_id => "%{id}"

        document_type => "employee"

        action => "%{op_type}"

        workers => 1

        flush_size => 20000

        idle_flush_time => 10

        template_overwrite => true

    stdout {}

啟動Logstash和Elasticsearch,在MySQL資料庫中增删改資料,Elasticsearch就會實時看到索引的變化。

上一篇: mysql日志
下一篇: MySQL日志

繼續閱讀