Environment preparation:
Install elasticsearch-5.4.1.tar.gz, jdk-8u121-linux-x64.tar.gz, kibana-5.1.1-linux-x86_64.tar.gz, and nginx-1.12.2.tar.gz on 10.99.35.214.
Install elasticsearch-5.4.1.tar.gz and jdk-8u121-linux-x64.tar.gz on 10.99.35.215 and 10.99.35.216.
Install mysql-5.7.17-linux-glibc2.5-x86_64.tar.gz, jdk-8u121-linux-x64.tar.gz, logstash-5.5.1.tar.gz, maxwell-1.10.7.tar.gz, and kafka_2.11-0.11.0.1 on 10.99.35.209.
MySQL runs on 209. Maxwell reads its binlog and publishes the changes to Kafka; Logstash consumes the Kafka data and forwards it to the Elasticsearch cluster formed by 214, 215, and 216; Kibana and nginx on 214 provide the front end.
1. Enable binlog
vim /etc/my.cnf
server-id=1
log-bin=master (this line enables binlog)
binlog_format=row
2. Grant MySQL privileges
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'aspire@123-';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
3. Download and unpack Maxwell
wget https://github.com/zendesk/maxwell/releases/download/v1.10.7/maxwell-1.10.7.tar.gz
tar xvfz maxwell-1.10.7.tar.gz
4. Start Maxwell from the command line
nohup bin/maxwell --user='maxwell' --password='aspire@123-' --host='10.99.35.209' --producer=kafka --kafka.bootstrap.servers=10.99.35.209:9092 &
Note: the host parameter is the machine MySQL runs on, and kafka.bootstrap.servers at the end lists the host:port pairs of the Kafka cluster nodes.
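Every change Maxwell captures is published to Kafka as a JSON document. A minimal sketch of such a message and how to pick it apart (the row values here are invented; the envelope fields `database`, `table`, `type`, `ts`, and `data` are what Maxwell emits):

```python
import json

# An illustrative Maxwell event for an INSERT into an `employee` table
# (values are made up; the envelope fields are Maxwell's standard ones).
raw = """
{
  "database": "megacorp",
  "table": "employee",
  "type": "insert",
  "ts": 1505271000,
  "data": {"id": 1, "first_name": "John", "last_name": "Smith", "age": 25}
}
"""
event = json.loads(raw)
print(event["type"])        # the kind of change: insert / update / delete
print(event["data"]["id"])  # the changed row's columns live under "data"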
5. Kafka setup
(In this setup Kafka and MySQL both run on 10.99.35.209. Make sure the port in the Kafka configuration file matches the port passed on the Maxwell command line.)
wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
mv kafka_2.11-0.10.2.1.tgz /usr/local/
cd /usr/local/
tar xvfz kafka_2.11-0.10.2.1.tgz
Kafka depends on ZooKeeper, so a ZooKeeper server must be running first. If you do not have one yet, the convenience script shipped with Kafka can bring up a quick-and-dirty single-node ZooKeeper instance:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Kafka ships a basic configuration file in the config directory. To make Kafka reachable remotely, two settings must be changed. Open config/server.properties, find the commented-out listeners and advertised.listeners entries near the top, uncomment them, and set them according to the server's IP:
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://10.99.35.209:9092
Here the server IP is 10.99.35.209; replace it with an IP reachable from the public network or your LAN.
Next, start the Kafka service:
nohup bin/kafka-server-start.sh config/server.properties &
Create a topic named maxwell to receive the data:
bin/kafka-topics.sh --create --zookeeper 10.99.35.209:2181 --replication-factor 1 --partitions 1 --topic maxwell
Once created, list the existing topics with:
bin/kafka-topics.sh --list --zookeeper 10.99.35.209:2181
Show the topic details:
bin/kafka-topics.sh --describe --zookeeper 10.99.35.209:2181 --topic maxwell
Start a console producer:
bin/kafka-console-producer.sh --broker-list 10.99.35.209:9092 --topic maxwell
Start a console consumer:
bin/kafka-console-consumer.sh --zookeeper 10.99.35.209:2181 --topic maxwell --from-beginning
6. Install and configure Logstash
Edit the Logstash configuration file (these settings live in config/logstash.yml):
log.level: info
path.logs: /usr/local/logstash-5.1.1/logs
Example run:
Start ZooKeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Start Kafka:
bin/kafka-server-start.sh config/server.properties
Start Maxwell:
bin/maxwell --user='maxwell' --password='aspire@123-' --host='10.99.35.209' --producer=kafka --kafka.bootstrap.servers=10.99.35.209:9092
The Logstash pipeline configuration is as follows (install the translate filter plugin first):
bin/logstash-plugin install logstash-filter-translate
input {
  kafka {
    bootstrap_servers => "10.99.35.209:9092"
    topics => ["maxwell"]
    codec => json {charset => ["ISO-8859-1"]}
    consumer_threads => 5
    decorate_events => true
  }
}
filter {
  mutate {
    remove_field => ["database","table","ts","xid","commit","old","kafka"]
    rename => ["[data][id]","id"]
    rename => ["[data][first_name]","first_name"]
    rename => ["[data][last_name]","last_name"]
    rename => ["[data][age]","age"]
    rename => ["[data][about]","about"]
    rename => ["[data][interests]","interests"]
  }
  translate {
    field => "type"
    destination => "op_type"
    dictionary => [
      "insert", "index",
      "update", "update",
      "delete", "delete"
    ]
  }
}
output {
  elasticsearch {
    hosts => ["10.99.35.214:9200"]
    index => "megacorp"
    document_id => "%{id}"
    document_type => "employee"
    action => "%{op_type}"
    workers => 1
    flush_size => 20000
    idle_flush_time => 10
    template_overwrite => true
  }
  stdout {}
}
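The intent of the filter section can be sketched in plain Python (a hypothetical transform, not part of the pipeline; it just mirrors what the mutate and translate filters do to each Maxwell event):

```python
# Sketch of the Logstash filter logic: drop the Maxwell envelope fields,
# hoist the row data to the top level, and map Maxwell's change type to
# an Elasticsearch bulk action for the output stage.
TYPE_TO_ACTION = {"insert": "index", "update": "update", "delete": "delete"}

def transform(event):
    data = event.pop("data", {})       # hoist [data][*] fields (mutate rename)
    for field in ("database", "table", "ts", "xid", "commit", "old"):
        event.pop(field, None)         # mutate remove_field
    event.update(data)
    event["op_type"] = TYPE_TO_ACTION[event.pop("type")]  # translate filter
    return event

doc = transform({
    "database": "megacorp", "table": "employee", "type": "insert",
    "ts": 1505271000, "xid": 9001,
    "data": {"id": 1, "first_name": "John", "age": 25},
})
print(doc)  # {'id': 1, 'first_name': 'John', 'age': 25, 'op_type': 'index'}
```

The elasticsearch output then uses op_type as the action and id as the document_id, so inserts, updates, and deletes in MySQL index, update, and delete the matching Elasticsearch document.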
Start Logstash and Elasticsearch; as you insert, update, and delete rows in the MySQL database, the changes show up in the Elasticsearch index in real time.