本人水平:参加工作六个月,刚看完一本《go实战》的菜鸡
代码版本:2019年1月15日使用go get github.com/open-falcon/falcon-plus拉下来的代码
agent的功能就是不停采集机器各种数据发送给其他模块,提供一个自定义push metric的接口,向hbs发送一些心跳信息
1 概览
1.1 目录结构

agent的代码还是比较少的,先粗略讲一下每个包都做什么
- cron:定时任务,包括从hbs同步plugin、内建监控项和可信任IP等信息,向hbs发送心跳信息,收集监控数据
- funcs:收集监控数据的函数,基本都是通过第三方库nux去收集的
- g:全局共享信息的存放,包括配置信息,日志,网络通信的工具类,常量等
- http:API的后端实现
- plugins:用户配置plugin的管理和启动
1.2 main函数
上代码,中文注释都是我自己加的
前面的一些初始化代码都很简单,这里略去不讲,从BuildMappers开始分析
func main() {
// 命令行参数
cfg := flag.String("c", "cfg.json", "configuration file") // 配置文件,默认为 cfg.json
version := flag.Bool("v", false, "show version") // 是否只显示版本,默认不显示
check := flag.Bool("check", false, "check collector") // 是否只检查收集器,默认不检查
// 获取一下命令行参数
flag.Parse()
// 打印版本
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
// 检查是否能正常采集监控数据
if *check {
funcs.CheckCollector()
os.Exit(0)
}
// 读取配置文件
g.ParseConfig(*cfg)
// 设置日志级别
if g.Config().Debug {
g.InitLog("debug")
} else {
g.InitLog("info")
}
// 初始化
g.InitRootDir() // 初始化根目录
g.InitLocalIp() // 初始化本地 IP
g.InitRpcClients() // 初始化 hbs rpc 客户端
// 建立收集数据的函数和发送数据间隔映射
funcs.BuildMappers()
// 定时任务(每秒),通过文件 /proc/cpustats 和 /proc/diskstats,更新 cpu 和 disk 的原始数据,等待后一步处理
go cron.InitDataHistory()
// 定时任务
cron.ReportAgentStatus() // 向 hbs 发送客户端心跳信息
cron.SyncMinePlugins() // 同步 plugins
cron.SyncBuiltinMetrics() // 从 hbs 同步 BuiltinMetrics
cron.SyncTrustableIps() // 同步可信任 IP 地址
cron.Collect() // 采集监控数据
// 开启 http 服务
go http.Start()
// 阻塞 main 函数
select {}
}
2 建立映射
看着很长,其实就是每组采集监控数据的函数(分组大概是基础指标、device、网络、GPU等等),和向transfer发送的数据间隔相对应放到一个叫FuncsAndInterval的自定义结构体里面
/*
收集数据的函数和发送数据间隔映射
*/
func BuildMappers() {
interval := g.Config().Transfer.Interval // 向 transfer 发送数据间隔
// 获取监控数据函数、与发送数据间隔映射
Mappers = []FuncsAndInterval{
{
Fs: []func() []*model.MetricValue{
AgentMetrics,
CpuMetrics,
NetMetrics,
KernelMetrics,
LoadAvgMetrics,
MemMetrics,
DiskIOMetrics,
IOStatsMetrics,
NetstatMetrics,
ProcMetrics,
UdpMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DeviceMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
PortMetrics,
SocketStatSummaryMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DuMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
UrlMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
GpuMetrics,
},
Interval: interval,
},
}
}
结构体FuncsAndInterval的定义,很简单
type FuncsAndInterval struct {
Fs []func() []*model.MetricValue // 一组采集监控数据函数
Interval int // 向 transfer 发送数据间隔
}
采集监控数据函数的返回值统一为[]*model.MetricValue,来看两个采集监控数据的函数,先简单扫一遍不追求细节
agent.alive是最熟悉的一个监控项了,不用其他逻辑来采集数据直接就是1
/*
获取 agent 的 MetricValue
返回值固定为 1,表示客户端正常工作
*/
func AgentMetrics() []*model.MetricValue {
return []*model.MetricValue{GaugeValue("agent.alive", 1)}
}
cpu的就复杂很多,十来个,每个都调用了不同的函数去采集监控项
/*
获取 cpu 的 MetricValue
*/
func CpuMetrics() []*model.MetricValue {
if !CpuPrepared() {
return []*model.MetricValue{}
}
// 获取各项 cpu 监控指标
cpuIdleVal := CpuIdle()
idle := GaugeValue("cpu.idle", cpuIdleVal)
busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
user := GaugeValue("cpu.user", CpuUser())
nice := GaugeValue("cpu.nice", CpuNice())
system := GaugeValue("cpu.system", CpuSystem())
iowait := GaugeValue("cpu.iowait", CpuIowait())
irq := GaugeValue("cpu.irq", CpuIrq())
softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
steal := GaugeValue("cpu.steal", CpuSteal())
guest := GaugeValue("cpu.guest", CpuGuest())
switches := CounterValue("cpu.switches", CurrentCpuSwitches())
return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
3 定时任务
这部分是agent功能的主体,每个定时任务都是开了一个goroutine去完成的,
// 定时任务(每秒),通过文件 /proc/stat 和 /proc/diskstats,更新 cpu 和 disk 的原始数据,等待后一步处理
go cron.InitDataHistory()
// 定时任务
cron.ReportAgentStatus() // 向 hbs 发送客户端心跳信息
cron.SyncMinePlugins() // 同步 plugins
cron.SyncBuiltinMetrics() // 从 hbs 同步客户端需要采集的 metric
cron.SyncTrustableIps() // 同步可信任 IP 地址
cron.Collect() // 采集监控数据
3.1 初始化历史
open-falcon是通过/proc/stat和/proc/diskstats这两个系统文件采集cpu和disk的数据,保留最近两次的数据是为了处理delta类指标
/*
定时(每秒)通过 /proc/stat 和 /proc/diskstats 文件采集 cpu 和 disk 的数据,保留最近两次的数据
*/
func InitDataHistory() {
for {
funcs.UpdateCpuStat() // 更新 cpu 状态
funcs.UpdateDiskStats() // 更新 disk 状态
time.Sleep(g.COLLECT_INTERVAL) // 睡一个收集间隔(一秒)
}
}
3.1.1 cpu历史数据
看一下UpdateCpuStat(),比较简单
const (
historyCount int = 2 // 保留多少轮历史数据
)
var (
procStatHistory [historyCount]*nux.ProcStat // 存放 cpu 历史数据
psLock = new(sync.RWMutex) // 锁
)
/*
更新 cpu 状态
*/
func UpdateCpuStat() error {
// 读取 /proc/stat 文件
ps, err := nux.CurrentProcStat()
if err != nil {
return err
}
psLock.Lock()
defer psLock.Unlock()
// 抛弃过期的历史数据,给新数据腾出位置
for i := historyCount - 1; i > 0; i-- {
procStatHistory[i] = procStatHistory[i-1]
}
procStatHistory[0] = ps // 保存最新数据
return nil
}
/proc/stat这个文件记录了开机以来的cpu使用信息,比如在我的虚拟机上长这样。具体的含义可以看这篇博客【Linux】/proc/stat详解 完整验证版
- 以cpu开头的那一行表示所有cpu的数值总和,我的虚拟机只有一个cpu,所以cpu和cpu0完全一致
- cpu0之后的9个数字分别表示用户时间、nice用户时间、系统时间、空闲时间、IO等待时间、硬中断时间、软中断时间、steal时间、来宾时间
- intr表示中断,第一个数据是总中断次数,之后是某些中断发生的次数
- ctxt表示cpu上下文切换次数
- btime表示系统启动时间戳(这个我贴的那篇博文有误,他说是系统启动的秒,算算就知道不可能)
- processes表示创建process的个数
- procs_running表示当前运行队列的任务数目
- procs_blocked表示当前运行队列的阻塞任务数目
- softirq表示软中断,数据含义和intr类似
这个文件的读取是通过nux包完成的,读文件的逻辑略去不说,看一下读取之后返回的结构体
CpuUsage那里的Total是读取过程中把每个cpu时间累加得来的
func (this *ProcStat) String() string {
return fmt.Sprintf("<Cpu:%v, Cpus:%v, Ctxt:%d, Processes:%d, ProcsRunning:%d, ProcsBlocking:%d>",
this.Cpu,
this.Cpus,
this.Ctxt,
this.Processes,
this.ProcsRunning,
this.ProcsBlocked)
}
type CpuUsage struct {
User uint64 // time spent in user mode
Nice uint64 // time spent in user mode with low priority (nice)
System uint64 // time spent in system mode
Idle uint64 // time spent in the idle task
Iowait uint64 // time spent waiting for I/O to complete (since Linux 2.5.41)
Irq uint64 // time spent servicing interrupts (since 2.6.0-test4)
SoftIrq uint64 // time spent servicing softirqs (since 2.6.0-test4)
Steal uint64 // time spent in other OSes when running in a virtualized environment (since 2.6.11)
Guest uint64 // time spent running a virtual CPU for guest operating systems under the control of the Linux kernel. (since 2.6.24)
Total uint64 // total of all time fields
}
3.1.2 disk历史数据
UpdateDiskStats(),和cpu类似,唯一区别的是历史数据的保存形式从切片变成了映射,每个设备的信息分开了
var (
diskStatsMap = make(map[string][2]*nux.DiskStats) // 存放 disk 历史数据的映射,映射关系为设备名-设备信息
dsLock = new(sync.RWMutex) // 锁
)
/*
更新 disk 状态
*/
func UpdateDiskStats() error {
// 读取 /proc/diskstats 文件
dsList, err := nux.ListDiskStats()
if err != nil {
return err
}
dsLock.Lock()
defer dsLock.Unlock()
// 抛弃过期的历史数据,给新数据腾出位置,保存新数据
for i := 0; i < len(dsList); i++ {
device := dsList[i].Device
diskStatsMap[device] = [2]*nux.DiskStats{dsList[i], diskStatsMap[device][0]}
}
return nil
}
/proc/diskstats文件在我虚拟机上长这样,具体含义可以参考这篇博文/proc/diskstats文件注解
第一列为主设备号,第二列为次设备号,第三列为设备名称
后面的11列,表示读完成次数、合并读完成次数、读扇区次数、读操作花费毫秒、写完成次数、合并写完成次数、写扇区次数、写操作花费毫秒、正在处理的输入输出请求数、输入输出花费毫秒、输入输出花费的加权毫秒
读取也是通过nux包,返回的结构体如下
type DiskStats struct {
Major int
Minor int
Device string
ReadRequests uint64 // Total number of reads completed successfully.
ReadMerged uint64 // Adjacent read requests merged in a single req.
ReadSectors uint64 // Total number of sectors read successfully.
MsecRead uint64 // Total number of ms spent by all reads.
WriteRequests uint64 // total number of writes completed successfully.
WriteMerged uint64 // Adjacent write requests merged in a single req.
WriteSectors uint64 // total number of sectors written successfully.
MsecWrite uint64 // Total number of ms spent by all writes.
IosInProgress uint64 // Number of actual I/O requests currently in flight.
MsecTotal uint64 // Amount of time during which ios_in_progress >= 1.
MsecWeightedTotal uint64 // Measure of recent I/O completion time and backlog.
TS time.Time
}
3.2 向 hbs 发送心跳
代码如下,基本就是一个简单的rpc,远程调用hbs的Agent.ReportStatus方法
/*
向 hbs 汇报客户端状态
*/
func ReportAgentStatus() {
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { // 如果允许向 hbs 发送心跳,并配置了 hbs 地址
go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second) // 定时发送心跳
}
}
/*
定时汇报客户端状态
*/
func reportAgentStatus(interval time.Duration) {
for {
hostname, err := g.Hostname() // 获取当前主机的 hostname,依次尝试从配置文件、环境变量 FALCON_ENDPOINT、系统 hostname 获取
if err != nil {
hostname = fmt.Sprintf("error:%s", err.Error())
}
// 生成汇报请求
req := model.AgentReportRequest{
Hostname: hostname,
IP: g.IP(), // IP
AgentVersion: g.VERSION, // 版本
PluginVersion: g.GetCurrPluginVersion(), // 插件版本号,即最后一次 git commit 的 hash
}
var resp model.SimpleRpcResponse
err = g.HbsClient.Call("Agent.ReportStatus", req, &resp) // rpc 调用 hbs 的 Agent.ReportStatus,获取响应
if err != nil || resp.Code != 0 {
log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
}
time.Sleep(interval) // 睡一个心跳汇报间隔时间
}
}
可以看一下hbsClient.Call函数的细节
/*
rpc 调用
*/
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
this.Lock()
defer this.Unlock()
err := this.serverConn() // 连接服务器
if err != nil {
return err
}
timeout := time.Duration(10 * time.Second) // 超时时间为 10 秒
done := make(chan error, 1)
go func() {
err := this.rpcClient.Call(method, args, reply) // rpc 调用方法
done <- err
}()
select {
// 如果 rpc 调用超时,关闭连接,报错
case <-time.After(timeout):
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
this.close()
return errors.New(this.RpcServer + " rpc call timeout")
// rpc 调用如果有报错,关闭连接,报错
case err := <-done:
if err != nil {
this.close()
return err
}
}
return nil
}
3.3 同步plugins
agent会用哈希表缓存plugins的相关信息以便定期执行,除此之外,还要定期去hbs获取用户在web页面上面配置的plugins路径来更新缓存,总之同步plugins的操作就是维护好这个存储plugins的哈希表
函数调用的ClearAllPlugins、DelNoUsePlugins、AddNewPlugins都在plugins.go里,而plugins.go又调用了scheduler.go
// Copyright 2017 Xiaomi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cron
import (
"github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/agent/g"
"github.com/open-falcon/falcon-plus/modules/agent/plugins"
"log"
"strings"
"time"
)
/*
同步用户自定义 plugins
*/
func SyncMinePlugins() {
// 配置未使能 plugin,返回
if !g.Config().Plugin.Enabled {
return
}
// 配置未使能 hbs,返回
if !g.Config().Heartbeat.Enabled {
return
}
// 未配置 hbs 地址,返回
if g.Config().Heartbeat.Addr == "" {
return
}
go syncMinePlugins()
}
/*
同步 plugins
*/
func syncMinePlugins() {
var (
timestamp int64 = -1
pluginDirs []string
)
duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {
time.Sleep(duration) // 睡一个周期间隔
hostname, err := g.Hostname() // 获取当前主机的 hostname,依次尝试从配置文件、环境变量 FALCON_ENDPOINT、系统 hostname 获取
if err != nil {
continue
}
req := model.AgentHeartbeatRequest{ // 新建 rpc 请求
Hostname: hostname,
}
var resp model.AgentPluginsResponse
err = g.HbsClient.Call("Agent.MinePlugins", req, &resp) // rpc 调用 hbs 的 Agent.MinePlugins
if err != nil {
log.Println("ERROR:", err)
continue
}
if resp.Timestamp <= timestamp { // 如果返回数据的时间戳小于等于上次的同步的时间戳,跳过本次同步
continue
}
pluginDirs = resp.Plugins // 获取用户在 web 页面配置的本主机 plugin 存放地址
timestamp = resp.Timestamp
if g.Config().Debug {
log.Println(&resp)
}
if len(pluginDirs) == 0 { // 若用户没有配置,清空缓存的 plugins
plugins.ClearAllPlugins()
}
desiredAll := make(map[string]*plugins.Plugin) // 保存配置的目录下所有符合命名规则的监控脚本
for _, p := range pluginDirs {
underOneDir := plugins.ListPlugins(strings.Trim(p, "/")) // 列出该目录下所有符合命名规则setp_xxx的监控脚本(用户配置的路径,前后的“/”会被去掉)
for k, v := range underOneDir {
desiredAll[k] = v
}
}
// 更新 plugins 缓存
plugins.DelNoUsePlugins(desiredAll) // 清除无用的
plugins.AddNewPlugins(desiredAll) // 增加新的
}
}
现在来看看plugins.go和scheduler.go,一点一点看
plugins.go里面保存了两个hash表,键值都是plugin的脚本目录,保存的对象一个叫Plugin一个叫PluginScheduler
var (
Plugins = make(map[string]*Plugin) // 保存用户在 web 配置的要执行的 plugin,键值是 plugin 脚本目录
PluginsWithScheduler = make(map[string]*PluginScheduler) // 保存定时执行的 plugin,键值是 plugin 脚本目录
)
看一下Plugin和PluginScheduler,Plugin其实就是从hbs获取的信息,PluginScheduler封装了一层,加了定时器和通道
/*
plugin
*/
type Plugin struct {
FilePath string // 文件目录
MTime int64 // 文件修改时间
Cycle int // 监控周期
}
/*
定时 plugin
*/
type PluginScheduler struct {
Ticker *time.Ticker // 定时器
Plugin *Plugin // plugin
Quit chan struct{} // 接收停止定时任务信号的通道
}
Plugin的增删改查十分简单
/*
停止不再使用的 plugin,并从缓存的 plugins 表中删除
*/
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
for currKey, currPlugin := range Plugins {
newPlugin, ok := newPlugins[currKey]
if !ok || currPlugin.MTime != newPlugin.MTime { // 如果从 hbs 拉取的新 plugins 列表里没有该 plugin,或文件修改时间不同,删除老 plugin
deletePlugin(currKey)
}
}
}
/*
在缓存的 plugins 表中增加新 plugin,并启动定时执行 plugin
*/
func AddNewPlugins(newPlugins map[string]*Plugin) {
for fpath, newPlugin := range newPlugins {
if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime { // 如果该 plugin 已经存在,且文件修改时间符合,跳过添加
continue
}
Plugins[fpath] = newPlugin
sch := NewPluginScheduler(newPlugin) // 生成定时 plugin
PluginsWithScheduler[fpath] = sch
sch.Schedule() // 启动定时执行
}
}
/*
停止所有 plugin,清空缓存 plugins 表
*/
func ClearAllPlugins() {
for k := range Plugins {
deletePlugin(k)
}
}
/*
停止 plugin
*/
func deletePlugin(key string) {
v, ok := PluginsWithScheduler[key]
if ok {
v.Stop() // 停止定时任务
delete(PluginsWithScheduler, key) // 从缓存中删除
}
delete(Plugins, key) // 从缓存中删除
}
PluginScheduler主要是定时执行任务,就一个定时器加一个停止通道,每个Plugin对应一个单独的goroutine
/*
构造函数
*/
func NewPluginScheduler(p *Plugin) *PluginScheduler {
scheduler := PluginScheduler{Plugin: p}
scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second) // 定时器
scheduler.Quit = make(chan struct{})
return &scheduler
}
/*
启动定时任务
*/
func (this *PluginScheduler) Schedule() {
go func() {
for {
select {
case <-this.Ticker.C:
PluginRun(this.Plugin) // 定时任务
case <-this.Quit:
this.Ticker.Stop() // 收到停止信号,停止定时器,终止任务返回
return
}
}
}()
}
/*
停止任务
*/
func (this *PluginScheduler) Stop() {
close(this.Quit) // 关闭接收停止信号的通道
}
/*
运行一次 plugin 脚本
*/
func PluginRun(plugin *Plugin) {
timeout := plugin.Cycle*1000 - 500 // 任务超时时间,在下一次定时任务开始前 0.5 秒
fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)
// 文件不存在报错
if !file.IsExist(fpath) {
log.Println("no such plugin:", fpath)
return
}
debug := g.Config().Debug
if debug {
log.Println(fpath, "running...")
}
// 执行脚本
cmd := exec.Command(fpath)
var stdout bytes.Buffer
cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
err := cmd.Start()
if err != nil {
log.Printf("[ERROR] plugin start fail, error: %s\n", err)
return
}
if debug {
log.Println("plugin started:", fpath)
}
err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond) // 执行命令
errStr := stderr.String()
if errStr != "" {
logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
if _, err = file.WriteString(logFile, errStr); err != nil {
log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)
}
}
// 超时报错
if isTimeout {
if err == nil && debug {
log.Println("[INFO] timeout and kill process", fpath, "successfully")
}
if err != nil {
log.Println("[ERROR] kill process", fpath, "occur error:", err)
}
return
}
if err != nil {
log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
return
}
// 执行成功且没有输出,在 debug 模式下会记录
data := stdout.Bytes()
if len(data) == 0 {
if debug {
log.Println("[DEBUG] stdout of", fpath, "is blank")
}
return
}
// 输出转 MetricValue
var metrics []*model.MetricValue
err = json.Unmarshal(data, &metrics)
if err != nil {
log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
return
}
// 发送给 transfer
g.SendToTransfer(metrics)
}
3.4 从hbs同步内建Metric
嘛是内建Metric呢,比如最熟悉的net.port.listen就是open-falcon内建的一个metric,每个周期agent都去hbs获取用户配置好的内建metric,从tag中抽取出要监控的具体信息,比如net.port.listen/port=8080,就会抽出8080这个端口,缓存起来,定时采集数据上报
内建Metric都是在Template那里配置后,由hbs发送给客户端的,agent和hbs的交互大致和同步plugin的交互一样,都是rpc
这里一共有四种内建metric,url.check.health是判断url的响应是否超时的,net.port.listen监听端口,du.bs监控某个目录的磁盘,proc.num监控进程名或命令包含某些字符串的进程总数
/*
从 hbs 同步 BuiltinMetrics
*/
func SyncBuiltinMetrics() {
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
go syncBuiltinMetrics()
}
}
/*
同步 BuiltinMetrics
*/
func syncBuiltinMetrics() {
var timestamp int64 = -1
var checksum string = "nil"
duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {
time.Sleep(duration)
var ports = []int64{}
var paths = []string{}
var procs = make(map[string]map[int]string)
var urls = make(map[string]string)
hostname, err := g.Hostname()
if err != nil {
continue
}
req := model.AgentHeartbeatRequest{
Hostname: hostname,
Checksum: checksum,
}
var resp model.BuiltinMetricResponse
err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp) // rpc 调用 hbs 的 Agent.BuiltinMetrics 方法
if err != nil {
log.Println("ERROR:", err)
continue
}
// 通过时间戳和校验和判断响应是不是最新的
if resp.Timestamp <= timestamp {
continue
}
if resp.Checksum == checksum {
continue
}
timestamp = resp.Timestamp
checksum = resp.Checksum
for _, metric := range resp.Metrics {
if metric.Metric == g.URL_CHECK_HEALTH { // url 检测
arr := strings.Split(metric.Tags, ",")
if len(arr) != 2 {
continue
}
url := strings.Split(arr[0], "=")
if len(url) != 2 {
continue
}
stime := strings.Split(arr[1], "=")
if len(stime) != 2 {
continue
}
if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
urls[url[1]] = stime[1]
} else {
log.Println("metric ParseInt timeout failed:", err)
}
}
if metric.Metric == g.NET_PORT_LISTEN { // 端口检测
arr := strings.Split(metric.Tags, "=")
if len(arr) != 2 {
continue
}
if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
ports = append(ports, port)
} else {
log.Println("metrics ParseInt failed:", err)
}
continue
}
if metric.Metric == g.DU_BS {
arr := strings.Split(metric.Tags, "=")
if len(arr) != 2 {
continue
}
paths = append(paths, strings.TrimSpace(arr[1]))
continue
}
if metric.Metric == g.PROC_NUM { // 进程数
arr := strings.Split(metric.Tags, ",")
tmpMap := make(map[int]string)
for i := 0; i < len(arr); i++ {
if strings.HasPrefix(arr[i], "name=") {
tmpMap[1] = strings.TrimSpace(arr[i][5:])
} else if strings.HasPrefix(arr[i], "cmdline=") {
tmpMap[2] = strings.TrimSpace(arr[i][8:])
}
}
procs[metric.Tags] = tmpMap
}
}
// 缓存配置
g.SetReportUrls(urls)
g.SetReportPorts(ports)
g.SetReportProcs(procs)
g.SetDuPaths(paths)
}
}
3.5 同步可信任IP
这个就更简单了,没啥新东西
/*
同步可信任 IP 地址
*/
func SyncTrustableIps() {
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
go syncTrustableIps()
}
}
/*
同步可信任 IP
*/
func syncTrustableIps() {
duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {
time.Sleep(duration)
var ips string
err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips) // rpc 调用 hbs 的 Agent.TrustableIps 方法
if err != nil {
log.Println("ERROR: call Agent.TrustableIps fail", err)
continue
}
g.SetTrustableIps(ips) // 缓存
}
}
3.6 采集监控数据
重点了,终于到了最关键的采集监控数据,其实里面这些采集函数都大同小异,都是通过第三方库nux读取一些系统文件,再做一些计算百分比之类的简单处理
collector.go只是一个壳,定义了一些封装的逻辑,采集数据的函数都保存在funcs.go里面的Mapper里
/*
采集数据
*/
func Collect() {
if !g.Config().Transfer.Enabled {
return
}
if len(g.Config().Transfer.Addrs) == 0 {
return
}
for _, v := range funcs.Mappers {
go collect(int64(v.Interval), v.Fs)
}
}
/*
采集单组数据
*/
func collect(sec int64, fns []func() []*model.MetricValue) {
t := time.NewTicker(time.Second * time.Duration(sec)) // 定时器
defer t.Stop()
for {
<-t.C
hostname, err := g.Hostname()
if err != nil {
continue
}
mvs := []*model.MetricValue{} // 保存采集后的 metric
ignoreMetrics := g.Config().IgnoreMetrics // 配置的ignore metric
for _, fn := range fns {
items := fn() // 调用函数,获取监控值
if items == nil {
continue
}
if len(items) == 0 {
continue
}
for _, mv := range items {
if b, ok := ignoreMetrics[mv.Metric]; ok && b { // 如果配置了忽略,跳过数据采集
continue
} else {
mvs = append(mvs, mv)
}
}
}
// 设置时间戳、endpoint、step
now := time.Now().Unix()
for j := 0; j < len(mvs); j++ {
mvs[j].Step = sec
mvs[j].Endpoint = hostname
mvs[j].Timestamp = now
}
// 发送给 transfer
g.SendToTransfer(mvs)
}
}
看一下发送transfer的代码,如果配置了多个transfer,他会随机选取地址进行尝试发送,直到发送成功为止
/*
监控数据发送给 transfer
*/
func SendToTransfer(metrics []*model.MetricValue) {
if len(metrics) == 0 {
return
}
dt := Config().DefaultTags // 读取配置的默认 tag
if len(dt) > 0 {
var buf bytes.Buffer
default_tags_list := []string{}
for k, v := range dt {
buf.Reset()
buf.WriteString(k)
buf.WriteString("=")
buf.WriteString(v)
default_tags_list = append(default_tags_list, buf.String())
}
default_tags := strings.Join(default_tags_list, ",") // 生成默认 tag 的字符串
// 给要上报的 metric 添加默认 tag
for i, x := range metrics {
buf.Reset()
if x.Tags == "" {
metrics[i].Tags = default_tags
} else {
buf.WriteString(metrics[i].Tags)
buf.WriteString(",")
buf.WriteString(default_tags)
metrics[i].Tags = buf.String()
}
}
}
debug := Config().Debug
if debug {
log.Printf("=> <Total=%d> %v\n", len(metrics), metrics[0]) // debug 模式下,记录每次上报 metric 的个数并记录第一个 metric
}
var resp model.TransferResponse
SendMetrics(metrics, &resp) // 发送给 transfer
if debug {
log.Println("<=", &resp)
}
}
var (
TransferClientsLock *sync.RWMutex = new(sync.RWMutex)
TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
)
/*
发送数据给 transfer
*/
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
rand.Seed(time.Now().UnixNano())
for _, i := range rand.Perm(len(Config().Transfer.Addrs)) { // 随机在配置的 transfer 地址中选择一个尝试发送,失败则尝试下一个
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}
if updateMetrics(c, metrics, resp) { // 数据发送成功就退出尝试
break
}
}
}
/*
初始化 transfer 连接客户端
*/
func initTransferClient(addr string) *SingleConnRpcClient {
var c *SingleConnRpcClient = &SingleConnRpcClient{
RpcServer: addr,
Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
}
TransferClientsLock.Lock()
defer TransferClientsLock.Unlock()
TransferClients[addr] = c
return c
}
/*
rpc 调用 transfer 的 Transfer.Update 方法
*/
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
err := c.Call("Transfer.Update", metrics, resp)
if err != nil {
log.Println("call Transfer.Update fail:", c, err)
return false
}
return true
}
/*
获取 transfer 连接客户端
*/
func getTransferClient(addr string) *SingleConnRpcClient {
TransferClientsLock.RLock()
defer TransferClientsLock.RUnlock()
if c, ok := TransferClients[addr]; ok {
return c
}
return nil
}
要采集的监控指标都在这里了,一个一个函数看
var Mappers []FuncsAndInterval
/*
收集数据的函数和发送数据间隔映射
*/
func BuildMappers() {
interval := g.Config().Transfer.Interval // 向 transfer 发送数据间隔
// 获取监控数据函数、与发送数据间隔映射
Mappers = []FuncsAndInterval{
{
Fs: []func() []*model.MetricValue{
AgentMetrics,
CpuMetrics,
NetMetrics,
KernelMetrics,
LoadAvgMetrics,
MemMetrics,
DiskIOMetrics,
IOStatsMetrics,
NetstatMetrics,
ProcMetrics,
UdpMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DeviceMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
PortMetrics,
SocketStatSummaryMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DuMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
UrlMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
GpuMetrics,
},
Interval: interval,
},
}
}
agent以前看过了,很简单
/*
获取 agent 的 MetricValue
返回值固定为 1,表示客户端正常工作
*/
func AgentMetrics() []*model.MetricValue {
return []*model.MetricValue{GaugeValue("agent.alive", 1)}
}
3.1说过,linux记录cpu的信息记录的是一个累计的值,所以为了获取每分钟cpu的数据,必须计算差值,所以才有了3.1中的记录历史数据的操作。因为有procStatHistory这个共用变量,每个函数基本都加了锁,防止每个周期之间的数据混淆,造成错误,比如计算cpu.idle/cpu.total,获取了cpu.total后UpdateCpuStat()函数被调用,更新了数值再获取cpu.idle,出来的结果就不对
cpu的各种时间百分比计算都大同小异,上下文切换这个函数没有处理数值,直接弄成了counter数据类型,之后会计算为speed
const (
historyCount int = 2 // 保留多少轮历史数据
)
var (
procStatHistory [historyCount]*nux.ProcStat // 存放 cpu 历史数据
psLock = new(sync.RWMutex) // 锁
)
/*
更新 cpu 状态
*/
func UpdateCpuStat() error {
// 读取 /proc/stat 文件
ps, err := nux.CurrentProcStat()
if err != nil {
return err
}
psLock.Lock()
defer psLock.Unlock()
// 抛弃过期的历史数据,给新数据腾出位置
for i := historyCount - 1; i > 0; i-- {
procStatHistory[i] = procStatHistory[i-1]
}
procStatHistory[0] = ps // 保存最新数据
return nil
}
/*
cpu total 最新一次的变化差值
*/
func deltaTotal() uint64 {
if procStatHistory[1] == nil {
return 0
}
return procStatHistory[0].Cpu.Total - procStatHistory[1].Cpu.Total
}
/*
计算 cpu idle 时间百分比
*/
func CpuIdle() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Idle-procStatHistory[1].Cpu.Idle) * invQuotient
}
/*
计算 cpu user 时间百分比
*/
func CpuUser() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.User-procStatHistory[1].Cpu.User) * invQuotient
}
/*
计算 cpu nice 时间百分比
*/
func CpuNice() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Nice-procStatHistory[1].Cpu.Nice) * invQuotient
}
/*
计算 cpu system 时间百分比
*/
func CpuSystem() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.System-procStatHistory[1].Cpu.System) * invQuotient
}
/*
计算 cpu io wait 时间百分比
*/
func CpuIowait() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Iowait-procStatHistory[1].Cpu.Iowait) * invQuotient
}
/*
计算 cpu 中断时间百分比
*/
func CpuIrq() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Irq-procStatHistory[1].Cpu.Irq) * invQuotient
}
/*
计算 cpu 软中断时间百分比
*/
func CpuSoftIrq() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.SoftIrq-procStatHistory[1].Cpu.SoftIrq) * invQuotient
}
/*
计算 cpu steal 时间百分比
*/
func CpuSteal() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Steal-procStatHistory[1].Cpu.Steal) * invQuotient
}
/*
计算 cpu guest 时间百分比
*/
func CpuGuest() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Guest-procStatHistory[1].Cpu.Guest) * invQuotient
}
/*
计算 cpu 上下文切换次数
*/
func CurrentCpuSwitches() uint64 {
psLock.RLock()
defer psLock.RUnlock()
return procStatHistory[0].Ctxt
}
/*
检测 cpu 数据是否已经准备完毕(至少需要两个周期的历史数据,才能计算差值)
*/
func CpuPrepared() bool {
psLock.RLock()
defer psLock.RUnlock()
return procStatHistory[1] != nil
}
/*
获取 cpu 的 MetricValue
*/
func CpuMetrics() []*model.MetricValue {
// 检查历史数据是否已经有了两个周期,足够计算差值
if !CpuPrepared() {
return []*model.MetricValue{}
}
// 获取各项 cpu 监控指标
cpuIdleVal := CpuIdle()
idle := GaugeValue("cpu.idle", cpuIdleVal)
busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
user := GaugeValue("cpu.user", CpuUser())
nice := GaugeValue("cpu.nice", CpuNice())
system := GaugeValue("cpu.system", CpuSystem())
iowait := GaugeValue("cpu.iowait", CpuIowait())
irq := GaugeValue("cpu.irq", CpuIrq())
softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
steal := GaugeValue("cpu.steal", CpuSteal())
guest := GaugeValue("cpu.guest", CpuGuest())
switches := CounterValue("cpu.switches", CurrentCpuSwitches()) // 利用了 counter 数据类型,之后会被计算为 speed
return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
网卡的信息类似cpu的获取方式,也是调用了第三方库,通过系统文件/proc/net/dev和/sys/class/net/%s/speed获取网卡信息
这是我虚拟机的/proc/net/dev,一行对应一个网卡
左边的8列表示接收,分别是系统启动以来,接收的总字节数、包数、错误包数、丢包数、FIFO缓冲区错误数、分组帧错误数、压缩数据包数、多播帧数;右边的8列表示发送,大部分相同,不同的colls表示冲突数、carrier表示载波损耗的数量(说实话,已经完全无法理解了)
我虚拟机的/sys/class/net/eth0/speed,好吧就一个数,查了一下说是网卡最新的连接速度,差不多就是带宽啦,单位是Mbits/sec
第三方库还提供了一些处理过后的指标,比如出入流量占带宽百分比等
/*
网卡监控数据采集
*/
func NetMetrics() []*model.MetricValue {
return CoreNetMetrics(g.Config().Collector.IfacePrefix) // 获取配置的要监控网卡的前缀
}
/*
网卡监控
*/
func CoreNetMetrics(ifacePrefix []string) []*model.MetricValue {
netIfs, err := nux.NetIfs(ifacePrefix) // 通过系统文件 /proc/net/dev 和 /sys/class/net/%s/speed 获取网卡信息
if err != nil {
log.Println(err)
return []*model.MetricValue{}
}
cnt := len(netIfs)
ret := make([]*model.MetricValue, cnt*26) // 网卡的信息全放在一个一维数组里
for idx, netIf := range netIfs {
iface := "iface=" + netIf.Iface
ret[idx*26+0] = CounterValue("net.if.in.bytes", netIf.InBytes, iface)
ret[idx*26+1] = CounterValue("net.if.in.packets", netIf.InPackages, iface)
ret[idx*26+2] = CounterValue("net.if.in.errors", netIf.InErrors, iface)
ret[idx*26+3] = CounterValue("net.if.in.dropped", netIf.InDropped, iface)
ret[idx*26+4] = CounterValue("net.if.in.fifo.errs", netIf.InFifoErrs, iface)
ret[idx*26+5] = CounterValue("net.if.in.frame.errs", netIf.InFrameErrs, iface)
ret[idx*26+6] = CounterValue("net.if.in.compressed", netIf.InCompressed, iface)
ret[idx*26+7] = CounterValue("net.if.in.multicast", netIf.InMulticast, iface)
ret[idx*26+8] = CounterValue("net.if.out.bytes", netIf.OutBytes, iface)
ret[idx*26+9] = CounterValue("net.if.out.packets", netIf.OutPackages, iface)
ret[idx*26+10] = CounterValue("net.if.out.errors", netIf.OutErrors, iface)
ret[idx*26+11] = CounterValue("net.if.out.dropped", netIf.OutDropped, iface)
ret[idx*26+12] = CounterValue("net.if.out.fifo.errs", netIf.OutFifoErrs, iface)
ret[idx*26+13] = CounterValue("net.if.out.collisions", netIf.OutCollisions, iface)
ret[idx*26+14] = CounterValue("net.if.out.carrier.errs", netIf.OutCarrierErrs, iface)
ret[idx*26+15] = CounterValue("net.if.out.compressed", netIf.OutCompressed, iface)
ret[idx*26+16] = CounterValue("net.if.total.bytes", netIf.TotalBytes, iface) // 第三方库计算的,发送和接收的和
ret[idx*26+17] = CounterValue("net.if.total.packets", netIf.TotalPackages, iface)
ret[idx*26+18] = CounterValue("net.if.total.errors", netIf.TotalErrors, iface)
ret[idx*26+19] = CounterValue("net.if.total.dropped", netIf.TotalDropped, iface)
ret[idx*26+20] = GaugeValue("net.if.speed.bits", netIf.SpeedBits, iface) // 网卡最新的连接速度,即带宽,单位是 Mbits/sec
ret[idx*26+21] = CounterValue("net.if.in.percent", netIf.InPercent, iface) // 入流量占带宽百分比
ret[idx*26+22] = CounterValue("net.if.out.percent", netIf.OutPercent, iface) // 出流量占带宽百分比
ret[idx*26+23] = CounterValue("net.if.in.bits", netIf.InBytes*8, iface) // 计算 bit 数
ret[idx*26+24] = CounterValue("net.if.out.bits", netIf.OutBytes*8, iface)
ret[idx*26+25] = CounterValue("net.if.total.bits", netIf.TotalBytes*8, iface)
}
return ret
}
kernel的信息采集很简单,也是调第三方库,打开的系统文件基本都很简单,最复杂的就是/proc/sys/fs/file-nr,第一列表示系统已经分配并使用的文件描述符数,第二列表示已经分配但并未使用的文件描述符数,第三列表示系统支持的最大文件描述符数
文件描述符是一个进程打开文件时创建的,同一个文件可以有不同的文件描述符,因此这几项监控类似于打开文件的数量
/*
内核监控数据采集
*/
func KernelMetrics() (L []*model.MetricValue) {
maxFiles, err := nux.KernelMaxFiles() // 通过 /proc/sys/fs/file-max 获取系统允许的最大文件描述符数
if err != nil {
log.Println(err)
return
}
L = append(L, GaugeValue("kernel.maxfiles", maxFiles))
maxProc, err := nux.KernelMaxProc() // 通过 /proc/sys/kernel/pid_max 获取系统允许最大进程数
if err != nil {
log.Println(err)
return
}
L = append(L, GaugeValue("kernel.maxproc", maxProc))
allocateFiles, err := nux.KernelAllocateFiles() // 通过 /proc/sys/fs/file-nr 获取系统当前使用的文件描述符数
if err != nil {
log.Println(err)
return
}
L = append(L, GaugeValue("kernel.files.allocated", allocateFiles))
L = append(L, GaugeValue("kernel.files.left", maxFiles-allocateFiles)) // 系统还可以使用多少文件描述符
return
}
cpu负载是通过/proc/loadavg采集的,每列分别表示1、5、15分钟内在运行队列中等待或等待磁盘IO的进程、正在运行的进程数占进程总数百分比、最近运行的进程ID号
代码很短
/*
cpu 负载监控数据采集
*/
func LoadAvgMetrics() []*model.MetricValue {
load, err := nux.LoadAvg()
if err != nil {
log.Println(err)
return nil
}
return []*model.MetricValue{
GaugeValue("load.1min", load.Avg1min),
GaugeValue("load.5min", load.Avg5min),
GaugeValue("load.15min", load.Avg15min),
}
}
内存看的文件是/proc/meminfo,一行是一个参数,采集的指标浅显易懂,就不多讲
/*
memory 监控数据采集
*/
func MemMetrics() []*model.MetricValue {
m, err := nux.MemInfo()
if err != nil {
log.Println(err)
return nil
}
memFree := m.MemFree + m.Buffers + m.Cached // 空闲内存总量等于空闲内存 + 给文件的缓冲大小 + 高速缓冲存储器大小
if m.MemAvailable > 0 { // 如果能直接从 /proc/meminfo 获得空闲内存总量,使用文件中的值
memFree = m.MemAvailable
}
memUsed := m.MemTotal - memFree // 计算使用量
// 计算百分比
pmemFree := 0.0
pmemUsed := 0.0
if m.MemTotal != 0 {
pmemFree = float64(memFree) * 100.0 / float64(m.MemTotal)
pmemUsed = float64(memUsed) * 100.0 / float64(m.MemTotal)
}
pswapFree := 0.0
pswapUsed := 0.0
if m.SwapTotal != 0 {
pswapFree = float64(m.SwapFree) * 100.0 / float64(m.SwapTotal)
pswapUsed = float64(m.SwapUsed) * 100.0 / float64(m.SwapTotal)
}
return []*model.MetricValue{
GaugeValue("mem.memtotal", m.MemTotal),
GaugeValue("mem.memused", memUsed),
GaugeValue("mem.memfree", memFree),
GaugeValue("mem.swaptotal", m.SwapTotal),
GaugeValue("mem.swapused", m.SwapUsed),
GaugeValue("mem.swapfree", m.SwapFree),
GaugeValue("mem.memfree.percent", pmemFree),
GaugeValue("mem.memused.percent", pmemUsed),
GaugeValue("mem.swapfree.percent", pswapFree),
GaugeValue("mem.swapused.percent", pswapUsed),
}
}
磁盘IO监控有两个函数,DiskIOMetrics和IOStatsMetrics,都在一个文件里,磁盘信息是从/proc/diskstats这个文件读的,当然并不是所有磁盘的信息都会读出来,必须是sd、vd、xvd、fio、nvme开头的设备名、且长度符合规定的设备才会被监控,详见函数ShouldHandleDevice
/*
读完成次数
*/
func IOReadRequests(arr [2]*nux.DiskStats) uint64 {
return arr[0].ReadRequests - arr[1].ReadRequests
}
/*
合并读完成次数
*/
func IOReadMerged(arr [2]*nux.DiskStats) uint64 {
return arr[0].ReadMerged - arr[1].ReadMerged
}
/*
读扇区次数
*/
func IOReadSectors(arr [2]*nux.DiskStats) uint64 {
return arr[0].ReadSectors - arr[1].ReadSectors
}
/*
读操作花费毫秒
*/
func IOMsecRead(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecRead - arr[1].MsecRead
}
/*
写完成次数
*/
func IOWriteRequests(arr [2]*nux.DiskStats) uint64 {
return arr[0].WriteRequests - arr[1].WriteRequests
}
/*
合并写完成次数
*/
func IOWriteMerged(arr [2]*nux.DiskStats) uint64 {
return arr[0].WriteMerged - arr[1].WriteMerged
}
/*
写扇区次数
*/
func IOWriteSectors(arr [2]*nux.DiskStats) uint64 {
return arr[0].WriteSectors - arr[1].WriteSectors
}
/*
写操作花费毫秒
*/
func IOMsecWrite(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecWrite - arr[1].MsecWrite
}
/*
io 操作花费毫秒
*/
func IOMsecTotal(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecTotal - arr[1].MsecTotal
}
/*
io 操作花费加权毫秒
*/
func IOMsecWeightedTotal(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecWeightedTotal - arr[1].MsecWeightedTotal
}
/*
计算两次历史数据的时间间隔有多少秒
*/
func TS(arr [2]*nux.DiskStats) uint64 {
return uint64(arr[0].TS.Sub(arr[1].TS).Nanoseconds() / 1000000)
}
/*
根据设备名取出对应的一组历史数据,传递给相应函数
*/
func IODelta(device string, f func([2]*nux.DiskStats) uint64) uint64 {
val, ok := diskStatsMap[device]
if !ok {
return 0
}
if val[1] == nil {
return 0
}
return f(val)
}
/*
采集磁盘IO监控数据
*/
func DiskIOMetrics() (L []*model.MetricValue) {
dsList, err := nux.ListDiskStats()
if err != nil {
log.Println(err)
return
}
for _, ds := range dsList {
if !ShouldHandleDevice(ds.Device) { // 根据设备名的前缀判定是否要进行监控
continue
}
device := "device=" + ds.Device
L = append(L, CounterValue("disk.io.read_requests", ds.ReadRequests, device))
L = append(L, CounterValue("disk.io.read_merged", ds.ReadMerged, device))
L = append(L, CounterValue("disk.io.read_sectors", ds.ReadSectors, device))
L = append(L, CounterValue("disk.io.msec_read", ds.MsecRead, device))
L = append(L, CounterValue("disk.io.write_requests", ds.WriteRequests, device))
L = append(L, CounterValue("disk.io.write_merged", ds.WriteMerged, device))
L = append(L, CounterValue("disk.io.write_sectors", ds.WriteSectors, device))
L = append(L, CounterValue("disk.io.msec_write", ds.MsecWrite, device))
L = append(L, CounterValue("disk.io.ios_in_progress", ds.IosInProgress, device))
L = append(L, CounterValue("disk.io.msec_total", ds.MsecTotal, device))
L = append(L, CounterValue("disk.io.msec_weighted_total", ds.MsecWeightedTotal, device))
}
return
}
/*
采集磁盘IO监控数据
*/
func IOStatsMetrics() (L []*model.MetricValue) {
dsLock.RLock()
defer dsLock.RUnlock()
for device := range diskStatsMap {
if !ShouldHandleDevice(device) { // 根据设备名的前缀判定是否要进行监控
continue
}
tags := "device=" + device
rio := IODelta(device, IOReadRequests)
wio := IODelta(device, IOWriteRequests)
delta_rsec := IODelta(device, IOReadSectors)
delta_wsec := IODelta(device, IOWriteSectors)
ruse := IODelta(device, IOMsecRead)
wuse := IODelta(device, IOMsecWrite)
use := IODelta(device, IOMsecTotal)
n_io := rio + wio
avgrq_sz := 0.0
await := 0.0
svctm := 0.0
if n_io != 0 {
avgrq_sz = float64(delta_rsec+delta_wsec) / float64(n_io) // 平均每次 IO 操作读写扇区数
await = float64(ruse+wuse) / float64(n_io) // 平均每次 IO 操作等待时间
svctm = float64(use) / float64(n_io) // 平均每次 IO 操作服务时间
}
duration := IODelta(device, TS)
L = append(L, GaugeValue("disk.io.read_bytes", float64(delta_rsec)*512.0, tags))
L = append(L, GaugeValue("disk.io.write_bytes", float64(delta_wsec)*512.0, tags))
L = append(L, GaugeValue("disk.io.avgrq_sz", avgrq_sz, tags))
L = append(L, GaugeValue("disk.io.avgqu-sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags)) // 平均 IO 队列长度,即 IO 加权时间(以秒为单位)
L = append(L, GaugeValue("disk.io.await", await, tags))
L = append(L, GaugeValue("disk.io.svctm", svctm, tags))
tmp := float64(use) * 100.0 / float64(duration)
if tmp > 100.0 {
tmp = 100.0
}
L = append(L, GaugeValue("disk.io.util", tmp, tags))
}
return
}
/*
判断该设备是否要监控
*/
func ShouldHandleDevice(device string) bool {
normal := len(device) == 3 && (strings.HasPrefix(device, "sd") || strings.HasPrefix(device, "vd"))
aws := len(device) >= 4 && strings.HasPrefix(device, "xvd")
flash := len(device) >= 4 && (strings.HasPrefix(device, "fio") || strings.HasPrefix(device, "nvme"))
return normal || aws || flash
}
网络监控是/proc/net/netstat和/proc/net/snmp这两个文件,NetstatMetrics和UdpMetrics两个函数的代码很简单,就是用第三方库读取文件,然后用counter类型把原始数据处理成speed展示出来。但是里面的指标已经超出了我的理解范围,这里略过不看,对指标感兴趣可以看这个文proc 文件系统调节参数介绍
进程监控,会调用第三方库去读取/proc目录下所有的数字目录,/proc下一个数字目录就表示对应pid号的进程信息,这里读取两个,一个是存储进程信息的status文件,一个是存储进程命令的cmdline文件
/*
进程监控数据采集
*/
func ProcMetrics() (L []*model.MetricValue) {
reportProcs := g.ReportProcs() // 读取配置的要监控的进程
sz := len(reportProcs)
if sz == 0 {
return
}
ps, err := nux.AllProcs() // 读取 /proc 下所有进程文件的进程名、命令行
if err != nil {
log.Println(err)
return
}
pslen := len(ps)
for tags, m := range reportProcs {
cnt := 0
for i := 0; i < pslen; i++ {
if is_a(ps[i], m) { // 如果符合用户设置的进程过滤规则
cnt++ // 计数加一
}
}
L = append(L, GaugeValue(g.PROC_NUM, cnt, tags)) // 符合用户配置的进程过滤规则的进程数
}
return
}
/*
根据用户配置的进程名和命令行,判断该进程是否匹配
*/
func is_a(p *nux.Proc, m map[int]string) bool {
// 判断进程名或命令是否匹配
for key, val := range m {
if key == 1 {
// 判断进程名是否相等
if val != p.Name {
return false
}
} else if key == 2 {
// 判断命令行是否包含指定字符串
if !strings.Contains(p.Cmdline, val) {
return false
}
}
}
return true
}
磁盘监控,根据agent的cfg.json里配置的collect.mountPoint略有不同:如果完全没有配置,open-falcon会采集所有磁盘的数据;如果配置了,只会采集配置的这些磁盘的数据
采集数据就是调第三方库读取/proc/mounts下的挂载磁盘信息,采集指标都很通俗易懂
/*
磁盘监控数据采集
*/
func DeviceMetrics() (L []*model.MetricValue) {
mountPoints, err := nux.ListMountPoint() // 读取 /proc/mounts 目录下挂载的设备信息
if err != nil {
log.Error("collect device metrics fail:", err)
return
}
var myMountPoints map[string]bool = make(map[string]bool)
if len(g.Config().Collector.MountPoint) > 0 {
for _, mp := range g.Config().Collector.MountPoint { // 获取配置的要监控的挂载设备
myMountPoints[mp] = true
}
}
var diskTotal uint64 = 0
var diskUsed uint64 = 0
for idx := range mountPoints {
fsSpec, fsFile, fsVfstype := mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2] // 获取每个设备的分区定位、挂载目录、磁盘类型
if len(myMountPoints) > 0 { //如果配置了 cfg.json 的 collect.mountPoint ,则只检测配置中的磁盘
if _, ok := myMountPoints[fsFile]; !ok { // 跳过未配置的磁盘,debug 模式下,如果挂载设备未配置监控,会记录
log.Debug("mount point not matched with config", fsFile, "ignored.")
continue
}
}
var du *nux.DeviceUsage
du, err = nux.BuildDeviceUsage(fsSpec, fsFile, fsVfstype) // 获取指定挂载设备的信息
if err != nil {
log.Error(err)
continue
}
if du.BlocksAll == 0 {
continue
}
diskTotal += du.BlocksAll // 累加总容量
diskUsed += du.BlocksUsed // 累加使用的总容量
tags := fmt.Sprintf("mount=%s,fstype=%s", du.FsFile, du.FsVfstype)
L = append(L, GaugeValue("df.bytes.total", du.BlocksAll, tags))
L = append(L, GaugeValue("df.bytes.used", du.BlocksUsed, tags))
L = append(L, GaugeValue("df.bytes.free", du.BlocksFree, tags))
L = append(L, GaugeValue("df.bytes.used.percent", du.BlocksUsedPercent, tags))
L = append(L, GaugeValue("df.bytes.free.percent", du.BlocksFreePercent, tags))
if du.InodesAll == 0 {
continue
}
L = append(L, GaugeValue("df.inodes.total", du.InodesAll, tags))
L = append(L, GaugeValue("df.inodes.used", du.InodesUsed, tags))
L = append(L, GaugeValue("df.inodes.free", du.InodesFree, tags))
L = append(L, GaugeValue("df.inodes.used.percent", du.InodesUsedPercent, tags))
L = append(L, GaugeValue("df.inodes.free.percent", du.InodesFreePercent, tags))
}
if len(L) > 0 && diskTotal > 0 {
L = append(L, GaugeValue("df.statistics.total", float64(diskTotal)))
L = append(L, GaugeValue("df.statistics.used", float64(diskUsed)))
L = append(L, GaugeValue("df.statistics.used.percent", float64(diskUsed)*100.0/float64(diskTotal)))
}
return
}
端口监控,是通过系统命令查看的
/*
端口监控数据采集
*/
func PortMetrics() (L []*model.MetricValue) {
reportPorts := g.ReportPorts() // 获取配置的要监控的端口号
sz := len(reportPorts)
if sz == 0 {
return
}
allTcpPorts, err := nux.TcpPorts() // 通过命令 ss -t -l -n 列出所有正在 tcp 通信的端口号
if err != nil {
log.Println(err)
return
}
allUdpPorts, err := nux.UdpPorts() // 通过命令 ss -u -a -n 列出所有正在 udp 通信的端口号
if err != nil {
log.Println(err)
return
}
for i := 0; i < sz; i++ {
tags := fmt.Sprintf("port=%d", reportPorts[i])
if slice.ContainsInt64(allTcpPorts, reportPorts[i]) || slice.ContainsInt64(allUdpPorts, reportPorts[i]) { // 如果正在 tcp 或 udp 通信的端口中包含了改端口,就认为端口存活
L = append(L, GaugeValue(g.NET_PORT_LISTEN, 1, tags))
} else {
L = append(L, GaugeValue(g.NET_PORT_LISTEN, 0, tags))
}
}
return
}
socket状态监控是通过命令ss -s获得的,采集的是TCP开头的那一行括号中的指标
/*
socket 状态监控数据采集
*/
func SocketStatSummaryMetrics() (L []*model.MetricValue) {
ssMap, err := nux.SocketStatSummary() // 通过 ss -s 命令获取 tcp 连接状态
if err != nil {
log.Println(err)
return
}
for k, v := range ssMap {
L = append(L, GaugeValue("ss."+k, v))
}
return
}
目录监控,是通过命令du -bs采集的,目录很大的时候这个命令执行时间比较长,因此每个目录都是开了一个goroutine去做的,并且有超时响应的处理
/*
目录监控数据采集
*/
func DuMetrics() (L []*model.MetricValue) {
paths := g.DuPaths() // 获取配置的要监控的目录
result := make(chan *model.MetricValue, len(paths))
var wg sync.WaitGroup
for _, path := range paths {
wg.Add(1)
go func(path string) {
var err error
defer func() {
if err != nil {
log.Println(err)
result <- GaugeValue(g.DU_BS, -1, "path="+path) // 如果采集某个目录的监控指标出错,则返回 -1 作为监控值
}
wg.Done()
}()
cmd := exec.Command("du", "-bs", path) // 执行命令 du -bs 获取目录大小(单位为字节)
var stdout bytes.Buffer
cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr
err = cmd.Start()
if err != nil {
return
}
err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Second) // 超时处理
if isTimeout {
err = errors.New(fmt.Sprintf("exec cmd : du -bs %s timeout", path))
return
}
errStr := stderr.String() // 命令报错处理
if errStr != "" {
err = errors.New(errStr)
return
}
if err != nil {
err = errors.New(fmt.Sprintf("du -bs %s failed: %s", path, err.Error()))
return
}
arr := strings.Fields(stdout.String())
if len(arr) < 2 {
err = errors.New(fmt.Sprintf("du -bs %s failed: %s", path, "return fields < 2"))
return
}
size, err := strconv.ParseUint(arr[0], 10, 64) // 命令返回的第一列为大小
if err != nil {
err = errors.New(fmt.Sprintf("cannot parse du -bs %s output", path))
return
}
result <- GaugeValue(g.DU_BS, size, "path="+path)
}(path)
}
wg.Wait()
resultLen := len(result)
for i := 0; i < resultLen; i++ {
L = append(L, <-result)
}
return
}
url的监控就是curl一下
/*
url 监控数据采集
*/
func UrlMetrics() (L []*model.MetricValue) {
reportUrls := g.ReportUrls() // 获取要监控的 url
sz := len(reportUrls)
if sz == 0 {
return
}
hostname, err := g.Hostname()
if err != nil {
hostname = "None"
}
for furl, timeout := range reportUrls {
tags := fmt.Sprintf("url=%v,timeout=%v,src=%v", furl, timeout, hostname) // 通过 curl 命令检查 url 是否在规定时间内响应
if ok, _ := probeUrl(furl, timeout); !ok {
L = append(L, GaugeValue(g.URL_CHECK_HEALTH, 0, tags))
continue
}
L = append(L, GaugeValue(g.URL_CHECK_HEALTH, 1, tags))
}
return
}
/*
检测 url 是否正常响应
*/
func probeUrl(furl string, timeout string) (bool, error) {
bs, err := sys.CmdOutBytes("curl", "--max-filesize", "102400", "-I", "-m", timeout, "-o", "/dev/null", "-s", "-w", "%{http_code}", furl)
if err != nil {
log.Printf("probe url [%v] failed.the err is: [%v]\n", furl, err)
return false, err
}
reader := bufio.NewReader(bytes.NewBuffer(bs))
retcode, err := file.ReadLine(reader) // 获取响应码
if err != nil {
log.Println("read retcode failed.err is:", err)
return false, err
}
if strings.TrimSpace(string(retcode)) != "200" { // 响应码为 200 才认为正常
log.Printf("return code [%v] is not 200.query url is [%v]", string(retcode), furl)
return false, err
}
return true, err
}
我电脑没有gpu,所以gpu那块没有仔细看,是用第三方库gonvml采集的数据,做一些单位换算就可以了,指标都通俗易懂
3.7 http服务
虽然main函数调用的是Start(),但是这个函数只是启动了http,关键的代码还是在http这个包的init()函数里
先看Start(),非常简单
/*
开启 http 服务
*/
func Start() {
if !g.Config().Http.Enabled { // 如果配置不开启 http,返回
return
}
addr := g.Config().Http.Listen // 获取 http 服务启动的地址
if addr == "" {
return
}
s := &http.Server{
Addr: addr,
MaxHeaderBytes: 1 << 30,
}
log.Println("listening", addr)
log.Fatalln(s.ListenAndServe()) // 开启 http
}
初始化函数里启动了很多API,这些才是真正的后端代码
/*
初始化 API
*/
func init() {
configAdminRoutes()
configCpuRoutes()
configDfRoutes()
configHealthRoutes()
configIoStatRoutes()
configKernelRoutes()
configMemoryRoutes()
configPageRoutes()
configPluginRoutes()
configPushRoutes()
configRunRoutes()
configSystemRoutes()
}
这个是管理用的API,前面一直提到的可信任IP就是在这里用的了,来自可信任IP的API请求可以完成某些重要操作,比如停止、加载配置
/*
管理类 API
*/
func configAdminRoutes() {
// 结束 agent 程序
http.HandleFunc("/exit", func(w http.ResponseWriter, r *http.Request) {
if g.IsTrustable(r.RemoteAddr) { // 来自可信任 IP 的调用才能接受
w.Write([]byte("exiting...")) // 成功退出,返回 exiting...
go func() {
time.Sleep(time.Second)
os.Exit(0)
}()
} else {
w.Write([]byte("no privilege")) // 不是来自可信任 IP,返回 no privilege
}
})
// 重新读取配置
http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
if g.IsTrustable(r.RemoteAddr) {
g.ParseConfig(g.ConfigFile)
RenderDataJson(w, g.Config()) // 返回 config 内容
} else {
w.Write([]byte("no privilege"))
}
})
// 返回工作路径
http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, file.SelfDir())
})
// 返回可信任 IP
http.HandleFunc("/ips", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, g.TrustableIps())
})
}
获取cpu信息的API,基本直接调用了采集监控数据的函数
/*
CPU API
*/
func configCpuRoutes() {
// 获取 cpu 个数
http.HandleFunc("/proc/cpu/num", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, runtime.NumCPU())
})
// 获取 cpu 主频
http.HandleFunc("/proc/cpu/mhz", func(w http.ResponseWriter, r *http.Request) {
data, err := nux.CpuMHz()
AutoRender(w, data, err)
})
// 以字符串形式返回 cpu 使用情况
http.HandleFunc("/page/cpu/usage", func(w http.ResponseWriter, r *http.Request) {
if !funcs.CpuPrepared() {
RenderMsgJson(w, "not prepared")
return
}
idle := funcs.CpuIdle()
busy := 100.0 - idle
item := [10]string{
fmt.Sprintf("%.1f%%", idle),
fmt.Sprintf("%.1f%%", busy),
fmt.Sprintf("%.1f%%", funcs.CpuUser()),
fmt.Sprintf("%.1f%%", funcs.CpuNice()),
fmt.Sprintf("%.1f%%", funcs.CpuSystem()),
fmt.Sprintf("%.1f%%", funcs.CpuIowait()),
fmt.Sprintf("%.1f%%", funcs.CpuIrq()),
fmt.Sprintf("%.1f%%", funcs.CpuSoftIrq()),
fmt.Sprintf("%.1f%%", funcs.CpuSteal()),
fmt.Sprintf("%.1f%%", funcs.CpuGuest()),
}
RenderDataJson(w, [][10]string{item})
})
// 以 map 形式返回 cpu 使用情况
http.HandleFunc("/proc/cpu/usage", func(w http.ResponseWriter, r *http.Request) {
if !funcs.CpuPrepared() {
RenderMsgJson(w, "not prepared")
return
}
idle := funcs.CpuIdle()
busy := 100.0 - idle
RenderDataJson(w, map[string]interface{}{
"idle": idle,
"busy": busy,
"user": funcs.CpuUser(),
"nice": funcs.CpuNice(),
"system": funcs.CpuSystem(),
"iowait": funcs.CpuIowait(),
"irq": funcs.CpuIrq(),
"softirq": funcs.CpuSoftIrq(),
"steal": funcs.CpuSteal(),
"guest": funcs.CpuGuest(),
})
})
}
磁盘的API和监控的函数简直一模一样的逻辑
/*
磁盘 API
*/
func configDfRoutes() {
// 获取磁盘和文件系统对象的使用情况
http.HandleFunc("/page/df", func(w http.ResponseWriter, r *http.Request) {
mountPoints, err := nux.ListMountPoint()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
var ret [][]interface{} = make([][]interface{}, 0)
for idx := range mountPoints {
var du *nux.DeviceUsage
du, err = nux.BuildDeviceUsage(mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2])
if err == nil {
ret = append(ret,
[]interface{}{
du.FsSpec,
core.ReadableSize(float64(du.BlocksAll)),
core.ReadableSize(float64(du.BlocksUsed)),
core.ReadableSize(float64(du.BlocksFree)),
fmt.Sprintf("%.1f%%", du.BlocksUsedPercent),
du.FsFile,
core.ReadableSize(float64(du.InodesAll)),
core.ReadableSize(float64(du.InodesUsed)),
core.ReadableSize(float64(du.InodesFree)),
fmt.Sprintf("%.1f%%", du.InodesUsedPercent),
du.FsVfstype,
})
}
}
RenderDataJson(w, ret)
})
}
客户端自身健康度检测的API很简单
/*
客户端健康情况 API
*/
func configHealthRoutes() {
// 如果客户端正常工作,返回 ok
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
// 返回客户端版本
http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(g.VERSION))
})
}
IO状态的API,讲道理,这几个函数真的是一个人写的吗,有的直接调funcs里面的监控采集函数,有的还要写一遍同样的逻辑
/*
IO 状态 API
*/
func configIoStatRoutes() {
// 返回 IO 情况
http.HandleFunc("/page/diskio", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, funcs.IOStatsForPage())
})
}
内核信息的API
/*
内核信息 API
*/
func configKernelRoutes() {
// 返回 hostname,获取顺序是 agent 配置、系统环境变量 FALCON_ENDPOINT、从系统获取
http.HandleFunc("/proc/kernel/hostname", func(w http.ResponseWriter, r *http.Request) {
data, err := g.Hostname()
AutoRender(w, data, err)
})
// 返回最大进程数
http.HandleFunc("/proc/kernel/maxproc", func(w http.ResponseWriter, r *http.Request) {
data, err := nux.KernelMaxProc()
AutoRender(w, data, err)
})
// 返回最大文件数
http.HandleFunc("/proc/kernel/maxfiles", func(w http.ResponseWriter, r *http.Request) {
data, err := nux.KernelMaxFiles()
AutoRender(w, data, err)
})
// 返回内核版本
http.HandleFunc("/proc/kernel/version", func(w http.ResponseWriter, r *http.Request) {
data, err := sys.CmdOutNoLn("uname", "-r") // 执行命令 uname -r 获取内核版本
AutoRender(w, data, err)
})
}
内存API也没啥好说的,为啥返回的形式还要搞的这么多
/*
内存 API
*/
func configMemoryRoutes() {
// 以数组形式返回内存使用率信息
http.HandleFunc("/page/memory", func(w http.ResponseWriter, r *http.Request) {
mem, err := nux.MemInfo()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
memFree := mem.MemFree + mem.Buffers + mem.Cached
memUsed := mem.MemTotal - memFree
var t uint64 = 1024 * 1024
RenderDataJson(w, []interface{}{mem.MemTotal / t, memUsed / t, memFree / t})
})
// 以 map 形式返回内存使用率信息
http.HandleFunc("/proc/memory", func(w http.ResponseWriter, r *http.Request) {
mem, err := nux.MemInfo()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
memFree := mem.MemFree + mem.Buffers + mem.Cached
memUsed := mem.MemTotal - memFree
RenderDataJson(w, map[string]interface{}{
"total": mem.MemTotal,
"free": memFree,
"used": memUsed,
})
})
}
这个静态资源API其实目前只支持一个页面,就是客户端IP:port那个页面,当然你也可以自己在agent/public目录下,创建目录放其他的index.html,也可以获取到
/*
静态资源获取 API
*/
func configPageRoutes() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/") {
if !file.IsExist(filepath.Join(g.Root, "/public", r.URL.Path, "index.html")) { // 访问 客户端IP: port/ 即可获取 index.html
http.NotFound(w, r)
return
}
}
http.FileServer(http.Dir(filepath.Join(g.Root, "/public"))).ServeHTTP(w, r)
})
}
plugin的API
/*
plugin API
*/
func configPluginRoutes() {
// 从 git 更新插件
http.HandleFunc("/plugin/update", func(w http.ResponseWriter, r *http.Request) {
if !g.Config().Plugin.Enabled {
w.Write([]byte("plugin not enabled"))
return
}
dir := g.Config().Plugin.Dir
parentDir := file.Dir(dir)
file.InsureDir(parentDir) // 确保 plugin 父目录存在,不存在创建
if file.IsExist(dir) { // 如果插件目录已经存在,pull
// git pull
cmd := exec.Command("git", "pull")
cmd.Dir = dir // 在 plugin 目录执行 pull
err := cmd.Run()
if err != nil {
w.Write([]byte(fmt.Sprintf("git pull in dir:%s fail. error: %s", dir, err)))
return
}
} else { // 如果插件目录不存在,clone
// git clone
cmd := exec.Command("git", "clone", g.Config().Plugin.Git, file.Basename(dir))
cmd.Dir = parentDir // 在父目录执行 clone
err := cmd.Run()
if err != nil {
w.Write([]byte(fmt.Sprintf("git clone in dir:%s fail. error: %s", parentDir, err)))
return
}
}
w.Write([]byte("success"))
})
// plugin 强制 reset
http.HandleFunc("/plugin/reset", func(w http.ResponseWriter, r *http.Request) {
if !g.Config().Plugin.Enabled {
w.Write([]byte("plugin not enabled"))
return
}
dir := g.Config().Plugin.Dir
// git reset
if file.IsExist(dir) {
cmd := exec.Command("git", "reset", "--hard")
cmd.Dir = dir
err := cmd.Run()
if err != nil {
w.Write([]byte(fmt.Sprintf("git reset --hard in dir:%s fail. error: %s", dir, err)))
return
}
}
w.Write([]byte("success"))
})
// 查看当前生效的 plugin
http.HandleFunc("/plugins", func(w http.ResponseWriter, r *http.Request) {
//TODO: not thread safe
RenderDataJson(w, plugins.Plugins)
})
}
push的API是最常用的,每次调用push他都会直接发给transfer,我记得美团对open-falcon的改进有一条就是增加了一个缓存,避免不同业务频繁调用push接口的造成的发送性能下降、数据丢失等问题
/*
push API
*/
func configPushRoutes() {
http.HandleFunc("/v1/push", func(w http.ResponseWriter, req *http.Request) {
if req.ContentLength == 0 {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics) // push 的信息转为 metric
if err != nil {
http.Error(w, "connot decode body", http.StatusBadRequest)
return
}
g.SendToTransfer(metrics) // 发送给 transfer
w.Write([]byte("success"))
})
}
run的API可以在机器上执行一些命令,这个非常危险,所以配置的backdoor这项要求打开,请求的IP地址也要是可信任IP才行
/*
执行命令 API
*/
func configRunRoutes() {
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
if !g.Config().Http.Backdoor { // 如果不允许 backdoor,报错返回
w.Write([]byte("/run disabled"))
return
}
if g.IsTrustable(r.RemoteAddr) { // 检测是否是可信任 IP
if r.ContentLength == 0 {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}
bs, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
body := string(bs)
out, err := sys.CmdOutBytes("sh", "-c", body) // 执行命令
if err != nil {
w.Write([]byte("exec fail: " + err.Error()))
return
}
w.Write(out) // 返回命令输出
} else {
w.Write([]byte("no privilege"))
}
})
}
系统信息API,可以查询系统时间、系统启动时间、cpu负载
/*
系统信息 API
*/
func configSystemRoutes() {
// 获取当前时间
http.HandleFunc("/system/date", func(w http.ResponseWriter, req *http.Request) {
RenderDataJson(w, time.Now().Format("2006-01-02 15:04:05"))
})
// 以字符串形式返回系统启动时间
http.HandleFunc("/page/system/uptime", func(w http.ResponseWriter, req *http.Request) {
days, hours, mins, err := nux.SystemUptime()
AutoRender(w, fmt.Sprintf("%d days %d hours %d minutes", days, hours, mins), err)
})
// 以 map 形式返回系统启动时间
http.HandleFunc("/proc/system/uptime", func(w http.ResponseWriter, req *http.Request) {
days, hours, mins, err := nux.SystemUptime()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
RenderDataJson(w, map[string]interface{}{
"days": days,
"hours": hours,
"mins": mins,
})
})
// 以数组形式返回 cpu load
http.HandleFunc("/page/system/loadavg", func(w http.ResponseWriter, req *http.Request) {
cpuNum := runtime.NumCPU()
load, err := nux.LoadAvg()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
ret := [3][2]interface{}{
{load.Avg1min, int64(load.Avg1min * 100.0 / float64(cpuNum))},
{load.Avg5min, int64(load.Avg5min * 100.0 / float64(cpuNum))},
{load.Avg15min, int64(load.Avg15min * 100.0 / float64(cpuNum))},
}
RenderDataJson(w, ret)
})
// 以对象形式返回 cpu load
http.HandleFunc("/proc/system/loadavg", func(w http.ResponseWriter, req *http.Request) {
data, err := nux.LoadAvg()
AutoRender(w, data, err)
})
}