天天看点

Spring Boot+ELK+Kafka

分布式日志问题

1.分布式系统日志
(1)分布多
(2)追踪难
2.处理方式
(1)日志收集
(2)可视化
3.常见日志解决方案
(1)ELK+Kafka
(2) 第三方
           

Elatsicsearch

1.分布式搜索引擎
2.特性
(1)分布式实时全文搜索引擎
(2)分布式实时分析搜索引擎
(3)分布式实时大数据处理引擎
           

Kibana

1. Elastic Stack 成员
2.数据分析和可视化
(1)图形
(2)表格
(3) …
2. Web端访问
           

Logstash

1.Elastic Stack 成员
2. 数据处理管道
(1)Input(采集)
(2)Output(迁移)
           

Kafka

1.定义
(1) 一个由Java+Scala开发的开源流处理平台
2.特点
(1)稳定性
(2)高吞吐
(3)易扩展
3.依赖环境
(1)Zookeeper
           
Spring Boot+ELK+Kafka

ELK+Kafka日志收集原理

1.Kafka:日志接收+异步处理
2.Logstash:获取日志+发送日志
3.Elasticsearch:存储日志+分析日志
4.Kibana:查询展示
           
Spring Boot+ELK+Kafka

基于Docker搭建ELK+Kafka

1. 安装步骤
 ① 上传Dockerfile包至ELK+Kafka目录(如果没有Dockerfile文件直接push也可以)
 ② 进入ELK+Kafka目录
 ③ 执行以下命令,生成镜像
(1)docker build -t elasticsearch Elasticsearch
(2)docker build -t kibana Kibana
(3)docker build -t kafka Kafka
(4)docker build -t logstash Logstash
 ④ 创建容器
(1)docker run -d --name kafka -p 9092:9092 kafka
(2)docker run -d --name elasticsearch -p 9200:9200 elasticsearch
(3)docker run -d -it --name kibana -p 5601:5601 --link elasticsearch:elasticsearch kibana
(4)docker run -d -it --name logstash --link elasticsearch:elasticsearch --link kafka:kafka logstash
注意:1 、Kafka 的advertised.listeners 要指向宿主机IP
	 2 、进行环境搭建前,需要设置vm.max_map_count( 进程中内存映射区域的最大数		           	量,默认值 65536 )
	  打开系统配置   vi /etc/sysctl.conf
	 vm.max_map_count=655360 
	 sysctl -p
	 3、--link  代表使用别名映射到指定容器
           

Kafka相关概念

1、Broker:Kafka集群中的服务
2、Producer:消息生产方
3、Consumer:消息消费方
4、Topic:消息类型
           

Spring Boot整合Kafka实现Producer

① 引入依赖

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
           

② 配置文件

spring.kafka.producer.bootstrap-servers=192.168.9.150:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
           

③ 编写客户端

@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("sendMes")
public String sendMessage(){
	for (int i=0;i<10;i++){
		kafkaTemplate.send(topic,key,message);
	}
	return "success";
}
           

使用Logstash实现日志收集

1、logstash.conf
	(1)input
	  《1》bootstrap_servers:指向Kafka服务
	  《2》group_id:可以自由指定
	  《3》topics:数组形式,可以填写多个
	  《4》type :可以自由指定
   (2)output
	 《1》hosts:指向Elasticsearch服务地址,可以有多个
	 《2》index:表示在Elasticsearch中生成index的规则
	 《3》user和password:Elasticsearch的用户名和密码
           

在Kibana中定义规则查询日志

1、创建规则
(1)访问Kibana服务主页
(2)点击左侧选项卡Management→Index Patterns
(3)定义规则名称,完成创建
2、查询数据
(1)点击左侧选项卡Discover
(2)切换查询时间
3、筛选规则
           

定义通用工具类

1、需求
 ① 区分不同项目
 ② 区分普通日志和异常日志
2、分析
(1)输入
	《1》项目名、日志级别、日志信息
3、封装
(1) 配置项目名、输入级别、打印异常信息
           

demo

1、引入依赖

<!--kafka-->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>
           

2、application.properties添加配置

#连接Kafka和一下配置
spring.kafka.producer.bootstrap-servers=192.168.9.150:9092 #(宿主机ip)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer #固定写法
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer  #固定写法

#配置
kafkaConfig.moduleName=user
           

3、创建config配置类

package com.qg.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "kafkaConfig")
public class KafkaConfig {

    private String moduleName;

    public static final class LogType{
        public static String INFO="info";
        public static String ERROR="error";
    }

    public static final String qgKey="qg";

    public String getModuleName() {
        return moduleName;
    }

    public void setModuleName(String moduleName) {
        this.moduleName = moduleName;
    }
}

           

4、编写Kafka工具类

package com.qg.utils;
import com.qg.config.KafkaConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;

/***
 * Kafka 工具类
 *
 */
@Component
public class KafkaUtil {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private KafkaConfig kafkaConfig;

    /***
     * 发送普通日志的方法
     * @param message
     * @return
     */
    public String sendInfoMessage(String message){
        try{
            kafkaTemplate.send(kafkaConfig.getModuleName()+"-"+KafkaConfig.LogType.INFO,KafkaConfig.qgKey,message);
        }catch (Exception e){
            e.printStackTrace();
            return "fail";
        }
        return "success";
    }
    /***
     * 发送异常日志的方法
     * @param em
     * @return
     */
    public String sendErrorMessage(Exception em){
        try{
            Writer writer=new StringWriter();
            PrintWriter pw=new PrintWriter(writer);
            em.printStackTrace(pw);
            kafkaTemplate.send(kafkaConfig.getModuleName()+"-"+KafkaConfig.LogType.ERROR,KafkaConfig.qgKey,writer.toString());
        }catch (Exception e){
            e.printStackTrace();
            return "fail";
        }
        return "success";
    }
}

           

5、编写测试类进行测试

package com.qg.controller;

import com.qg.utils.KafkaUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {


    @Autowired
    private KafkaUtil kafkaUtil;

    /**
     * 测试发送的具体的方法
     * @return
     */
    @RequestMapping("/send")
    public String testSendMessage() {
        try{
            kafkaUtil.sendInfoMessage(" start execute testSendMessage function>>>>>>");
            int score=1/0;
        }catch (Exception e){
            kafkaUtil.sendErrorMessage(e);
            return "success";
        }
        return "success";
    }
}

           

6、到kibana图形页面查看打印的日志。

总结

1、分布式系统日志问题
(1)分布多、追踪难
2、ELK+Kafka解决方案
(1)Kafka->日志接收+异步处理
(2)Logstash->获取日志+发送日志
(3)Elasticsearch->存储日志+分析日志
(4)Kibana->查询展示
           

继续阅读