本人水準:參加工作六個月,剛看完一本《go實戰》的菜雞
代碼版本:2019年1月15日使用go get github.com/open-falcon/falcon-plus拉下來的代碼
agent的功能就是不停采集機器各種資料發送給其他子產品,提供一個自定義push metric的接口,向hbs發送一些心跳資訊
1 概覽
1.1 目錄結構

agent的代碼還是比較少的,先粗略講一下每個包都做什麼
- cron:定時任務,包括從hbs同步plugin、内建監控項和可信任IP等資訊,向hbs發送心跳資訊,收集監控資料
- funcs:收集監控資料的函數,基本都是通過第三方庫nux去收集的
- g:全局共享資訊的存放,包括配置資訊,日志,網絡通信的工具類,常量等
- http:API的後端實作
- plugins:使用者配置plugin的管理和啟動
1.2 main函數
上代碼,中文注釋都是我自己加的
前面的一些初始化代碼都很簡單,這裡略去不講,從BuildMappers開始分析
func main() {
// 指令行參數
cfg := flag.String("c", "cfg.json", "configuration file") // 配置檔案,預設為 cfg.json
version := flag.Bool("v", false, "show version") // 是否隻顯示版本,預設不顯示
check := flag.Bool("check", false, "check collector") // 是否隻檢查收集器,預設不檢查
// 擷取一下指令行參數
flag.Parse()
// 列印版本
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
// 檢查是否能正常采集監控資料
if *check {
funcs.CheckCollector()
os.Exit(0)
}
// 讀取配置檔案
g.ParseConfig(*cfg)
// 設定日志級别
if g.Config().Debug {
g.InitLog("debug")
} else {
g.InitLog("info")
}
// 初始化
g.InitRootDir() // 初始化根目錄
g.InitLocalIp() // 初始化本地 IP
g.InitRpcClients() // 初始化 hbs rpc 用戶端
// 建立收集資料的函數和發送資料間隔映射
funcs.BuildMappers()
// 定時任務(每秒),通過檔案 /proc/cpustats 和 /proc/diskstats,更新 cpu 和 disk 的原始資料,等待後一步處理
go cron.InitDataHistory()
// 定時任務
cron.ReportAgentStatus() // 向 hbs 發送用戶端心跳資訊
cron.SyncMinePlugins() // 同步 plugins
cron.SyncBuiltinMetrics() // 從 hbs 同步 BuiltinMetrics
cron.SyncTrustableIps() // 同步可信任 IP 位址
cron.Collect() // 采集監控資料
// 開啟 http 服務
go http.Start()
// 阻塞 main 函數
select {}
}
2 建立映射
看着很長,其實就是每組采集監控資料的函數(分組大概是基礎名額、device、網絡、GPU等等),和向transfer發送的資料間隔相對應放到一個叫FuncsAndInterval的自定義結構體裡面
/*
收集資料的函數和發送資料間隔映射
*/
func BuildMappers() {
interval := g.Config().Transfer.Interval // 向 transfer 發送資料間隔
// 擷取監控資料函數、與發送資料間隔映射
Mappers = []FuncsAndInterval{
{
Fs: []func() []*model.MetricValue{
AgentMetrics,
CpuMetrics,
NetMetrics,
KernelMetrics,
LoadAvgMetrics,
MemMetrics,
DiskIOMetrics,
IOStatsMetrics,
NetstatMetrics,
ProcMetrics,
UdpMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DeviceMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
PortMetrics,
SocketStatSummaryMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DuMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
UrlMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
GpuMetrics,
},
Interval: interval,
},
}
}
結構體FuncsAndInterval的定義,很簡單
type FuncsAndInterval struct {
Fs []func() []*model.MetricValue // 一組采集監控資料函數
Interval int // 向 transfer 發送資料間隔
}
采集監控資料函數的傳回值統一為[]*model.MetricValue,來看兩個采集監控資料的函數,先簡單掃一遍不追求細節
agent.alive是最熟悉的一個監控項了,不用其他邏輯來采集資料直接就是1
/*
擷取 agent 的 MetricValue
傳回值固定為 1,表示用戶端正常工作
*/
func AgentMetrics() []*model.MetricValue {
return []*model.MetricValue{GaugeValue("agent.alive", 1)}
}
cpu的就複雜很多,十來個,每個都調用了不同的函數去采集監控項
/*
擷取 cpu 的 MetricValue
*/
func CpuMetrics() []*model.MetricValue {
if !CpuPrepared() {
return []*model.MetricValue{}
}
// 擷取各項 cpu 監控名額
cpuIdleVal := CpuIdle()
idle := GaugeValue("cpu.idle", cpuIdleVal)
busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
user := GaugeValue("cpu.user", CpuUser())
nice := GaugeValue("cpu.nice", CpuNice())
system := GaugeValue("cpu.system", CpuSystem())
iowait := GaugeValue("cpu.iowait", CpuIowait())
irq := GaugeValue("cpu.irq", CpuIrq())
softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
steal := GaugeValue("cpu.steal", CpuSteal())
guest := GaugeValue("cpu.guest", CpuGuest())
switches := CounterValue("cpu.switches", CurrentCpuSwitches())
return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
3 定時任務
這部分是agent功能的主體,每個定時任務都是開了一個goroutine去完成的,
// 定時任務(每秒),通過檔案 /proc/stat 和 /proc/diskstats,更新 cpu 和 disk 的原始資料,等待後一步處理
go cron.InitDataHistory()
// 定時任務
cron.ReportAgentStatus() // 向 hbs 發送用戶端心跳資訊
cron.SyncMinePlugins() // 同步 plugins
cron.SyncBuiltinMetrics() // 從 hbs 同步用戶端需要采集的 metric
cron.SyncTrustableIps() // 同步可信任 IP 位址
cron.Collect() // 采集監控資料
3.1 初始化曆史
open-falcon是通過/proc/stat和/proc/diskstats這兩個系統檔案采集cpu和disk的資料,保留最近兩次的資料是為了處理delta類名額
/*
定時(每秒)通過 /proc/stat 和 /proc/diskstats 檔案采集 cpu 和 disk 的資料,保留最近兩次的資料
*/
func InitDataHistory() {
for {
funcs.UpdateCpuStat() // 更新 cpu 狀态
funcs.UpdateDiskStats() // 更新 disk 狀态
time.Sleep(g.COLLECT_INTERVAL) // 睡一個收集間隔(一秒)
}
}
3.1.1 cpu曆史資料
看一下UpdateCpuStat(),比較簡單
const (
historyCount int = 2 // 保留多少輪曆史資料
)
var (
procStatHistory [historyCount]*nux.ProcStat // 存放 cpu 曆史資料
psLock = new(sync.RWMutex) // 鎖
)
/*
更新 cpu 狀态
*/
func UpdateCpuStat() error {
// 讀取 /proc/stat 檔案
ps, err := nux.CurrentProcStat()
if err != nil {
return err
}
psLock.Lock()
defer psLock.Unlock()
// 抛棄過期的曆史資料,給新資料騰出位置
for i := historyCount - 1; i > 0; i-- {
procStatHistory[i] = procStatHistory[i-1]
}
procStatHistory[0] = ps // 儲存最新資料
return nil
}
/proc/stat這個檔案記錄了開機以來的cpu使用資訊,比如在我的虛拟機上長這樣。具體的含義可以看這篇部落格【Linux】/proc/stat詳解 完整驗證版
- 以cpu開頭的那一行表示所有cpu的數值總和,我的虛拟機隻有一個cpu,是以cpu和cpu0完全一緻
- cpu0之後的9個數字分别表示使用者時間、nice使用者時間、系統時間、空閑時間、IO等待時間、硬中斷時間、軟中斷時間、steal時間、來賓時間
- intr表示中斷,第一個資料是總中斷次數,之後是某些中斷發生的次數
- ctxt表示cpu上下文切換次數
- btime表示系統啟動時間戳(這個我貼的那篇博文有誤,他說是系統啟動的秒,算算就知道不可能)
- processes表示建立process的個數
- procs_running表示目前運作隊列的任務數目
- procs_blocked表示目前運作隊列的阻塞任務數目
- softirq表示軟中斷,資料含義和intr類似
這個檔案的讀取是通過nux包完成的,讀檔案的邏輯略去不說,看一下讀取之後傳回的結構體
CpuUsage那裡的Total是讀取過程中把每個cpu時間累加得來的
func (this *ProcStat) String() string {
return fmt.Sprintf("<Cpu:%v, Cpus:%v, Ctxt:%d, Processes:%d, ProcsRunning:%d, ProcsBlocking:%d>",
this.Cpu,
this.Cpus,
this.Ctxt,
this.Processes,
this.ProcsRunning,
this.ProcsBlocked)
}
type CpuUsage struct {
User uint64 // time spent in user mode
Nice uint64 // time spent in user mode with low priority (nice)
System uint64 // time spent in system mode
Idle uint64 // time spent in the idle task
Iowait uint64 // time spent waiting for I/O to complete (since Linux 2.5.41)
Irq uint64 // time spent servicing interrupts (since 2.6.0-test4)
SoftIrq uint64 // time spent servicing softirqs (since 2.6.0-test4)
Steal uint64 // time spent in other OSes when running in a virtualized environment (since 2.6.11)
Guest uint64 // time spent running a virtual CPU for guest operating systems under the control of the Linux kernel. (since 2.6.24)
Total uint64 // total of all time fields
}
3.1.2 disk曆史資料
UpdateDiskStats(),和cpu類似,唯一差別的是曆史資料的儲存形式從切片變成了映射,每個裝置的資訊分開了
var (
diskStatsMap = make(map[string][2]*nux.DiskStats) // 存放 disk 曆史資料的映射,映射關系為裝置名-裝置資訊
dsLock = new(sync.RWMutex) // 鎖
)
/*
更新 disk 狀态
*/
func UpdateDiskStats() error {
// 讀取 /proc/diskstats 檔案
dsList, err := nux.ListDiskStats()
if err != nil {
return err
}
dsLock.Lock()
defer dsLock.Unlock()
// 抛棄過期的曆史資料,給新資料騰出位置,儲存新資料
for i := 0; i < len(dsList); i++ {
device := dsList[i].Device
diskStatsMap[device] = [2]*nux.DiskStats{dsList[i], diskStatsMap[device][0]}
}
return nil
}
/proc/diskstats檔案在我虛拟機上長這樣,具體含義可以參考這篇博文/proc/diskstats檔案注解
第一列為主裝置号,第二列為次裝置号,第三列為裝置名稱
後面的11列,表示讀完成次數、合并讀完成次數、讀扇區次數、讀操作花費毫秒、寫完成次數、合并寫完成次數、寫扇區次數、寫操作花費毫秒、正在處理的輸入輸出請求數、輸入輸出花費毫秒、輸入輸出花費的權重毫秒
讀取也是通過nux包,傳回的結構體如下
type DiskStats struct {
Major int
Minor int
Device string
ReadRequests uint64 // Total number of reads completed successfully.
ReadMerged uint64 // Adjacent read requests merged in a single req.
ReadSectors uint64 // Total number of sectors read successfully.
MsecRead uint64 // Total number of ms spent by all reads.
WriteRequests uint64 // total number of writes completed successfully.
WriteMerged uint64 // Adjacent write requests merged in a single req.
WriteSectors uint64 // total number of sectors written successfully.
MsecWrite uint64 // Total number of ms spent by all writes.
IosInProgress uint64 // Number of actual I/O requests currently in flight.
MsecTotal uint64 // Amount of time during which ios_in_progress >= 1.
MsecWeightedTotal uint64 // Measure of recent I/O completion time and backlog.
TS time.Time
}
3.2 向 hbs 發送心跳
代碼如下,基本就是一個簡單的rpc,遠端調用hbs的Agent.ReportStatus方法
/*
向 hbs 彙報用戶端狀态
*/
func ReportAgentStatus() {
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { // 如果允許向 hbs 發送心跳,并配置了 hbs 位址
go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second) // 定時發送心跳
}
}
/*
定時彙報用戶端狀态
*/
func reportAgentStatus(interval time.Duration) {
for {
hostname, err := g.Hostname() // 擷取目前主機的 hostname,依次嘗試從配置檔案、環境變量 FALCON_ENDPOINT、系統 hostname 擷取
if err != nil {
hostname = fmt.Sprintf("error:%s", err.Error())
}
// 生成彙報請求
req := model.AgentReportRequest{
Hostname: hostname,
IP: g.IP(), // IP
AgentVersion: g.VERSION, // 版本
PluginVersion: g.GetCurrPluginVersion(), // 插件版本号,即最後一次 git commit 的 hash
}
var resp model.SimpleRpcResponse
err = g.HbsClient.Call("Agent.ReportStatus", req, &resp) // rpc 調用 hbs 的 Agent.ReportStatus,擷取響應
if err != nil || resp.Code != 0 {
log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
}
time.Sleep(interval) // 睡一個心跳彙報間隔時間
}
}
可以看一下hbsClient.Call函數的細節
/*
rpc 調用
*/
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
this.Lock()
defer this.Unlock()
err := this.serverConn() // 連接配接伺服器
if err != nil {
return err
}
timeout := time.Duration(10 * time.Second) // 逾時時間為 10 秒
done := make(chan error, 1)
go func() {
err := this.rpcClient.Call(method, args, reply) // rpc 調用方法
done <- err
}()
select {
// 如果 rpc 調用逾時,關閉連接配接,報錯
case <-time.After(timeout):
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
this.close()
return errors.New(this.RpcServer + " rpc call timeout")
// rpc 調用如果有報錯,關閉連接配接,報錯
case err := <-done:
if err != nil {
this.close()
return err
}
}
return nil
}
3.3 同步plugins
agent會用哈希表緩存plugins的相關資訊以便定期執行,除此之外,還要定期去hbs擷取使用者在web頁面上面配置的plugins路徑來更新緩存,總之同步plugins的操作就是維護好這個存儲plugins的哈希表
函數調用的ClearAllPlugins、DelNoUsePlugins、AddNewPlugins都在plugins.go裡,而plugins.go又調用了scheduler.go
// Copyright 2017 Xiaomi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cron
import (
"github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/agent/g"
"github.com/open-falcon/falcon-plus/modules/agent/plugins"
"log"
"strings"
"time"
)
/*
同步使用者自定義 plugins
*/
func SyncMinePlugins() {
// 配置未使能 plugin,傳回
if !g.Config().Plugin.Enabled {
return
}
// 配置未使能 hbs,傳回
if !g.Config().Heartbeat.Enabled {
return
}
// 未配置 hbs 位址,傳回
if g.Config().Heartbeat.Addr == "" {
return
}
go syncMinePlugins()
}
/*
同步 plugins
*/
func syncMinePlugins() {
var (
timestamp int64 = -1
pluginDirs []string
)
duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {
time.Sleep(duration) // 睡一個周期間隔
hostname, err := g.Hostname() // 擷取目前主機的 hostname,依次嘗試從配置檔案、環境變量 FALCON_ENDPOINT、系統 hostname 擷取
if err != nil {
continue
}
req := model.AgentHeartbeatRequest{ // 建立 rpc 請求
Hostname: hostname,
}
var resp model.AgentPluginsResponse
err = g.HbsClient.Call("Agent.MinePlugins", req, &resp) // rpc 調用 hbs 的 Agent.MinePlugins
if err != nil {
log.Println("ERROR:", err)
continue
}
if resp.Timestamp <= timestamp { // 如果傳回資料的時間戳小于等于上次的同步的時間戳,跳過本次同步
continue
}
pluginDirs = resp.Plugins // 擷取使用者在 web 頁面配置的本主機 plugin 存放位址
timestamp = resp.Timestamp
if g.Config().Debug {
log.Println(&resp)
}
if len(pluginDirs) == 0 { // 若使用者沒有配置,清空緩存的 plugins
plugins.ClearAllPlugins()
}
desiredAll := make(map[string]*plugins.Plugin) // 儲存配置的目錄下所有符合命名規則的監控腳本
for _, p := range pluginDirs {
underOneDir := plugins.ListPlugins(strings.Trim(p, "/")) // 列出該目錄下所有符合命名規則setp_xxx的監控腳本(使用者配置的路徑,前後的“/”會被去掉)
for k, v := range underOneDir {
desiredAll[k] = v
}
}
// 更新 plugins 緩存
plugins.DelNoUsePlugins(desiredAll) // 清除無用的
plugins.AddNewPlugins(desiredAll) // 增加新的
}
}
現在來看看plugins.go和scheduler.go,一點一點看
plugins.go裡面儲存了兩個hash表,鍵值都是plugin的腳本目錄,儲存的對象一個叫Plugin一個叫PluginScheduler
var (
Plugins = make(map[string]*Plugin) // 儲存使用者在 web 配置的要執行的 plugin,鍵值是 plugin 腳本目錄
PluginsWithScheduler = make(map[string]*PluginScheduler) // 儲存定時執行的 plugin,鍵值是 plugin 腳本目錄
)
看一下Plugin和PluginScheduler,Plugin其實就是從hbs擷取的資訊,PluginScheduler封裝了一層,加了定時器和通道
/*
plugin
*/
type Plugin struct {
FilePath string // 檔案目錄
MTime int64 // 檔案修改時間
Cycle int // 監控周期
}
/*
定時 plugin
*/
type PluginScheduler struct {
Ticker *time.Ticker // 定時器
Plugin *Plugin // plugin
Quit chan struct{} // 接收停止定時任務信号的通道
}
Plugin的增删改查十分簡單
/*
停止不再使用的 plugin,并從緩存的 plugins 表中删除
*/
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
for currKey, currPlugin := range Plugins {
newPlugin, ok := newPlugins[currKey]
if !ok || currPlugin.MTime != newPlugin.MTime { // 如果從 hbs 拉取的新 plugins 清單裡沒有該 plugin,或檔案修改時間不同,删除老 plugin
deletePlugin(currKey)
}
}
}
/*
在緩存的 plugins 表中增加新 plugin,并啟動定時執行 plugin
*/
func AddNewPlugins(newPlugins map[string]*Plugin) {
for fpath, newPlugin := range newPlugins {
if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime { // 如果該 plugin 已經存在,且檔案修改時間符合,跳過添加
continue
}
Plugins[fpath] = newPlugin
sch := NewPluginScheduler(newPlugin) // 生成定時 plugin
PluginsWithScheduler[fpath] = sch
sch.Schedule() // 啟動定時執行
}
}
/*
停止所有 plugin,清空緩存 plugins 表
*/
func ClearAllPlugins() {
for k := range Plugins {
deletePlugin(k)
}
}
/*
停止 plugin
*/
func deletePlugin(key string) {
v, ok := PluginsWithScheduler[key]
if ok {
v.Stop() // 停止定時任務
delete(PluginsWithScheduler, key) // 從緩存中删除
}
delete(Plugins, key) // 從緩存中删除
}
PluginScheduler主要是定時執行任務,就一個定時器加一個停止通道,每個Plugin對應一個單獨的goroutine
/*
構造函數
*/
func NewPluginScheduler(p *Plugin) *PluginScheduler {
scheduler := PluginScheduler{Plugin: p}
scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second) // 定時器
scheduler.Quit = make(chan struct{})
return &scheduler
}
/*
啟動定時任務
*/
func (this *PluginScheduler) Schedule() {
go func() {
for {
select {
case <-this.Ticker.C:
PluginRun(this.Plugin) // 定時任務
case <-this.Quit:
this.Ticker.Stop() // 收到停止信号,停止定時器,終止任務傳回
return
}
}
}()
}
/*
停止任務
*/
func (this *PluginScheduler) Stop() {
close(this.Quit) // 關閉接收停止信号的通道
}
/*
運作一次 plugin 腳本
*/
func PluginRun(plugin *Plugin) {
timeout := plugin.Cycle*1000 - 500 // 任務逾時時間,在下一次定時任務開始前 0.5 秒
fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)
// 檔案不存在報錯
if !file.IsExist(fpath) {
log.Println("no such plugin:", fpath)
return
}
debug := g.Config().Debug
if debug {
log.Println(fpath, "running...")
}
// 執行腳本
cmd := exec.Command(fpath)
var stdout bytes.Buffer
cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
err := cmd.Start()
if err != nil {
log.Printf("[ERROR] plugin start fail, error: %s\n", err)
return
}
if debug {
log.Println("plugin started:", fpath)
}
err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond) // 執行指令
errStr := stderr.String()
if errStr != "" {
logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
if _, err = file.WriteString(logFile, errStr); err != nil {
log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)
}
}
// 逾時報錯
if isTimeout {
if err == nil && debug {
log.Println("[INFO] timeout and kill process", fpath, "successfully")
}
if err != nil {
log.Println("[ERROR] kill process", fpath, "occur error:", err)
}
return
}
if err != nil {
log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
return
}
// 執行成功且沒有輸出,在 debug 模式下會記錄
data := stdout.Bytes()
if len(data) == 0 {
if debug {
log.Println("[DEBUG] stdout of", fpath, "is blank")
}
return
}
// 輸出轉 MetricValue
var metrics []*model.MetricValue
err = json.Unmarshal(data, &metrics)
if err != nil {
log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
return
}
// 發送給 transfer
g.SendToTransfer(metrics)
}
3.4 從hbs同步内建Metric
嘛是内建Metric呢,比如最熟悉的net.port.listen就是open-falcon内建的一個metric,每個周期agent都去hbs擷取使用者配置好的内建metric,從tag中抽取出要監控的具體資訊,比如net.port.listen/port=8080,就會抽出8080這個端口,緩存起來,定時采集資料上報
内建Metric都是在Template那裡配置後,由hbs發送給用戶端的,agent和hbs的互動大緻和同步plugin的互動一樣,都是rpc
這裡一共有四種内建metric,url.check.health是判斷url的響應是否逾時的,net.port.listen監聽端口,du.bs監控某個目錄的磁盤,proc.num監控程序名或指令包含某些字元串的程序總數
/*
從 hbs 同步 BuiltinMetrics
*/
func SyncBuiltinMetrics() {
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
go syncBuiltinMetrics()
}
}
/*
同步 BuiltinMetrics
*/
func syncBuiltinMetrics() {
var timestamp int64 = -1
var checksum string = "nil"
duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {
time.Sleep(duration)
var ports = []int64{}
var paths = []string{}
var procs = make(map[string]map[int]string)
var urls = make(map[string]string)
hostname, err := g.Hostname()
if err != nil {
continue
}
req := model.AgentHeartbeatRequest{
Hostname: hostname,
Checksum: checksum,
}
var resp model.BuiltinMetricResponse
err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp) // rpc 調用 hbs 的 Agent.BuiltinMetrics 方法
if err != nil {
log.Println("ERROR:", err)
continue
}
// 通過時間戳和校驗和判斷響應是不是最新的
if resp.Timestamp <= timestamp {
continue
}
if resp.Checksum == checksum {
continue
}
timestamp = resp.Timestamp
checksum = resp.Checksum
for _, metric := range resp.Metrics {
if metric.Metric == g.URL_CHECK_HEALTH { // url 檢測
arr := strings.Split(metric.Tags, ",")
if len(arr) != 2 {
continue
}
url := strings.Split(arr[0], "=")
if len(url) != 2 {
continue
}
stime := strings.Split(arr[1], "=")
if len(stime) != 2 {
continue
}
if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
urls[url[1]] = stime[1]
} else {
log.Println("metric ParseInt timeout failed:", err)
}
}
if metric.Metric == g.NET_PORT_LISTEN { // 端口檢測
arr := strings.Split(metric.Tags, "=")
if len(arr) != 2 {
continue
}
if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
ports = append(ports, port)
} else {
log.Println("metrics ParseInt failed:", err)
}
continue
}
if metric.Metric == g.DU_BS {
arr := strings.Split(metric.Tags, "=")
if len(arr) != 2 {
continue
}
paths = append(paths, strings.TrimSpace(arr[1]))
continue
}
if metric.Metric == g.PROC_NUM { // 程序數
arr := strings.Split(metric.Tags, ",")
tmpMap := make(map[int]string)
for i := 0; i < len(arr); i++ {
if strings.HasPrefix(arr[i], "name=") {
tmpMap[1] = strings.TrimSpace(arr[i][5:])
} else if strings.HasPrefix(arr[i], "cmdline=") {
tmpMap[2] = strings.TrimSpace(arr[i][8:])
}
}
procs[metric.Tags] = tmpMap
}
}
// 緩存配置
g.SetReportUrls(urls)
g.SetReportPorts(ports)
g.SetReportProcs(procs)
g.SetDuPaths(paths)
}
}
3.5 同步可信任IP
這個就更簡單了,沒啥新東西
/*
同步可信任 IP 位址
*/
func SyncTrustableIps() {
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
go syncTrustableIps()
}
}
/*
同步可信任 IP
*/
func syncTrustableIps() {
duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
for {
time.Sleep(duration)
var ips string
err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips) // rpc 調用 hbs 的 Agent.TrustableIps 方法
if err != nil {
log.Println("ERROR: call Agent.TrustableIps fail", err)
continue
}
g.SetTrustableIps(ips) // 緩存
}
}
3.6 采集監控資料
重點了,終于到了最關鍵的采集監控資料,其實裡面這些采集函數都大同小異,都是通過第三方庫nux讀取一些系統檔案,再做一些計算百分比之類的簡單處理
collector.go隻是一個殼,定義了一些封裝的邏輯,采集資料的函數都儲存在funcs.go裡面的Mapper裡
/*
采集資料
*/
func Collect() {
if !g.Config().Transfer.Enabled {
return
}
if len(g.Config().Transfer.Addrs) == 0 {
return
}
for _, v := range funcs.Mappers {
go collect(int64(v.Interval), v.Fs)
}
}
/*
采集單組資料
*/
func collect(sec int64, fns []func() []*model.MetricValue) {
t := time.NewTicker(time.Second * time.Duration(sec)) // 定時器
defer t.Stop()
for {
<-t.C
hostname, err := g.Hostname()
if err != nil {
continue
}
mvs := []*model.MetricValue{} // 儲存采集後的 metric
ignoreMetrics := g.Config().IgnoreMetrics // 配置的ignore metric
for _, fn := range fns {
items := fn() // 調用函數,擷取監控值
if items == nil {
continue
}
if len(items) == 0 {
continue
}
for _, mv := range items {
if b, ok := ignoreMetrics[mv.Metric]; ok && b { // 如果配置了忽略,跳過資料采集
continue
} else {
mvs = append(mvs, mv)
}
}
}
// 設定時間戳、endpoint、step
now := time.Now().Unix()
for j := 0; j < len(mvs); j++ {
mvs[j].Step = sec
mvs[j].Endpoint = hostname
mvs[j].Timestamp = now
}
// 發送給 transfer
g.SendToTransfer(mvs)
}
}
看一下發送transfer的代碼,如果配置了多個transfer,他會随機選取位址進行嘗試發送,直到發送成功為止
/*
監控資料發送給 transfer
*/
func SendToTransfer(metrics []*model.MetricValue) {
if len(metrics) == 0 {
return
}
dt := Config().DefaultTags // 讀取配置的預設 tag
if len(dt) > 0 {
var buf bytes.Buffer
default_tags_list := []string{}
for k, v := range dt {
buf.Reset()
buf.WriteString(k)
buf.WriteString("=")
buf.WriteString(v)
default_tags_list = append(default_tags_list, buf.String())
}
default_tags := strings.Join(default_tags_list, ",") // 生成預設 tag 的字元串
// 給要上報的 metric 添加預設 tag
for i, x := range metrics {
buf.Reset()
if x.Tags == "" {
metrics[i].Tags = default_tags
} else {
buf.WriteString(metrics[i].Tags)
buf.WriteString(",")
buf.WriteString(default_tags)
metrics[i].Tags = buf.String()
}
}
}
debug := Config().Debug
if debug {
log.Printf("=> <Total=%d> %v\n", len(metrics), metrics[0]) // debug 模式下,記錄每次上報 metric 的個數并記錄第一個 metric
}
var resp model.TransferResponse
SendMetrics(metrics, &resp) // 發送給 transfer
if debug {
log.Println("<=", &resp)
}
}
var (
TransferClientsLock *sync.RWMutex = new(sync.RWMutex)
TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
)
/*
發送資料給 transfer
*/
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
rand.Seed(time.Now().UnixNano())
for _, i := range rand.Perm(len(Config().Transfer.Addrs)) { // 随機在配置的 transfer 位址中選擇一個嘗試發送,失敗則嘗試下一個
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}
if updateMetrics(c, metrics, resp) { // 資料發送成功就退出嘗試
break
}
}
}
/*
初始化 transfer 連接配接用戶端
*/
func initTransferClient(addr string) *SingleConnRpcClient {
var c *SingleConnRpcClient = &SingleConnRpcClient{
RpcServer: addr,
Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
}
TransferClientsLock.Lock()
defer TransferClientsLock.Unlock()
TransferClients[addr] = c
return c
}
/*
rpc 調用 transfer 的 Transfer.Update 方法
*/
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
err := c.Call("Transfer.Update", metrics, resp)
if err != nil {
log.Println("call Transfer.Update fail:", c, err)
return false
}
return true
}
/*
擷取 transfer 連接配接用戶端
*/
func getTransferClient(addr string) *SingleConnRpcClient {
TransferClientsLock.RLock()
defer TransferClientsLock.RUnlock()
if c, ok := TransferClients[addr]; ok {
return c
}
return nil
}
要采集的監控名額都在這裡了,一個一個函數看
var Mappers []FuncsAndInterval
/*
收集資料的函數和發送資料間隔映射
*/
func BuildMappers() {
interval := g.Config().Transfer.Interval // 向 transfer 發送資料間隔
// 擷取監控資料函數、與發送資料間隔映射
Mappers = []FuncsAndInterval{
{
Fs: []func() []*model.MetricValue{
AgentMetrics,
CpuMetrics,
NetMetrics,
KernelMetrics,
LoadAvgMetrics,
MemMetrics,
DiskIOMetrics,
IOStatsMetrics,
NetstatMetrics,
ProcMetrics,
UdpMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DeviceMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
PortMetrics,
SocketStatSummaryMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
DuMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
UrlMetrics,
},
Interval: interval,
},
{
Fs: []func() []*model.MetricValue{
GpuMetrics,
},
Interval: interval,
},
}
}
agent以前看過了,很簡單
/*
擷取 agent 的 MetricValue
傳回值固定為 1,表示用戶端正常工作
*/
func AgentMetrics() []*model.MetricValue {
return []*model.MetricValue{GaugeValue("agent.alive", 1)}
}
3.1說過,linux記錄cpu的資訊記錄的是一個累計的值,是以為了擷取每分鐘cpu的資料,必須計算內插補點,是以才有了3.1中的記錄曆史資料的操作。因為有procStatHistory這個共用變量,每個函數基本都加了鎖,防止每個周期之間的資料混淆,造成錯誤,比如計算cpu.idle/cpu.total,擷取了cpu.total後UpdateCpuStat()函數被調用,更新了數值再擷取cpu.idle,出來的結果就不對
cpu的各種時間百分比計算都大同小異,上下文切換這個函數沒有處理數值,直接弄成了counter資料類型,之後會計算為speed
const (
historyCount int = 2 // 保留多少輪曆史資料
)
var (
procStatHistory [historyCount]*nux.ProcStat // 存放 cpu 曆史資料
psLock = new(sync.RWMutex) // 鎖
)
/*
更新 cpu 狀态
*/
func UpdateCpuStat() error {
// 讀取 /proc/stat 檔案
ps, err := nux.CurrentProcStat()
if err != nil {
return err
}
psLock.Lock()
defer psLock.Unlock()
// 抛棄過期的曆史資料,給新資料騰出位置
for i := historyCount - 1; i > 0; i-- {
procStatHistory[i] = procStatHistory[i-1]
}
procStatHistory[0] = ps // 儲存最新資料
return nil
}
/*
cpu total 最新一次的變化內插補點
*/
func deltaTotal() uint64 {
if procStatHistory[1] == nil {
return 0
}
return procStatHistory[0].Cpu.Total - procStatHistory[1].Cpu.Total
}
/*
計算 cpu idle 時間百分比
*/
func CpuIdle() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Idle-procStatHistory[1].Cpu.Idle) * invQuotient
}
/*
計算 cpu user 時間百分比
*/
func CpuUser() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.User-procStatHistory[1].Cpu.User) * invQuotient
}
/*
計算 cpu nice 時間百分比
*/
func CpuNice() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Nice-procStatHistory[1].Cpu.Nice) * invQuotient
}
/*
計算 cpu system 時間百分比
*/
func CpuSystem() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.System-procStatHistory[1].Cpu.System) * invQuotient
}
/*
計算 cpu io wait 時間百分比
*/
func CpuIowait() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Iowait-procStatHistory[1].Cpu.Iowait) * invQuotient
}
/*
計算 cpu 中斷時間百分比
*/
func CpuIrq() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Irq-procStatHistory[1].Cpu.Irq) * invQuotient
}
/*
計算 cpu 軟中斷時間百分比
*/
func CpuSoftIrq() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.SoftIrq-procStatHistory[1].Cpu.SoftIrq) * invQuotient
}
/*
計算 cpu steal 時間百分比
*/
func CpuSteal() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Steal-procStatHistory[1].Cpu.Steal) * invQuotient
}
/*
計算 cpu guest 時間百分比
*/
func CpuGuest() float64 {
psLock.RLock()
defer psLock.RUnlock()
dt := deltaTotal()
if dt == 0 {
return 0.0
}
invQuotient := 100.00 / float64(dt)
return float64(procStatHistory[0].Cpu.Guest-procStatHistory[1].Cpu.Guest) * invQuotient
}
/*
計算 cpu 上下文切換次數
*/
func CurrentCpuSwitches() uint64 {
psLock.RLock()
defer psLock.RUnlock()
return procStatHistory[0].Ctxt
}
/*
檢測 cpu 資料是否已經準備完畢(至少需要兩個周期的曆史資料,才能計算內插補點)
*/
func CpuPrepared() bool {
psLock.RLock()
defer psLock.RUnlock()
return procStatHistory[1] != nil
}
/*
擷取 cpu 的 MetricValue
*/
func CpuMetrics() []*model.MetricValue {
// 檢查曆史資料是否已經有了兩個周期,足夠計算內插補點
if !CpuPrepared() {
return []*model.MetricValue{}
}
// 擷取各項 cpu 監控名額
cpuIdleVal := CpuIdle()
idle := GaugeValue("cpu.idle", cpuIdleVal)
busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
user := GaugeValue("cpu.user", CpuUser())
nice := GaugeValue("cpu.nice", CpuNice())
system := GaugeValue("cpu.system", CpuSystem())
iowait := GaugeValue("cpu.iowait", CpuIowait())
irq := GaugeValue("cpu.irq", CpuIrq())
softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
steal := GaugeValue("cpu.steal", CpuSteal())
guest := GaugeValue("cpu.guest", CpuGuest())
switches := CounterValue("cpu.switches", CurrentCpuSwitches()) // 利用了 counter 資料類型,之後會被計算為 speed
return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
網卡的資訊類似cpu的擷取方式,也是調用了第三方庫,通過系統檔案/proc/net/dev和/sys/class/net/%s/speed擷取網卡資訊
這是我虛拟機的/proc/net/dev,一行對應一個網卡
左邊的8清單示接收,分别是系統啟動以來,接收的總位元組數、包數、錯誤包數、丢包數、FIFO緩沖區錯誤數、分組幀錯誤數、壓縮資料包數、多點傳播幀數;右邊的8清單示發送,大部分相同,不同的colls表示沖突數、carrier表示載波損耗的數量(說實話,已經完全無法了解了)
我虛拟機的/sys/class/net/eth0/speed,好吧就一個數,查了一下說是網卡最新的連接配接速度,差不多就是帶寬啦,機關是Mbits/sec
第三方庫還提供了一些處理過後的名額,比如出入流量占帶寬百分比等
/*
網卡監控資料采集
*/
func NetMetrics() []*model.MetricValue {
return CoreNetMetrics(g.Config().Collector.IfacePrefix) // 擷取配置的要監控網卡的字首
}
/*
網卡監控
*/
func CoreNetMetrics(ifacePrefix []string) []*model.MetricValue {
netIfs, err := nux.NetIfs(ifacePrefix) // 通過系統檔案 /proc/net/dev 和 /sys/class/net/%s/speed 擷取網卡資訊
if err != nil {
log.Println(err)
return []*model.MetricValue{}
}
cnt := len(netIfs)
ret := make([]*model.MetricValue, cnt*26) // 網卡的資訊全放在一個一維數組裡
for idx, netIf := range netIfs {
iface := "iface=" + netIf.Iface
ret[idx*26+0] = CounterValue("net.if.in.bytes", netIf.InBytes, iface)
ret[idx*26+1] = CounterValue("net.if.in.packets", netIf.InPackages, iface)
ret[idx*26+2] = CounterValue("net.if.in.errors", netIf.InErrors, iface)
ret[idx*26+3] = CounterValue("net.if.in.dropped", netIf.InDropped, iface)
ret[idx*26+4] = CounterValue("net.if.in.fifo.errs", netIf.InFifoErrs, iface)
ret[idx*26+5] = CounterValue("net.if.in.frame.errs", netIf.InFrameErrs, iface)
ret[idx*26+6] = CounterValue("net.if.in.compressed", netIf.InCompressed, iface)
ret[idx*26+7] = CounterValue("net.if.in.multicast", netIf.InMulticast, iface)
ret[idx*26+8] = CounterValue("net.if.out.bytes", netIf.OutBytes, iface)
ret[idx*26+9] = CounterValue("net.if.out.packets", netIf.OutPackages, iface)
ret[idx*26+10] = CounterValue("net.if.out.errors", netIf.OutErrors, iface)
ret[idx*26+11] = CounterValue("net.if.out.dropped", netIf.OutDropped, iface)
ret[idx*26+12] = CounterValue("net.if.out.fifo.errs", netIf.OutFifoErrs, iface)
ret[idx*26+13] = CounterValue("net.if.out.collisions", netIf.OutCollisions, iface)
ret[idx*26+14] = CounterValue("net.if.out.carrier.errs", netIf.OutCarrierErrs, iface)
ret[idx*26+15] = CounterValue("net.if.out.compressed", netIf.OutCompressed, iface)
ret[idx*26+16] = CounterValue("net.if.total.bytes", netIf.TotalBytes, iface) // 第三方庫計算的,發送和接收的和
ret[idx*26+17] = CounterValue("net.if.total.packets", netIf.TotalPackages, iface)
ret[idx*26+18] = CounterValue("net.if.total.errors", netIf.TotalErrors, iface)
ret[idx*26+19] = CounterValue("net.if.total.dropped", netIf.TotalDropped, iface)
ret[idx*26+20] = GaugeValue("net.if.speed.bits", netIf.SpeedBits, iface) // 網卡最新的連接配接速度,即帶寬,機關是 Mbits/sec
ret[idx*26+21] = CounterValue("net.if.in.percent", netIf.InPercent, iface) // 入流量占帶寬百分比
ret[idx*26+22] = CounterValue("net.if.out.percent", netIf.OutPercent, iface) // 出流量占帶寬百分比
ret[idx*26+23] = CounterValue("net.if.in.bits", netIf.InBytes*8, iface) // 計算 bit 數
ret[idx*26+24] = CounterValue("net.if.out.bits", netIf.OutBytes*8, iface)
ret[idx*26+25] = CounterValue("net.if.total.bits", netIf.TotalBytes*8, iface)
}
return ret
}
kernel的資訊采集很簡單,也是調第三方庫,打開的系統檔案基本都很簡單,最複雜的就是/proc/sys/fs/file-nr,第一清單示系統已經配置設定并使用的檔案描述符數,第二清單示已經配置設定但并未使用的檔案描述符數,第三清單示系統支援的最大檔案描述符數
檔案描述符是一個程序打開檔案時建立的,同一個檔案可以有不同的檔案描述符,是以這幾項監控類似于打開檔案的數量
/*
核心監控資料采集
*/
func KernelMetrics() (L []*model.MetricValue) {
maxFiles, err := nux.KernelMaxFiles() // 通過 /proc/sys/fs/file-max 擷取系統允許的最大檔案描述符數
if err != nil {
log.Println(err)
return
}
L = append(L, GaugeValue("kernel.maxfiles", maxFiles))
maxProc, err := nux.KernelMaxProc() // 通過 /proc/sys/kernel/pid_max 擷取系統允許最大程序數
if err != nil {
log.Println(err)
return
}
L = append(L, GaugeValue("kernel.maxproc", maxProc))
allocateFiles, err := nux.KernelAllocateFiles() // 通過 /proc/sys/fs/file-nr 擷取系統目前使用的檔案描述符數
if err != nil {
log.Println(err)
return
}
L = append(L, GaugeValue("kernel.files.allocated", allocateFiles))
L = append(L, GaugeValue("kernel.files.left", maxFiles-allocateFiles)) // 系統還可以使用多少檔案描述符
return
}
cpu負載是通過/proc/loadavg采集的,每列分别表示1、5、15分鐘内在運作隊列中等待或等待磁盤IO的程序、正在運作的程序數占程序總數百分比、最近運作的程序ID号
代碼很短
/*
cpu 負載監控資料采集
*/
func LoadAvgMetrics() []*model.MetricValue {
load, err := nux.LoadAvg()
if err != nil {
log.Println(err)
return nil
}
return []*model.MetricValue{
GaugeValue("load.1min", load.Avg1min),
GaugeValue("load.5min", load.Avg5min),
GaugeValue("load.15min", load.Avg15min),
}
}
記憶體看的檔案是/proc/meminfo,一行是一個參數,采集的名額淺顯易懂,就不多講
/*
memory 監控資料采集
*/
func MemMetrics() []*model.MetricValue {
m, err := nux.MemInfo()
if err != nil {
log.Println(err)
return nil
}
memFree := m.MemFree + m.Buffers + m.Cached // 空閑記憶體總量等于空閑記憶體 + 給檔案的緩沖大小 + 高速緩沖存儲器大小
if m.MemAvailable > 0 { // 如果能直接從 /proc/meminfo 獲得空閑記憶體總量,使用檔案中的值
memFree = m.MemAvailable
}
memUsed := m.MemTotal - memFree // 計算使用量
// 計算百分比
pmemFree := 0.0
pmemUsed := 0.0
if m.MemTotal != 0 {
pmemFree = float64(memFree) * 100.0 / float64(m.MemTotal)
pmemUsed = float64(memUsed) * 100.0 / float64(m.MemTotal)
}
pswapFree := 0.0
pswapUsed := 0.0
if m.SwapTotal != 0 {
pswapFree = float64(m.SwapFree) * 100.0 / float64(m.SwapTotal)
pswapUsed = float64(m.SwapUsed) * 100.0 / float64(m.SwapTotal)
}
return []*model.MetricValue{
GaugeValue("mem.memtotal", m.MemTotal),
GaugeValue("mem.memused", memUsed),
GaugeValue("mem.memfree", memFree),
GaugeValue("mem.swaptotal", m.SwapTotal),
GaugeValue("mem.swapused", m.SwapUsed),
GaugeValue("mem.swapfree", m.SwapFree),
GaugeValue("mem.memfree.percent", pmemFree),
GaugeValue("mem.memused.percent", pmemUsed),
GaugeValue("mem.swapfree.percent", pswapFree),
GaugeValue("mem.swapused.percent", pswapUsed),
}
}
磁盤IO監控有兩個函數,DiskIOMetrics和IOStatsMetrics,都在一個檔案裡,磁盤資訊是從/proc/diskstats這個檔案讀的,當然并不是所有磁盤的資訊都會讀出來,必須是sd、vd、xvd、fio、nvme開頭的裝置名、且長度符合規定的裝置才會被監控,詳見函數ShouldHandleDevice
/*
讀完成次數
*/
func IOReadRequests(arr [2]*nux.DiskStats) uint64 {
return arr[0].ReadRequests - arr[1].ReadRequests
}
/*
合并讀完成次數
*/
func IOReadMerged(arr [2]*nux.DiskStats) uint64 {
return arr[0].ReadMerged - arr[1].ReadMerged
}
/*
讀扇區次數
*/
func IOReadSectors(arr [2]*nux.DiskStats) uint64 {
return arr[0].ReadSectors - arr[1].ReadSectors
}
/*
讀操作花費毫秒
*/
func IOMsecRead(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecRead - arr[1].MsecRead
}
/*
寫完成次數
*/
func IOWriteRequests(arr [2]*nux.DiskStats) uint64 {
return arr[0].WriteRequests - arr[1].WriteRequests
}
/*
合并寫完成次數
*/
func IOWriteMerged(arr [2]*nux.DiskStats) uint64 {
return arr[0].WriteMerged - arr[1].WriteMerged
}
/*
寫扇區次數
*/
func IOWriteSectors(arr [2]*nux.DiskStats) uint64 {
return arr[0].WriteSectors - arr[1].WriteSectors
}
/*
寫操作花費毫秒
*/
func IOMsecWrite(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecWrite - arr[1].MsecWrite
}
/*
io 操作花費毫秒
*/
func IOMsecTotal(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecTotal - arr[1].MsecTotal
}
/*
io 操作花費權重毫秒
*/
func IOMsecWeightedTotal(arr [2]*nux.DiskStats) uint64 {
return arr[0].MsecWeightedTotal - arr[1].MsecWeightedTotal
}
/*
計算兩次曆史資料的時間間隔有多少秒
*/
func TS(arr [2]*nux.DiskStats) uint64 {
return uint64(arr[0].TS.Sub(arr[1].TS).Nanoseconds() / 1000000)
}
/*
根據裝置名取出對應的一組曆史資料,傳遞給相應函數
*/
func IODelta(device string, f func([2]*nux.DiskStats) uint64) uint64 {
val, ok := diskStatsMap[device]
if !ok {
return 0
}
if val[1] == nil {
return 0
}
return f(val)
}
/*
采集磁盤IO監控資料
*/
func DiskIOMetrics() (L []*model.MetricValue) {
dsList, err := nux.ListDiskStats()
if err != nil {
log.Println(err)
return
}
for _, ds := range dsList {
if !ShouldHandleDevice(ds.Device) { // 根據裝置名的字首判定是否要進行監控
continue
}
device := "device=" + ds.Device
L = append(L, CounterValue("disk.io.read_requests", ds.ReadRequests, device))
L = append(L, CounterValue("disk.io.read_merged", ds.ReadMerged, device))
L = append(L, CounterValue("disk.io.read_sectors", ds.ReadSectors, device))
L = append(L, CounterValue("disk.io.msec_read", ds.MsecRead, device))
L = append(L, CounterValue("disk.io.write_requests", ds.WriteRequests, device))
L = append(L, CounterValue("disk.io.write_merged", ds.WriteMerged, device))
L = append(L, CounterValue("disk.io.write_sectors", ds.WriteSectors, device))
L = append(L, CounterValue("disk.io.msec_write", ds.MsecWrite, device))
L = append(L, CounterValue("disk.io.ios_in_progress", ds.IosInProgress, device))
L = append(L, CounterValue("disk.io.msec_total", ds.MsecTotal, device))
L = append(L, CounterValue("disk.io.msec_weighted_total", ds.MsecWeightedTotal, device))
}
return
}
/*
采集磁盤IO監控資料
*/
func IOStatsMetrics() (L []*model.MetricValue) {
dsLock.RLock()
defer dsLock.RUnlock()
for device := range diskStatsMap {
if !ShouldHandleDevice(device) { // 根據裝置名的字首判定是否要進行監控
continue
}
tags := "device=" + device
rio := IODelta(device, IOReadRequests)
wio := IODelta(device, IOWriteRequests)
delta_rsec := IODelta(device, IOReadSectors)
delta_wsec := IODelta(device, IOWriteSectors)
ruse := IODelta(device, IOMsecRead)
wuse := IODelta(device, IOMsecWrite)
use := IODelta(device, IOMsecTotal)
n_io := rio + wio
avgrq_sz := 0.0
await := 0.0
svctm := 0.0
if n_io != 0 {
avgrq_sz = float64(delta_rsec+delta_wsec) / float64(n_io) // 平均每次 IO 操作讀寫扇區數
await = float64(ruse+wuse) / float64(n_io) // 平均每次 IO 操作等待時間
svctm = float64(use) / float64(n_io) // 平均每次 IO 操作服務時間
}
duration := IODelta(device, TS)
L = append(L, GaugeValue("disk.io.read_bytes", float64(delta_rsec)*512.0, tags))
L = append(L, GaugeValue("disk.io.write_bytes", float64(delta_wsec)*512.0, tags))
L = append(L, GaugeValue("disk.io.avgrq_sz", avgrq_sz, tags))
L = append(L, GaugeValue("disk.io.avgqu-sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags)) // 平均 IO 隊列長度,即 IO 權重時間(以秒為機關)
L = append(L, GaugeValue("disk.io.await", await, tags))
L = append(L, GaugeValue("disk.io.svctm", svctm, tags))
tmp := float64(use) * 100.0 / float64(duration)
if tmp > 100.0 {
tmp = 100.0
}
L = append(L, GaugeValue("disk.io.util", tmp, tags))
}
return
}
/*
判斷該裝置是否要監控
*/
func ShouldHandleDevice(device string) bool {
normal := len(device) == 3 && (strings.HasPrefix(device, "sd") || strings.HasPrefix(device, "vd"))
aws := len(device) >= 4 && strings.HasPrefix(device, "xvd")
flash := len(device) >= 4 && (strings.HasPrefix(device, "fio") || strings.HasPrefix(device, "nvme"))
return normal || aws || flash
}
網絡監控是/proc/net/netstat和/proc/net/snmp這兩個檔案,NetstatMetrics和UdpMetrics兩個函數的代碼很簡單,就是用第三方庫讀取檔案,然後用counter類型把原始資料處理成speed展示出來。但是裡面的名額已經超出了我的了解範圍,這裡略過不看,對名額感興趣可以看這個文proc 檔案系統調節參數介紹
程序監控,會調用第三方庫去讀取/proc目錄下所有的數字目錄,/proc下一個數字目錄就表示對應pid号的程序資訊,這裡讀取兩個,一個是存儲程序資訊的status檔案,一個是存儲程序指令的cmdline檔案
/*
程序監控資料采集
*/
func ProcMetrics() (L []*model.MetricValue) {
reportProcs := g.ReportProcs() // 讀取配置的要監控的程序
sz := len(reportProcs)
if sz == 0 {
return
}
ps, err := nux.AllProcs() // 讀取 /proc 下所有程序檔案的程序名、指令行
if err != nil {
log.Println(err)
return
}
pslen := len(ps)
for tags, m := range reportProcs {
cnt := 0
for i := 0; i < pslen; i++ {
if is_a(ps[i], m) { // 如果符合使用者設定的程序過濾規則
cnt++ // 計數加一
}
}
L = append(L, GaugeValue(g.PROC_NUM, cnt, tags)) // 符合使用者配置的程序過濾規則的程序數
}
return
}
/*
根據使用者配置的程序名和指令行,判斷該程序是否比對
*/
func is_a(p *nux.Proc, m map[int]string) bool {
// 判斷程序名或指令是否比對
for key, val := range m {
if key == 1 {
// 判斷程序名是否相等
if val != p.Name {
return false
}
} else if key == 2 {
// 判斷指令行是否包含指定字元串
if !strings.Contains(p.Cmdline, val) {
return false
}
}
}
return true
}
磁盤監控,根據agent的cfg.json裡配置的collect.mountPoint略有不同:如果完全沒有配置,open-falcon會采集所有磁盤的資料;如果配置了,隻會采集配置的這些磁盤的資料
采集資料就是調第三方庫讀取/proc/mounts下的挂載磁盤資訊,采集名額都很通俗易懂
/*
磁盤監控資料采集
*/
func DeviceMetrics() (L []*model.MetricValue) {
mountPoints, err := nux.ListMountPoint() // 讀取 /proc/mounts 目錄下挂載的裝置資訊
if err != nil {
log.Error("collect device metrics fail:", err)
return
}
var myMountPoints map[string]bool = make(map[string]bool)
if len(g.Config().Collector.MountPoint) > 0 {
for _, mp := range g.Config().Collector.MountPoint { // 擷取配置的要監控的挂載裝置
myMountPoints[mp] = true
}
}
var diskTotal uint64 = 0
var diskUsed uint64 = 0
for idx := range mountPoints {
fsSpec, fsFile, fsVfstype := mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2] // 擷取每個裝置的分區定位、挂載目錄、磁盤類型
if len(myMountPoints) > 0 { //如果配置了 cfg.json 的 collect.mountPoint ,則隻檢測配置中的磁盤
if _, ok := myMountPoints[fsFile]; !ok { // 跳過未配置的磁盤,debug 模式下,如果挂載裝置未配置監控,會記錄
log.Debug("mount point not matched with config", fsFile, "ignored.")
continue
}
}
var du *nux.DeviceUsage
du, err = nux.BuildDeviceUsage(fsSpec, fsFile, fsVfstype) // 擷取指定挂載裝置的資訊
if err != nil {
log.Error(err)
continue
}
if du.BlocksAll == 0 {
continue
}
diskTotal += du.BlocksAll // 累加總容量
diskUsed += du.BlocksUsed // 累加使用的總容量
tags := fmt.Sprintf("mount=%s,fstype=%s", du.FsFile, du.FsVfstype)
L = append(L, GaugeValue("df.bytes.total", du.BlocksAll, tags))
L = append(L, GaugeValue("df.bytes.used", du.BlocksUsed, tags))
L = append(L, GaugeValue("df.bytes.free", du.BlocksFree, tags))
L = append(L, GaugeValue("df.bytes.used.percent", du.BlocksUsedPercent, tags))
L = append(L, GaugeValue("df.bytes.free.percent", du.BlocksFreePercent, tags))
if du.InodesAll == 0 {
continue
}
L = append(L, GaugeValue("df.inodes.total", du.InodesAll, tags))
L = append(L, GaugeValue("df.inodes.used", du.InodesUsed, tags))
L = append(L, GaugeValue("df.inodes.free", du.InodesFree, tags))
L = append(L, GaugeValue("df.inodes.used.percent", du.InodesUsedPercent, tags))
L = append(L, GaugeValue("df.inodes.free.percent", du.InodesFreePercent, tags))
}
if len(L) > 0 && diskTotal > 0 {
L = append(L, GaugeValue("df.statistics.total", float64(diskTotal)))
L = append(L, GaugeValue("df.statistics.used", float64(diskUsed)))
L = append(L, GaugeValue("df.statistics.used.percent", float64(diskUsed)*100.0/float64(diskTotal)))
}
return
}
端口監控,是通過系統指令檢視的
/*
端口監控資料采集
*/
func PortMetrics() (L []*model.MetricValue) {
reportPorts := g.ReportPorts() // 擷取配置的要監控的端口号
sz := len(reportPorts)
if sz == 0 {
return
}
allTcpPorts, err := nux.TcpPorts() // 通過指令 ss -t -l -n 列出所有正在 tcp 通信的端口号
if err != nil {
log.Println(err)
return
}
allUdpPorts, err := nux.UdpPorts() // 通過指令 ss -u -a -n 列出所有正在 udp 通信的端口号
if err != nil {
log.Println(err)
return
}
for i := 0; i < sz; i++ {
tags := fmt.Sprintf("port=%d", reportPorts[i])
if slice.ContainsInt64(allTcpPorts, reportPorts[i]) || slice.ContainsInt64(allUdpPorts, reportPorts[i]) { // 如果正在 tcp 或 udp 通信的端口中包含了改端口,就認為端口存活
L = append(L, GaugeValue(g.NET_PORT_LISTEN, 1, tags))
} else {
L = append(L, GaugeValue(g.NET_PORT_LISTEN, 0, tags))
}
}
return
}
socket狀态監控是通過指令ss -s獲得的,采集的是TCP開頭的那一行括号中的名額
/*
socket 狀态監控資料采集
*/
func SocketStatSummaryMetrics() (L []*model.MetricValue) {
ssMap, err := nux.SocketStatSummary() // 通過 ss -s 指令擷取 tcp 連接配接狀态
if err != nil {
log.Println(err)
return
}
for k, v := range ssMap {
L = append(L, GaugeValue("ss."+k, v))
}
return
}
目錄監控,是通過指令du -bs采集的,目錄很大的時候這個指令執行時間比較長,是以每個目錄都是開了一個goroutine去做的,并且有逾時響應的處理
/*
目錄監控資料采集
*/
func DuMetrics() (L []*model.MetricValue) {
paths := g.DuPaths() // 擷取配置的要監控的目錄
result := make(chan *model.MetricValue, len(paths))
var wg sync.WaitGroup
for _, path := range paths {
wg.Add(1)
go func(path string) {
var err error
defer func() {
if err != nil {
log.Println(err)
result <- GaugeValue(g.DU_BS, -1, "path="+path) // 如果采集某個目錄的監控名額出錯,則傳回 -1 作為監控值
}
wg.Done()
}()
cmd := exec.Command("du", "-bs", path) // 執行指令 du -bs 擷取目錄大小(機關為位元組)
var stdout bytes.Buffer
cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr
err = cmd.Start()
if err != nil {
return
}
err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Second) // 逾時處理
if isTimeout {
err = errors.New(fmt.Sprintf("exec cmd : du -bs %s timeout", path))
return
}
errStr := stderr.String() // 指令報錯處理
if errStr != "" {
err = errors.New(errStr)
return
}
if err != nil {
err = errors.New(fmt.Sprintf("du -bs %s failed: %s", path, err.Error()))
return
}
arr := strings.Fields(stdout.String())
if len(arr) < 2 {
err = errors.New(fmt.Sprintf("du -bs %s failed: %s", path, "return fields < 2"))
return
}
size, err := strconv.ParseUint(arr[0], 10, 64) // 指令傳回的第一列為大小
if err != nil {
err = errors.New(fmt.Sprintf("cannot parse du -bs %s output", path))
return
}
result <- GaugeValue(g.DU_BS, size, "path="+path)
}(path)
}
wg.Wait()
resultLen := len(result)
for i := 0; i < resultLen; i++ {
L = append(L, <-result)
}
return
}
url的監控就是curl一下
/*
url 監控資料采集
*/
func UrlMetrics() (L []*model.MetricValue) {
reportUrls := g.ReportUrls() // 擷取要監控的 url
sz := len(reportUrls)
if sz == 0 {
return
}
hostname, err := g.Hostname()
if err != nil {
hostname = "None"
}
for furl, timeout := range reportUrls {
tags := fmt.Sprintf("url=%v,timeout=%v,src=%v", furl, timeout, hostname) // 通過 curl 指令檢查 url 是否在規定時間内響應
if ok, _ := probeUrl(furl, timeout); !ok {
L = append(L, GaugeValue(g.URL_CHECK_HEALTH, 0, tags))
continue
}
L = append(L, GaugeValue(g.URL_CHECK_HEALTH, 1, tags))
}
return
}
/*
檢測 url 是否正常響應
*/
func probeUrl(furl string, timeout string) (bool, error) {
bs, err := sys.CmdOutBytes("curl", "--max-filesize", "102400", "-I", "-m", timeout, "-o", "/dev/null", "-s", "-w", "%{http_code}", furl)
if err != nil {
log.Printf("probe url [%v] failed.the err is: [%v]\n", furl, err)
return false, err
}
reader := bufio.NewReader(bytes.NewBuffer(bs))
retcode, err := file.ReadLine(reader) // 擷取響應碼
if err != nil {
log.Println("read retcode failed.err is:", err)
return false, err
}
if strings.TrimSpace(string(retcode)) != "200" { // 響應碼為 200 才認為正常
log.Printf("return code [%v] is not 200.query url is [%v]", string(retcode), furl)
return false, err
}
return true, err
}
我電腦沒有gpu,是以gpu那塊沒有仔細看,是用第三方庫gonvml采集的資料,做一些機關換算就可以了,名額都通俗易懂
3.7 http服務
雖然main函數調用的是Start(),但是這個函數隻是啟動了http,關鍵的代碼還是在http這個包的init()函數裡
先看Start(),非常簡單
/*
開啟 http 服務
*/
func Start() {
if !g.Config().Http.Enabled { // 如果配置不開啟 http,傳回
return
}
addr := g.Config().Http.Listen // 擷取 http 服務啟動的位址
if addr == "" {
return
}
s := &http.Server{
Addr: addr,
MaxHeaderBytes: 1 << 30,
}
log.Println("listening", addr)
log.Fatalln(s.ListenAndServe()) // 開啟 http
}
初始化函數裡啟動了很多API,這些才是真正的後端代碼
/*
初始化 API
*/
func init() {
configAdminRoutes()
configCpuRoutes()
configDfRoutes()
configHealthRoutes()
configIoStatRoutes()
configKernelRoutes()
configMemoryRoutes()
configPageRoutes()
configPluginRoutes()
configPushRoutes()
configRunRoutes()
configSystemRoutes()
}
這個是管理用的API,前面一直提到的可信任IP就是在這裡用的了,來自可信任IP的API請求可以完成某些重要操作,比如停止、加載配置
/*
管理類 API
*/
func configAdminRoutes() {
// 結束 agent 程式
http.HandleFunc("/exit", func(w http.ResponseWriter, r *http.Request) {
if g.IsTrustable(r.RemoteAddr) { // 來自可信任 IP 的調用才能接受
w.Write([]byte("exiting...")) // 成功退出,傳回 exiting...
go func() {
time.Sleep(time.Second)
os.Exit(0)
}()
} else {
w.Write([]byte("no privilege")) // 不是來自可信任 IP,傳回 no privilege
}
})
// 重新讀取配置
http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
if g.IsTrustable(r.RemoteAddr) {
g.ParseConfig(g.ConfigFile)
RenderDataJson(w, g.Config()) // 傳回 config 内容
} else {
w.Write([]byte("no privilege"))
}
})
// 傳回工作路徑
http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, file.SelfDir())
})
// 傳回可信任 IP
http.HandleFunc("/ips", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, g.TrustableIps())
})
}
擷取cpu資訊的API,基本直接調用了采集監控資料的函數
/*
CPU API
*/
func configCpuRoutes() {
// 擷取 cpu 個數
http.HandleFunc("/proc/cpu/num", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, runtime.NumCPU())
})
// 擷取 cpu 主頻
http.HandleFunc("/proc/cpu/mhz", func(w http.ResponseWriter, r *http.Request) {
data, err := nux.CpuMHz()
AutoRender(w, data, err)
})
// 以字元串形式傳回 cpu 使用情況
http.HandleFunc("/page/cpu/usage", func(w http.ResponseWriter, r *http.Request) {
if !funcs.CpuPrepared() {
RenderMsgJson(w, "not prepared")
return
}
idle := funcs.CpuIdle()
busy := 100.0 - idle
item := [10]string{
fmt.Sprintf("%.1f%%", idle),
fmt.Sprintf("%.1f%%", busy),
fmt.Sprintf("%.1f%%", funcs.CpuUser()),
fmt.Sprintf("%.1f%%", funcs.CpuNice()),
fmt.Sprintf("%.1f%%", funcs.CpuSystem()),
fmt.Sprintf("%.1f%%", funcs.CpuIowait()),
fmt.Sprintf("%.1f%%", funcs.CpuIrq()),
fmt.Sprintf("%.1f%%", funcs.CpuSoftIrq()),
fmt.Sprintf("%.1f%%", funcs.CpuSteal()),
fmt.Sprintf("%.1f%%", funcs.CpuGuest()),
}
RenderDataJson(w, [][10]string{item})
})
// 以 map 形式傳回 cpu 使用情況
http.HandleFunc("/proc/cpu/usage", func(w http.ResponseWriter, r *http.Request) {
if !funcs.CpuPrepared() {
RenderMsgJson(w, "not prepared")
return
}
idle := funcs.CpuIdle()
busy := 100.0 - idle
RenderDataJson(w, map[string]interface{}{
"idle": idle,
"busy": busy,
"user": funcs.CpuUser(),
"nice": funcs.CpuNice(),
"system": funcs.CpuSystem(),
"iowait": funcs.CpuIowait(),
"irq": funcs.CpuIrq(),
"softirq": funcs.CpuSoftIrq(),
"steal": funcs.CpuSteal(),
"guest": funcs.CpuGuest(),
})
})
}
磁盤的API和監控的函數簡直一模一樣的邏輯
/*
磁盤 API
*/
func configDfRoutes() {
// 擷取磁盤和檔案系統對象的使用情況
http.HandleFunc("/page/df", func(w http.ResponseWriter, r *http.Request) {
mountPoints, err := nux.ListMountPoint()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
var ret [][]interface{} = make([][]interface{}, 0)
for idx := range mountPoints {
var du *nux.DeviceUsage
du, err = nux.BuildDeviceUsage(mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2])
if err == nil {
ret = append(ret,
[]interface{}{
du.FsSpec,
core.ReadableSize(float64(du.BlocksAll)),
core.ReadableSize(float64(du.BlocksUsed)),
core.ReadableSize(float64(du.BlocksFree)),
fmt.Sprintf("%.1f%%", du.BlocksUsedPercent),
du.FsFile,
core.ReadableSize(float64(du.InodesAll)),
core.ReadableSize(float64(du.InodesUsed)),
core.ReadableSize(float64(du.InodesFree)),
fmt.Sprintf("%.1f%%", du.InodesUsedPercent),
du.FsVfstype,
})
}
}
RenderDataJson(w, ret)
})
}
用戶端自身健康度檢測的API很簡單
/*
用戶端健康情況 API
*/
func configHealthRoutes() {
// 如果用戶端正常工作,傳回 ok
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
// 傳回用戶端版本
http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(g.VERSION))
})
}
IO狀态的API,講道理,這幾個函數真的是一個人寫的嗎,有的直接調funcs裡面的監控采集函數,有的還要寫一遍同樣的邏輯
/*
IO 狀态 API
*/
func configIoStatRoutes() {
// 傳回 IO 情況
http.HandleFunc("/page/diskio", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, funcs.IOStatsForPage())
})
}
核心資訊的API
/*
核心資訊 API
*/
func configKernelRoutes() {
// 傳回 hostname,擷取順序是 agent 配置、系統環境變量 FALCON_ENDPOINT、從系統擷取
http.HandleFunc("/proc/kernel/hostname", func(w http.ResponseWriter, r *http.Request) {
data, err := g.Hostname()
AutoRender(w, data, err)
})
// 傳回最大程序數
http.HandleFunc("/proc/kernel/maxproc", func(w http.ResponseWriter, r *http.Request) {
data, err := nux.KernelMaxProc()
AutoRender(w, data, err)
})
// 傳回最大檔案數
http.HandleFunc("/proc/kernel/maxfiles", func(w http.ResponseWriter, r *http.Request) {
data, err := nux.KernelMaxFiles()
AutoRender(w, data, err)
})
// 傳回核心版本
http.HandleFunc("/proc/kernel/version", func(w http.ResponseWriter, r *http.Request) {
data, err := sys.CmdOutNoLn("uname", "-r") // 執行指令 uname -r 擷取核心版本
AutoRender(w, data, err)
})
}
記憶體API也沒啥好說的,為啥傳回的形式還要搞的這麼多
/*
記憶體 API
*/
func configMemoryRoutes() {
// 以數組形式傳回記憶體使用率資訊
http.HandleFunc("/page/memory", func(w http.ResponseWriter, r *http.Request) {
mem, err := nux.MemInfo()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
memFree := mem.MemFree + mem.Buffers + mem.Cached
memUsed := mem.MemTotal - memFree
var t uint64 = 1024 * 1024
RenderDataJson(w, []interface{}{mem.MemTotal / t, memUsed / t, memFree / t})
})
// 以 map 形式傳回記憶體使用率資訊
http.HandleFunc("/proc/memory", func(w http.ResponseWriter, r *http.Request) {
mem, err := nux.MemInfo()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
memFree := mem.MemFree + mem.Buffers + mem.Cached
memUsed := mem.MemTotal - memFree
RenderDataJson(w, map[string]interface{}{
"total": mem.MemTotal,
"free": memFree,
"used": memUsed,
})
})
}
這個靜态資源API其實目前隻支援一個頁面,就是用戶端IP:port那個頁面,當然你也可以自己在agent/public目錄下,建立目錄放其他的index.html,也可以擷取到
/*
靜态資源擷取 API
*/
func configPageRoutes() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/") {
if !file.IsExist(filepath.Join(g.Root, "/public", r.URL.Path, "index.html")) { // 通路 用戶端IP: port/ 即可擷取 index.html
http.NotFound(w, r)
return
}
}
http.FileServer(http.Dir(filepath.Join(g.Root, "/public"))).ServeHTTP(w, r)
})
}
plugin的API
/*
plugin API
*/
func configPluginRoutes() {
// 從 git 更新插件
http.HandleFunc("/plugin/update", func(w http.ResponseWriter, r *http.Request) {
if !g.Config().Plugin.Enabled {
w.Write([]byte("plugin not enabled"))
return
}
dir := g.Config().Plugin.Dir
parentDir := file.Dir(dir)
file.InsureDir(parentDir) // 確定 plugin 父目錄存在,不存在建立
if file.IsExist(dir) { // 如果插件目錄已經存在,pull
// git pull
cmd := exec.Command("git", "pull")
cmd.Dir = dir // 在 plugin 目錄執行 pull
err := cmd.Run()
if err != nil {
w.Write([]byte(fmt.Sprintf("git pull in dir:%s fail. error: %s", dir, err)))
return
}
} else { // 如果插件目錄不存在,clone
// git clone
cmd := exec.Command("git", "clone", g.Config().Plugin.Git, file.Basename(dir))
cmd.Dir = parentDir // 在父目錄執行 clone
err := cmd.Run()
if err != nil {
w.Write([]byte(fmt.Sprintf("git clone in dir:%s fail. error: %s", parentDir, err)))
return
}
}
w.Write([]byte("success"))
})
// plugin 強制 reset
http.HandleFunc("/plugin/reset", func(w http.ResponseWriter, r *http.Request) {
if !g.Config().Plugin.Enabled {
w.Write([]byte("plugin not enabled"))
return
}
dir := g.Config().Plugin.Dir
// git reset
if file.IsExist(dir) {
cmd := exec.Command("git", "reset", "--hard")
cmd.Dir = dir
err := cmd.Run()
if err != nil {
w.Write([]byte(fmt.Sprintf("git reset --hard in dir:%s fail. error: %s", dir, err)))
return
}
}
w.Write([]byte("success"))
})
// 檢視目前生效的 plugin
http.HandleFunc("/plugins", func(w http.ResponseWriter, r *http.Request) {
//TODO: not thread safe
RenderDataJson(w, plugins.Plugins)
})
}
push的API是最常用的,每次調用push他都會直接發給transfer,我記得美團對open-falcon的改進有一條就是增加了一個緩存,避免不同業務頻繁調用push接口的造成的發送性能下降、資料丢失等問題
/*
push API
*/
func configPushRoutes() {
http.HandleFunc("/v1/push", func(w http.ResponseWriter, req *http.Request) {
if req.ContentLength == 0 {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics) // push 的資訊轉為 metric
if err != nil {
http.Error(w, "connot decode body", http.StatusBadRequest)
return
}
g.SendToTransfer(metrics) // 發送給 transfer
w.Write([]byte("success"))
})
}
run的API可以在機器上執行一些指令,這個非常危險,是以配置的backdoor這項要求打開,請求的IP位址也要是可信任IP才行
/*
執行指令 API
*/
func configRunRoutes() {
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
if !g.Config().Http.Backdoor { // 如果不允許 backdoor,報錯傳回
w.Write([]byte("/run disabled"))
return
}
if g.IsTrustable(r.RemoteAddr) { // 檢測是否是可信任 IP
if r.ContentLength == 0 {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}
bs, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
body := string(bs)
out, err := sys.CmdOutBytes("sh", "-c", body) // 執行指令
if err != nil {
w.Write([]byte("exec fail: " + err.Error()))
return
}
w.Write(out) // 傳回指令輸出
} else {
w.Write([]byte("no privilege"))
}
})
}
系統資訊API,可以查詢系統時間、系統啟動時間、cpu負載
/*
系統資訊 API
*/
func configSystemRoutes() {
// 擷取目前時間
http.HandleFunc("/system/date", func(w http.ResponseWriter, req *http.Request) {
RenderDataJson(w, time.Now().Format("2006-01-02 15:04:05"))
})
// 以字元串形式傳回系統啟動時間
http.HandleFunc("/page/system/uptime", func(w http.ResponseWriter, req *http.Request) {
days, hours, mins, err := nux.SystemUptime()
AutoRender(w, fmt.Sprintf("%d days %d hours %d minutes", days, hours, mins), err)
})
// 以 map 形式傳回系統啟動時間
http.HandleFunc("/proc/system/uptime", func(w http.ResponseWriter, req *http.Request) {
days, hours, mins, err := nux.SystemUptime()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
RenderDataJson(w, map[string]interface{}{
"days": days,
"hours": hours,
"mins": mins,
})
})
// 以數組形式傳回 cpu load
http.HandleFunc("/page/system/loadavg", func(w http.ResponseWriter, req *http.Request) {
cpuNum := runtime.NumCPU()
load, err := nux.LoadAvg()
if err != nil {
RenderMsgJson(w, err.Error())
return
}
ret := [3][2]interface{}{
{load.Avg1min, int64(load.Avg1min * 100.0 / float64(cpuNum))},
{load.Avg5min, int64(load.Avg5min * 100.0 / float64(cpuNum))},
{load.Avg15min, int64(load.Avg15min * 100.0 / float64(cpuNum))},
}
RenderDataJson(w, ret)
})
// 以對象形式傳回 cpu load
http.HandleFunc("/proc/system/loadavg", func(w http.ResponseWriter, req *http.Request) {
data, err := nux.LoadAvg()
AutoRender(w, data, err)
})
}