分布式日志问题
1.分布式系统日志
(1)分布多
(2)追踪难
2.处理方式
(1)日志收集
(2)可视化
3.常见日志解决方案
(1)ELK+Kafka
(2) 第三方
Elatsicsearch
1.分布式搜索引擎
2.特性
(1)分布式实时全文搜索引擎
(2)分布式实时分析搜索引擎
(3)分布式实时大数据处理引擎
Kibana
1. Elastic Stack 成员
2.数据分析和可视化
(1)图形
(2)表格
(3) …
2. Web端访问
Logstash
1.Elastic Stack 成员
2. 数据处理管道
(1)Input(采集)
(2)Output(迁移)
Kafka
1.定义
(1) 一个由Java+Scala开发的开源流处理平台
2.特点
(1)稳定性
(2)高吞吐
(3)易扩展
3.依赖环境
(1)Zookeeper

ELK+Kafka日志收集原理
1.Kafka:日志接收+异步处理
2.Logstash:获取日志+发送日志
3.Elasticsearch:存储日志+分析日志
4.Kibana:查询展示
基于Docker搭建ELK+Kafka
1. 安装步骤
① 上传Dockerfile包至ELK+Kafka目录(如果没有Dockerfile文件直接push也可以)
② 进入ELK+Kafka目录
③ 执行以下命令,生成镜像
(1)docker build -t elasticsearch Elasticsearch
(2)docker build -t kibana Kibana
(3)docker build -t kafka Kafka
(4)docker build -t logstash Logstash
④ 创建容器
(1)docker run -d --name kafka -p 9092:9092 kafka
(2)docker run -d --name elasticsearch -p 9200:9200 elasticsearch
(3)docker run -d -it --name kibana -p 5601:5601 --link elasticsearch:elasticsearch kibana
(4)docker run -d -it --name logstash --link elasticsearch:elasticsearch --link kafka:kafka logstash
注意:1 、Kafka 的advertised.listeners 要指向宿主机IP
2 、进行环境搭建前,需要设置vm.max_map_count( 进程中内存映射区域的最大数 量,默认值 65536 )
打开系统配置 vi /etc/sysctl.conf
vm.max_map_count=655360
sysctl -p
3、--link 代表使用别名映射到指定容器
Kafka相关概念
1、Broker:Kafka集群中的服务
2、Producer:消息生产方
3、Consumer:消息消费方
4、Topic:消息类型
Spring Boot整合Kafka实现Producer
① 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
② 配置文件
spring.kafka.producer.bootstrap-servers=192.168.9.150:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
③ 编写客户端
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("sendMes")
public String sendMessage(){
for (int i=0;i<10;i++){
kafkaTemplate.send(topic,key,message);
}
return "success";
}
使用Logstash实现日志收集
1、logstash.conf
(1)input
《1》bootstrap_servers:指向Kafka服务
《2》group_id:可以自由指定
《3》topics:数组形式,可以填写多个
《4》type :可以自由指定
(2)output
《1》hosts:指向Elasticsearch服务地址,可以有多个
《2》index:表示在Elasticsearch中生成index的规则
《3》user和password:Elasticsearch的用户名和密码
在Kibana中定义规则查询日志
1、创建规则
(1)访问Kibana服务主页
(2)点击左侧选项卡Management→Index Patterns
(3)定义规则名称,完成创建
2、查询数据
(1)点击左侧选项卡Discover
(2)切换查询时间
3、筛选规则
定义通用工具类
1、需求
① 区分不同项目
② 区分普通日志和异常日志
2、分析
(1)输入
《1》项目名、日志级别、日志信息
3、封装
(1) 配置项目名、输入级别、打印异常信息
demo
1、引入依赖
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、application.properties添加配置
#连接Kafka和一下配置
spring.kafka.producer.bootstrap-servers=192.168.9.150:9092 #(宿主机ip)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer #固定写法
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #固定写法
#配置
kafkaConfig.moduleName=user
3、创建config配置类
package com.qg.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "kafkaConfig")
public class KafkaConfig {
private String moduleName;
public static final class LogType{
public static String INFO="info";
public static String ERROR="error";
}
public static final String qgKey="qg";
public String getModuleName() {
return moduleName;
}
public void setModuleName(String moduleName) {
this.moduleName = moduleName;
}
}
4、编写Kafka工具类
package com.qg.utils;
import com.qg.config.KafkaConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
/***
* Kafka 工具类
*
*/
@Component
public class KafkaUtil {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private KafkaConfig kafkaConfig;
/***
* 发送普通日志的方法
* @param message
* @return
*/
public String sendInfoMessage(String message){
try{
kafkaTemplate.send(kafkaConfig.getModuleName()+"-"+KafkaConfig.LogType.INFO,KafkaConfig.qgKey,message);
}catch (Exception e){
e.printStackTrace();
return "fail";
}
return "success";
}
/***
* 发送异常日志的方法
* @param em
* @return
*/
public String sendErrorMessage(Exception em){
try{
Writer writer=new StringWriter();
PrintWriter pw=new PrintWriter(writer);
em.printStackTrace(pw);
kafkaTemplate.send(kafkaConfig.getModuleName()+"-"+KafkaConfig.LogType.ERROR,KafkaConfig.qgKey,writer.toString());
}catch (Exception e){
e.printStackTrace();
return "fail";
}
return "success";
}
}
5、编写测试类进行测试
package com.qg.controller;
import com.qg.utils.KafkaUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaUtil kafkaUtil;
/**
* 测试发送的具体的方法
* @return
*/
@RequestMapping("/send")
public String testSendMessage() {
try{
kafkaUtil.sendInfoMessage(" start execute testSendMessage function>>>>>>");
int score=1/0;
}catch (Exception e){
kafkaUtil.sendErrorMessage(e);
return "success";
}
return "success";
}
}
6、到kibana图形页面查看打印的日志。
总结
1、分布式系统日志问题
(1)分布多、追踪难
2、ELK+Kafka解决方案
(1)Kafka->日志接收+异步处理
(2)Logstash->获取日志+发送日志
(3)Elasticsearch->存储日志+分析日志
(4)Kibana->查询展示