天天看點

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

本人水準:參加工作六個月,剛看完一本《go實戰》的菜雞

代碼版本:2019年1月15日使用go get github.com/open-falcon/falcon-plus拉下來的代碼

agent的功能就是不停采集機器各種資料發送給其他子產品,提供一個自定義push metric的接口,向hbs發送一些心跳資訊

1 概覽

1.1 目錄結構

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

agent的代碼還是比較少的,先粗略講一下每個包都做什麼

  • cron:定時任務,包括從hbs同步plugin、内建監控項和可信任IP等資訊,向hbs發送心跳資訊,收集監控資料
  • funcs:收集監控資料的函數,基本都是通過第三方庫nux去收集的
  • g:全局共享資訊的存放,包括配置資訊,日志,網絡通信的工具類,常量等
  • http:API的後端實作
  • plugins:使用者配置plugin的管理和啟動

1.2 main函數

上代碼,中文注釋都是我自己加的

前面的一些初始化代碼都很簡單,這裡略去不講,從BuildMappers開始分析

func main() {
	// 指令行參數
	cfg := flag.String("c", "cfg.json", "configuration file")  // 配置檔案,預設為 cfg.json
	version := flag.Bool("v", false, "show version")  // 是否隻顯示版本,預設不顯示
	check := flag.Bool("check", false, "check collector")  // 是否隻檢查收集器,預設不檢查

	// 擷取一下指令行參數
	flag.Parse()

	// 列印版本
	if *version {
		fmt.Println(g.VERSION)
		os.Exit(0)
	}

	// 檢查是否能正常采集監控資料
	if *check {
		funcs.CheckCollector()
		os.Exit(0)
	}

	// 讀取配置檔案
	g.ParseConfig(*cfg)

	// 設定日志級别
	if g.Config().Debug {
		g.InitLog("debug")
	} else {
		g.InitLog("info")
	}

	// 初始化
	g.InitRootDir()  // 初始化根目錄
	g.InitLocalIp()  // 初始化本地 IP
	g.InitRpcClients() // 初始化 hbs rpc 用戶端

	// 建立收集資料的函數和發送資料間隔映射
	funcs.BuildMappers()

	// 定時任務(每秒),通過檔案 /proc/cpustats 和 /proc/diskstats,更新 cpu 和 disk 的原始資料,等待後一步處理
	go cron.InitDataHistory()

	// 定時任務
	cron.ReportAgentStatus()  // 向 hbs 發送用戶端心跳資訊
	cron.SyncMinePlugins()  // 同步 plugins
	cron.SyncBuiltinMetrics()  // 從 hbs 同步 BuiltinMetrics
	cron.SyncTrustableIps()  // 同步可信任 IP 位址
	cron.Collect()  // 采集監控資料

	// 開啟 http 服務
	go http.Start()

	// 阻塞 main 函數
	select {}
}
           

2 建立映射

看着很長,其實就是每組采集監控資料的函數(分組大概是基礎名額、device、網絡、GPU等等),和向transfer發送的資料間隔相對應放到一個叫FuncsAndInterval的自定義結構體裡面

/*
	收集資料的函數和發送資料間隔映射
 */
func BuildMappers() {
	interval := g.Config().Transfer.Interval  // 向 transfer 發送資料間隔
	// 擷取監控資料函數、與發送資料間隔映射
	Mappers = []FuncsAndInterval{
		{
			Fs: []func() []*model.MetricValue{
				AgentMetrics,
				CpuMetrics,
				NetMetrics,
				KernelMetrics,
				LoadAvgMetrics,
				MemMetrics,
				DiskIOMetrics,
				IOStatsMetrics,
				NetstatMetrics,
				ProcMetrics,
				UdpMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				DeviceMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				PortMetrics,
				SocketStatSummaryMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				DuMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				UrlMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				GpuMetrics,
			},
			Interval: interval,
		},
	}
}
           

結構體FuncsAndInterval的定義,很簡單

type FuncsAndInterval struct {
	Fs       []func() []*model.MetricValue  // 一組采集監控資料函數
	Interval int  // 向 transfer 發送資料間隔
}
           

采集監控資料函數的傳回值統一為[]*model.MetricValue,來看兩個采集監控資料的函數,先簡單掃一遍不追求細節

agent.alive是最熟悉的一個監控項了,不用其他邏輯來采集資料直接就是1

/*
	擷取 agent 的 MetricValue
	傳回值固定為 1,表示用戶端正常工作
 */
func AgentMetrics() []*model.MetricValue {
	return []*model.MetricValue{GaugeValue("agent.alive", 1)}
}
           

cpu的就複雜很多,十來個,每個都調用了不同的函數去采集監控項

/*
	擷取 cpu 的 MetricValue
 */
func CpuMetrics() []*model.MetricValue {
	if !CpuPrepared() {
		return []*model.MetricValue{}
	}

	// 擷取各項 cpu 監控名額
	cpuIdleVal := CpuIdle()
	idle := GaugeValue("cpu.idle", cpuIdleVal)
	busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
	user := GaugeValue("cpu.user", CpuUser())
	nice := GaugeValue("cpu.nice", CpuNice())
	system := GaugeValue("cpu.system", CpuSystem())
	iowait := GaugeValue("cpu.iowait", CpuIowait())
	irq := GaugeValue("cpu.irq", CpuIrq())
	softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
	steal := GaugeValue("cpu.steal", CpuSteal())
	guest := GaugeValue("cpu.guest", CpuGuest())
	switches := CounterValue("cpu.switches", CurrentCpuSwitches())
	return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
           

3 定時任務

這部分是agent功能的主體,每個定時任務都是開了一個goroutine去完成的,

// 定時任務(每秒),通過檔案 /proc/stat 和 /proc/diskstats,更新 cpu 和 disk 的原始資料,等待後一步處理
go cron.InitDataHistory()

// 定時任務
cron.ReportAgentStatus()  // 向 hbs 發送用戶端心跳資訊
cron.SyncMinePlugins()  // 同步 plugins
cron.SyncBuiltinMetrics()  // 從 hbs 同步用戶端需要采集的 metric
cron.SyncTrustableIps()  // 同步可信任 IP 位址
cron.Collect()  // 采集監控資料
           

3.1 初始化曆史

open-falcon是通過/proc/stat和/proc/diskstats這兩個系統檔案采集cpu和disk的資料,保留最近兩次的資料是為了處理delta類名額

/*
	定時(每秒)通過 /proc/stat 和 /proc/diskstats 檔案采集 cpu 和 disk 的資料,保留最近兩次的資料
 */
func InitDataHistory() {
	for {
		funcs.UpdateCpuStat()  // 更新 cpu 狀态
		funcs.UpdateDiskStats()  // 更新 disk 狀态
		time.Sleep(g.COLLECT_INTERVAL)  // 睡一個收集間隔(一秒)
	}
}
           

3.1.1 cpu曆史資料

看一下UpdateCpuStat(),比較簡單

const (
	historyCount int = 2  // 保留多少輪曆史資料
)

var (
	procStatHistory [historyCount]*nux.ProcStat  // 存放 cpu 曆史資料
	psLock          = new(sync.RWMutex)  // 鎖
)

/*
	更新 cpu 狀态
 */
func UpdateCpuStat() error {
	// 讀取 /proc/stat 檔案
	ps, err := nux.CurrentProcStat()
	if err != nil {
		return err
	}

	psLock.Lock()
	defer psLock.Unlock()
	// 抛棄過期的曆史資料,給新資料騰出位置
	for i := historyCount - 1; i > 0; i-- {
		procStatHistory[i] = procStatHistory[i-1]
	}

	procStatHistory[0] = ps  // 儲存最新資料
	return nil
}
           

/proc/stat這個檔案記錄了開機以來的cpu使用資訊,比如在我的虛拟機上長這樣。具體的含義可以看這篇部落格【Linux】/proc/stat詳解 完整驗證版

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務
  • 以cpu開頭的那一行表示所有cpu的數值總和,我的虛拟機隻有一個cpu,是以cpu和cpu0完全一緻
  • cpu0之後的9個數字分别表示使用者時間、nice使用者時間、系統時間、空閑時間、IO等待時間、硬中斷時間、軟中斷時間、steal時間、來賓時間
  • intr表示中斷,第一個資料是總中斷次數,之後是某些中斷發生的次數
  • ctxt表示cpu上下文切換次數
  • btime表示系統啟動時間戳(這個我貼的那篇博文有誤,他說是系統啟動的秒,算算就知道不可能)
  • processes表示建立process的個數
  • procs_running表示目前運作隊列的任務數目
  • procs_blocked表示目前運作隊列的阻塞任務數目
  • softirq表示軟中斷,資料含義和intr類似

這個檔案的讀取是通過nux包完成的,讀檔案的邏輯略去不說,看一下讀取之後傳回的結構體

CpuUsage那裡的Total是讀取過程中把每個cpu時間累加得來的

func (this *ProcStat) String() string {
	return fmt.Sprintf("<Cpu:%v, Cpus:%v, Ctxt:%d, Processes:%d, ProcsRunning:%d, ProcsBlocking:%d>",
		this.Cpu,
		this.Cpus,
		this.Ctxt,
		this.Processes,
		this.ProcsRunning,
		this.ProcsBlocked)
}

type CpuUsage struct {
	User    uint64 // time spent in user mode
	Nice    uint64 // time spent in user mode with low priority (nice)
	System  uint64 // time spent in system mode
	Idle    uint64 // time spent in the idle task
	Iowait  uint64 // time spent waiting for I/O to complete (since Linux 2.5.41)
	Irq     uint64 // time spent servicing  interrupts  (since  2.6.0-test4)
	SoftIrq uint64 // time spent servicing softirqs (since 2.6.0-test4)
	Steal   uint64 // time spent in other OSes when running in a virtualized environment (since 2.6.11)
	Guest   uint64 // time spent running a virtual CPU for guest operating systems under the control of the Linux kernel. (since 2.6.24)
	Total   uint64 // total of all time fields
}
           

3.1.2 disk曆史資料

UpdateDiskStats(),和cpu類似,唯一差別的是曆史資料的儲存形式從切片變成了映射,每個裝置的資訊分開了

var (
	diskStatsMap = make(map[string][2]*nux.DiskStats)  // 存放 disk 曆史資料的映射,映射關系為裝置名-裝置資訊
	dsLock       = new(sync.RWMutex)  // 鎖
)

/*
	更新 disk 狀态
 */
func UpdateDiskStats() error {
	// 讀取 /proc/diskstats 檔案
	dsList, err := nux.ListDiskStats()
	if err != nil {
		return err
	}

	dsLock.Lock()
	defer dsLock.Unlock()
	// 抛棄過期的曆史資料,給新資料騰出位置,儲存新資料
	for i := 0; i < len(dsList); i++ {
		device := dsList[i].Device
		diskStatsMap[device] = [2]*nux.DiskStats{dsList[i], diskStatsMap[device][0]}
	}
	return nil
}
           

/proc/diskstats檔案在我虛拟機上長這樣,具體含義可以參考這篇博文/proc/diskstats檔案注解

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

 第一列為主裝置号,第二列為次裝置号,第三列為裝置名稱

後面的11列,表示讀完成次數、合并讀完成次數、讀扇區次數、讀操作花費毫秒、寫完成次數、合并寫完成次數、寫扇區次數、寫操作花費毫秒、正在處理的輸入輸出請求數、輸入輸出花費毫秒、輸入輸出花費的權重毫秒

讀取也是通過nux包,傳回的結構體如下

type DiskStats struct {
	Major             int
	Minor             int
	Device            string
	ReadRequests      uint64 // Total number of reads completed successfully.
	ReadMerged        uint64 // Adjacent read requests merged in a single req.
	ReadSectors       uint64 // Total number of sectors read successfully.
	MsecRead          uint64 // Total number of ms spent by all reads.
	WriteRequests     uint64 // total number of writes completed successfully.
	WriteMerged       uint64 // Adjacent write requests merged in a single req.
	WriteSectors      uint64 // total number of sectors written successfully.
	MsecWrite         uint64 // Total number of ms spent by all writes.
	IosInProgress     uint64 // Number of actual I/O requests currently in flight.
	MsecTotal         uint64 // Amount of time during which ios_in_progress >= 1.
	MsecWeightedTotal uint64 // Measure of recent I/O completion time and backlog.
	TS                time.Time
}
           

3.2 向 hbs 發送心跳

代碼如下,基本就是一個簡單的rpc,遠端調用hbs的Agent.ReportStatus方法

/*
	向 hbs 彙報用戶端狀态
 */
func ReportAgentStatus() {
	if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {  // 如果允許向 hbs 發送心跳,并配置了 hbs 位址
		go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second)  // 定時發送心跳
	}
}

/*
	定時彙報用戶端狀态
 */
func reportAgentStatus(interval time.Duration) {
	for {
		hostname, err := g.Hostname()  // 擷取目前主機的 hostname,依次嘗試從配置檔案、環境變量 FALCON_ENDPOINT、系統 hostname 擷取
		if err != nil {
			hostname = fmt.Sprintf("error:%s", err.Error())
		}

		// 生成彙報請求
		req := model.AgentReportRequest{
			Hostname:      hostname,
			IP:            g.IP(),  // IP
			AgentVersion:  g.VERSION,  // 版本
			PluginVersion: g.GetCurrPluginVersion(),  // 插件版本号,即最後一次 git commit 的 hash
		}

		var resp model.SimpleRpcResponse
		err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)  // rpc 調用 hbs 的 Agent.ReportStatus,擷取響應
		if err != nil || resp.Code != 0 {
			log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
		}

		time.Sleep(interval)  // 睡一個心跳彙報間隔時間
	}
}
           

可以看一下hbsClient.Call函數的細節

/*
	rpc 調用
*/
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {

	this.Lock()
	defer this.Unlock()

	err := this.serverConn()  // 連接配接伺服器
	if err != nil {
		return err
	}

	timeout := time.Duration(10 * time.Second)  // 逾時時間為 10 秒
	done := make(chan error, 1)

	go func() {
		err := this.rpcClient.Call(method, args, reply)  // rpc 調用方法
		done <- err
	}()

	select {
	// 如果 rpc 調用逾時,關閉連接配接,報錯
	case <-time.After(timeout):
		log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
		this.close()
		return errors.New(this.RpcServer + " rpc call timeout")
	// rpc 調用如果有報錯,關閉連接配接,報錯
	case err := <-done:
		if err != nil {
			this.close()
			return err
		}
	}

	return nil
}
           

3.3 同步plugins

agent會用哈希表緩存plugins的相關資訊以便定期執行,除此之外,還要定期去hbs擷取使用者在web頁面上面配置的plugins路徑來更新緩存,總之同步plugins的操作就是維護好這個存儲plugins的哈希表

函數調用的ClearAllPlugins、DelNoUsePlugins、AddNewPlugins都在plugins.go裡,而plugins.go又調用了scheduler.go

// Copyright 2017 Xiaomi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cron

import (
	"github.com/open-falcon/falcon-plus/common/model"
	"github.com/open-falcon/falcon-plus/modules/agent/g"
	"github.com/open-falcon/falcon-plus/modules/agent/plugins"
	"log"
	"strings"
	"time"
)

/*
	同步使用者自定義 plugins
 */
func SyncMinePlugins() {
	// 配置未使能 plugin,傳回
	if !g.Config().Plugin.Enabled {
		return
	}

	// 配置未使能 hbs,傳回
	if !g.Config().Heartbeat.Enabled {
		return
	}

	// 未配置 hbs 位址,傳回
	if g.Config().Heartbeat.Addr == "" {
		return
	}

	go syncMinePlugins()
}

/*
	同步 plugins
 */
func syncMinePlugins() {

	var (
		timestamp  int64 = -1
		pluginDirs []string
	)

	duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second

	for {
		time.Sleep(duration)  // 睡一個周期間隔

		hostname, err := g.Hostname()  // 擷取目前主機的 hostname,依次嘗試從配置檔案、環境變量 FALCON_ENDPOINT、系統 hostname 擷取
		if err != nil {
			continue
		}

		req := model.AgentHeartbeatRequest{  // 建立 rpc 請求
			Hostname: hostname,
		}

		var resp model.AgentPluginsResponse
		err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)  // rpc 調用 hbs 的 Agent.MinePlugins
		if err != nil {
			log.Println("ERROR:", err)
			continue
		}

		if resp.Timestamp <= timestamp {  // 如果傳回資料的時間戳小于等于上次的同步的時間戳,跳過本次同步
			continue
		}

		pluginDirs = resp.Plugins  // 擷取使用者在 web 頁面配置的本主機 plugin 存放位址
		timestamp = resp.Timestamp

		if g.Config().Debug {
			log.Println(&resp)
		}

		if len(pluginDirs) == 0 {  // 若使用者沒有配置,清空緩存的 plugins
			plugins.ClearAllPlugins()
		}

		desiredAll := make(map[string]*plugins.Plugin)  // 儲存配置的目錄下所有符合命名規則的監控腳本

		for _, p := range pluginDirs {
			underOneDir := plugins.ListPlugins(strings.Trim(p, "/"))  // 列出該目錄下所有符合命名規則setp_xxx的監控腳本(使用者配置的路徑,前後的“/”會被去掉)
			for k, v := range underOneDir {
				desiredAll[k] = v
			}
		}

		// 更新 plugins 緩存
		plugins.DelNoUsePlugins(desiredAll)  // 清除無用的
		plugins.AddNewPlugins(desiredAll)  // 增加新的

	}
}
           

現在來看看plugins.go和scheduler.go,一點一點看

plugins.go裡面儲存了兩個hash表,鍵值都是plugin的腳本目錄,儲存的對象一個叫Plugin一個叫PluginScheduler

var (
	Plugins              = make(map[string]*Plugin)  // 儲存使用者在 web 配置的要執行的 plugin,鍵值是 plugin 腳本目錄
	PluginsWithScheduler = make(map[string]*PluginScheduler)  // 儲存定時執行的 plugin,鍵值是 plugin 腳本目錄
)
           

看一下Plugin和PluginScheduler,Plugin其實就是從hbs擷取的資訊,PluginScheduler封裝了一層,加了定時器和通道

/*
	plugin
 */
type Plugin struct {
	FilePath string	// 檔案目錄
	MTime    int64	// 檔案修改時間
	Cycle    int	// 監控周期
}
           
/*
	定時 plugin
 */
type PluginScheduler struct {
	Ticker *time.Ticker  // 定時器
	Plugin *Plugin  // plugin
	Quit   chan struct{}  // 接收停止定時任務信号的通道
}
           

Plugin的增删改查十分簡單

/*
	停止不再使用的 plugin,并從緩存的 plugins 表中删除
 */
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
	for currKey, currPlugin := range Plugins {
		newPlugin, ok := newPlugins[currKey]
		if !ok || currPlugin.MTime != newPlugin.MTime {  // 如果從 hbs 拉取的新 plugins 清單裡沒有該 plugin,或檔案修改時間不同,删除老 plugin
			deletePlugin(currKey)
		}
	}
}

/*
	在緩存的 plugins 表中增加新 plugin,并啟動定時執行 plugin
 */
func AddNewPlugins(newPlugins map[string]*Plugin) {
	for fpath, newPlugin := range newPlugins {
		if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {  // 如果該 plugin 已經存在,且檔案修改時間符合,跳過添加
			continue
		}

		Plugins[fpath] = newPlugin
		sch := NewPluginScheduler(newPlugin)  // 生成定時 plugin
		PluginsWithScheduler[fpath] = sch
		sch.Schedule()  // 啟動定時執行
	}
}

/*
	停止所有 plugin,清空緩存 plugins 表
 */
func ClearAllPlugins() {
	for k := range Plugins {
		deletePlugin(k)
	}
}

/*
	停止 plugin
 */
func deletePlugin(key string) {
	v, ok := PluginsWithScheduler[key]
	if ok {
		v.Stop()  // 停止定時任務
		delete(PluginsWithScheduler, key)  // 從緩存中删除
	}
	delete(Plugins, key)  // 從緩存中删除
}
           

PluginScheduler主要是定時執行任務,就一個定時器加一個停止通道,每個Plugin對應一個單獨的goroutine

/*
	構造函數
 */
func NewPluginScheduler(p *Plugin) *PluginScheduler {
	scheduler := PluginScheduler{Plugin: p}
	scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second)  // 定時器
	scheduler.Quit = make(chan struct{})
	return &scheduler
}

/*
	啟動定時任務
 */
func (this *PluginScheduler) Schedule() {
	go func() {
		for {
			select {
			case <-this.Ticker.C:
				PluginRun(this.Plugin)  // 定時任務
			case <-this.Quit:
				this.Ticker.Stop()  // 收到停止信号,停止定時器,終止任務傳回
				return
			}
		}
	}()
}

/*
	停止任務
 */
func (this *PluginScheduler) Stop() {
	close(this.Quit)  // 關閉接收停止信号的通道
}

/*
	運作一次 plugin 腳本
 */
func PluginRun(plugin *Plugin) {

	timeout := plugin.Cycle*1000 - 500  // 任務逾時時間,在下一次定時任務開始前 0.5 秒
	fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)

	// 檔案不存在報錯
	if !file.IsExist(fpath) {
		log.Println("no such plugin:", fpath)
		return
	}

	debug := g.Config().Debug
	if debug {
		log.Println(fpath, "running...")
	}

	// 執行腳本
	cmd := exec.Command(fpath)
	var stdout bytes.Buffer
	cmd.Stdout = &stdout
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	err := cmd.Start()
	if err != nil {
		log.Printf("[ERROR] plugin start fail, error: %s\n", err)
		return
	}
	if debug {
		log.Println("plugin started:", fpath)
	}

	err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)  // 執行指令

	errStr := stderr.String()
	if errStr != "" {
		logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
		if _, err = file.WriteString(logFile, errStr); err != nil {
			log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)
		}
	}

	// 逾時報錯
	if isTimeout {
		if err == nil && debug {
			log.Println("[INFO] timeout and kill process", fpath, "successfully")
		}

		if err != nil {
			log.Println("[ERROR] kill process", fpath, "occur error:", err)
		}

		return
	}

	if err != nil {
		log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
		return
	}

	// 執行成功且沒有輸出,在 debug 模式下會記錄
	data := stdout.Bytes()
	if len(data) == 0 {
		if debug {
			log.Println("[DEBUG] stdout of", fpath, "is blank")
		}
		return
	}

	// 輸出轉 MetricValue
	var metrics []*model.MetricValue
	err = json.Unmarshal(data, &metrics)
	if err != nil {
		log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
		return
	}

	// 發送給 transfer
	g.SendToTransfer(metrics)
}
           

3.4 從hbs同步内建Metric

嘛是内建Metric呢,比如最熟悉的net.port.listen就是open-falcon内建的一個metric,每個周期agent都去hbs擷取使用者配置好的内建metric,從tag中抽取出要監控的具體資訊,比如net.port.listen/port=8080,就會抽出8080這個端口,緩存起來,定時采集資料上報

内建Metric都是在Template那裡配置後,由hbs發送給用戶端的,agent和hbs的互動大緻和同步plugin的互動一樣,都是rpc

這裡一共有四種内建metric,url.check.health是判斷url的響應是否逾時的,net.port.listen監聽端口,du.bs監控某個目錄的磁盤,proc.num監控程序名或指令包含某些字元串的程序總數

/*
	從 hbs 同步 BuiltinMetrics
 */
func SyncBuiltinMetrics() {
	if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
		go syncBuiltinMetrics()
	}
}

/*
	同步 BuiltinMetrics
 */
func syncBuiltinMetrics() {

	var timestamp int64 = -1
	var checksum string = "nil"

	duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second

	for {
		time.Sleep(duration)

		var ports = []int64{}
		var paths = []string{}
		var procs = make(map[string]map[int]string)
		var urls = make(map[string]string)

		hostname, err := g.Hostname()
		if err != nil {
			continue
		}

		req := model.AgentHeartbeatRequest{
			Hostname: hostname,
			Checksum: checksum,
		}

		var resp model.BuiltinMetricResponse
		err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)  // rpc 調用 hbs 的 Agent.BuiltinMetrics 方法
		if err != nil {
			log.Println("ERROR:", err)
			continue
		}

		// 通過時間戳和校驗和判斷響應是不是最新的
		if resp.Timestamp <= timestamp {
			continue
		}

		if resp.Checksum == checksum {
			continue
		}

		timestamp = resp.Timestamp
		checksum = resp.Checksum

		for _, metric := range resp.Metrics {

			if metric.Metric == g.URL_CHECK_HEALTH {  // url 檢測
				arr := strings.Split(metric.Tags, ",")
				if len(arr) != 2 {
					continue
				}
				url := strings.Split(arr[0], "=")
				if len(url) != 2 {
					continue
				}
				stime := strings.Split(arr[1], "=")
				if len(stime) != 2 {
					continue
				}
				if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
					urls[url[1]] = stime[1]
				} else {
					log.Println("metric ParseInt timeout failed:", err)
				}
			}

			if metric.Metric == g.NET_PORT_LISTEN {  // 端口檢測
				arr := strings.Split(metric.Tags, "=")
				if len(arr) != 2 {
					continue
				}

				if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
					ports = append(ports, port)
				} else {
					log.Println("metrics ParseInt failed:", err)
				}

				continue
			}

			if metric.Metric == g.DU_BS {
				arr := strings.Split(metric.Tags, "=")
				if len(arr) != 2 {
					continue
				}

				paths = append(paths, strings.TrimSpace(arr[1]))
				continue
			}

			if metric.Metric == g.PROC_NUM {  // 程序數
				arr := strings.Split(metric.Tags, ",")

				tmpMap := make(map[int]string)

				for i := 0; i < len(arr); i++ {
					if strings.HasPrefix(arr[i], "name=") {
						tmpMap[1] = strings.TrimSpace(arr[i][5:])
					} else if strings.HasPrefix(arr[i], "cmdline=") {
						tmpMap[2] = strings.TrimSpace(arr[i][8:])
					}
				}

				procs[metric.Tags] = tmpMap
			}
		}

		// 緩存配置
		g.SetReportUrls(urls)
		g.SetReportPorts(ports)
		g.SetReportProcs(procs)
		g.SetDuPaths(paths)

	}
}
           

3.5 同步可信任IP

這個就更簡單了,沒啥新東西

/*
	同步可信任 IP 位址
 */
func SyncTrustableIps() {
	if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
		go syncTrustableIps()
	}
}

/*
	同步可信任 IP 
 */
func syncTrustableIps() {

	duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second

	for {
		time.Sleep(duration)

		var ips string
		err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)  // rpc 調用 hbs 的 Agent.TrustableIps 方法
		if err != nil {
			log.Println("ERROR: call Agent.TrustableIps fail", err)
			continue
		}

		g.SetTrustableIps(ips)  // 緩存
	}
}
           

3.6 采集監控資料

重點了,終于到了最關鍵的采集監控資料,其實裡面這些采集函數都大同小異,都是通過第三方庫nux讀取一些系統檔案,再做一些計算百分比之類的簡單處理

collector.go隻是一個殼,定義了一些封裝的邏輯,采集資料的函數都儲存在funcs.go裡面的Mapper裡

/*
	采集資料
 */
func Collect() {

	if !g.Config().Transfer.Enabled {
		return
	}

	if len(g.Config().Transfer.Addrs) == 0 {
		return
	}

	for _, v := range funcs.Mappers {
		go collect(int64(v.Interval), v.Fs)
	}
}

/*
	采集單組資料
 */
func collect(sec int64, fns []func() []*model.MetricValue) {
	t := time.NewTicker(time.Second * time.Duration(sec))  // 定時器
	defer t.Stop()
	for {
		<-t.C

		hostname, err := g.Hostname()
		if err != nil {
			continue
		}

		mvs := []*model.MetricValue{}  // 儲存采集後的 metric
		ignoreMetrics := g.Config().IgnoreMetrics  // 配置的ignore metric

		for _, fn := range fns {
			items := fn()  // 調用函數,擷取監控值
			if items == nil {
				continue
			}

			if len(items) == 0 {
				continue
			}

			for _, mv := range items {
				if b, ok := ignoreMetrics[mv.Metric]; ok && b {  // 如果配置了忽略,跳過資料采集
					continue
				} else {
					mvs = append(mvs, mv)
				}
			}
		}
		
		// 設定時間戳、endpoint、step
		now := time.Now().Unix()
		for j := 0; j < len(mvs); j++ {
			mvs[j].Step = sec
			mvs[j].Endpoint = hostname
			mvs[j].Timestamp = now
		}
		
		// 發送給 transfer
		g.SendToTransfer(mvs)

	}
}
           

看一下發送transfer的代碼,如果配置了多個transfer,他會随機選取位址進行嘗試發送,直到發送成功為止

/*
	監控資料發送給 transfer
 */
func SendToTransfer(metrics []*model.MetricValue) {
	if len(metrics) == 0 {
		return
	}

	dt := Config().DefaultTags  // 讀取配置的預設 tag
	if len(dt) > 0 {
		var buf bytes.Buffer
		default_tags_list := []string{}
		for k, v := range dt {
			buf.Reset()
			buf.WriteString(k)
			buf.WriteString("=")
			buf.WriteString(v)
			default_tags_list = append(default_tags_list, buf.String())
		}
		default_tags := strings.Join(default_tags_list, ",")  // 生成預設 tag 的字元串

		// 給要上報的 metric 添加預設 tag
		for i, x := range metrics {
			buf.Reset()
			if x.Tags == "" {
				metrics[i].Tags = default_tags
			} else {
				buf.WriteString(metrics[i].Tags)
				buf.WriteString(",")
				buf.WriteString(default_tags)
				metrics[i].Tags = buf.String()
			}
		}
	}

	debug := Config().Debug

	if debug {
		log.Printf("=> <Total=%d> %v\n", len(metrics), metrics[0])  // debug 模式下,記錄每次上報 metric 的個數并記錄第一個 metric
	}

	var resp model.TransferResponse
	SendMetrics(metrics, &resp)  // 發送給 transfer

	if debug {
		log.Println("<=", &resp)
	}
}
           
var (
	TransferClientsLock *sync.RWMutex                   = new(sync.RWMutex)
	TransferClients     map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
)

/*
	發送資料給 transfer
 */
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
	rand.Seed(time.Now().UnixNano())
	for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {  // 随機在配置的 transfer 位址中選擇一個嘗試發送,失敗則嘗試下一個
		addr := Config().Transfer.Addrs[i]

		c := getTransferClient(addr)
		if c == nil {
			c = initTransferClient(addr)
		}

		if updateMetrics(c, metrics, resp) {  // 資料發送成功就退出嘗試
			break
		}
	}
}

/*
	初始化 transfer 連接配接用戶端
 */
func initTransferClient(addr string) *SingleConnRpcClient {
	var c *SingleConnRpcClient = &SingleConnRpcClient{
		RpcServer: addr,
		Timeout:   time.Duration(Config().Transfer.Timeout) * time.Millisecond,
	}
	TransferClientsLock.Lock()
	defer TransferClientsLock.Unlock()
	TransferClients[addr] = c

	return c
}

/*
	rpc 調用 transfer 的 Transfer.Update 方法
 */
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
	err := c.Call("Transfer.Update", metrics, resp)
	if err != nil {
		log.Println("call Transfer.Update fail:", c, err)
		return false
	}
	return true
}

/*
	擷取 transfer 連接配接用戶端
 */
func getTransferClient(addr string) *SingleConnRpcClient {
	TransferClientsLock.RLock()
	defer TransferClientsLock.RUnlock()

	if c, ok := TransferClients[addr]; ok {
		return c
	}
	return nil
}
           

要采集的監控名額都在這裡了,一個一個函數看

var Mappers []FuncsAndInterval

/*
	收集資料的函數和發送資料間隔映射
 */
func BuildMappers() {
	interval := g.Config().Transfer.Interval  // 向 transfer 發送資料間隔
	// 擷取監控資料函數、與發送資料間隔映射
	Mappers = []FuncsAndInterval{
		{
			Fs: []func() []*model.MetricValue{
				AgentMetrics,
				CpuMetrics,
				NetMetrics,
				KernelMetrics,
				LoadAvgMetrics,
				MemMetrics,
				DiskIOMetrics,
				IOStatsMetrics,
				NetstatMetrics,
				ProcMetrics,
				UdpMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				DeviceMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				PortMetrics,
				SocketStatSummaryMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				DuMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				UrlMetrics,
			},
			Interval: interval,
		},
		{
			Fs: []func() []*model.MetricValue{
				GpuMetrics,
			},
			Interval: interval,
		},
	}
}
           

agent以前看過了,很簡單

/*
	擷取 agent 的 MetricValue
	傳回值固定為 1,表示用戶端正常工作
 */
func AgentMetrics() []*model.MetricValue {
	return []*model.MetricValue{GaugeValue("agent.alive", 1)}
}
           

3.1說過,linux記錄cpu的資訊記錄的是一個累計的值,是以為了擷取每分鐘cpu的資料,必須計算內插補點,是以才有了3.1中的記錄曆史資料的操作。因為有procStatHistory這個共用變量,每個函數基本都加了鎖,防止每個周期之間的資料混淆,造成錯誤,比如計算cpu.idle/cpu.total,擷取了cpu.total後UpdateCpuStat()函數被調用,更新了數值再擷取cpu.idle,出來的結果就不對

cpu的各種時間百分比計算都大同小異,上下文切換這個函數沒有處理數值,直接弄成了counter資料類型,之後會計算為speed

const (
	historyCount int = 2  // 保留多少輪曆史資料
)

var (
	procStatHistory [historyCount]*nux.ProcStat  // 存放 cpu 曆史資料
	psLock          = new(sync.RWMutex)  // 鎖
)

/*
	更新 cpu 狀态
 */
func UpdateCpuStat() error {
	// 讀取 /proc/stat 檔案
	ps, err := nux.CurrentProcStat()
	if err != nil {
		return err
	}

	psLock.Lock()
	defer psLock.Unlock()
	// 抛棄過期的曆史資料,給新資料騰出位置
	for i := historyCount - 1; i > 0; i-- {
		procStatHistory[i] = procStatHistory[i-1]
	}

	procStatHistory[0] = ps  // 儲存最新資料
	return nil
}

/*
 	cpu total 最新一次的變化內插補點
 */
func deltaTotal() uint64 {
	if procStatHistory[1] == nil {
		return 0
	}
	return procStatHistory[0].Cpu.Total - procStatHistory[1].Cpu.Total
}

/*
	計算 cpu idle 時間百分比
 */
func CpuIdle() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.Idle-procStatHistory[1].Cpu.Idle) * invQuotient
}

/*
	計算 cpu user 時間百分比
 */
func CpuUser() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.User-procStatHistory[1].Cpu.User) * invQuotient
}

/*
	計算 cpu nice 時間百分比
 */
func CpuNice() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.Nice-procStatHistory[1].Cpu.Nice) * invQuotient
}

/*
	計算 cpu system 時間百分比
 */
func CpuSystem() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.System-procStatHistory[1].Cpu.System) * invQuotient
}

/*
	計算 cpu io wait 時間百分比
 */
func CpuIowait() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.Iowait-procStatHistory[1].Cpu.Iowait) * invQuotient
}

/*
	計算 cpu 中斷時間百分比
 */
func CpuIrq() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.Irq-procStatHistory[1].Cpu.Irq) * invQuotient
}

/*
	計算 cpu 軟中斷時間百分比
 */
func CpuSoftIrq() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.SoftIrq-procStatHistory[1].Cpu.SoftIrq) * invQuotient
}

/*
	計算 cpu  steal 時間百分比
 */
func CpuSteal() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.Steal-procStatHistory[1].Cpu.Steal) * invQuotient
}

/*
	計算 cpu  guest 時間百分比
 */
func CpuGuest() float64 {
	psLock.RLock()
	defer psLock.RUnlock()
	dt := deltaTotal()
	if dt == 0 {
		return 0.0
	}
	invQuotient := 100.00 / float64(dt)
	return float64(procStatHistory[0].Cpu.Guest-procStatHistory[1].Cpu.Guest) * invQuotient
}

/*
	計算 cpu 上下文切換次數
 */
func CurrentCpuSwitches() uint64 {
	psLock.RLock()
	defer psLock.RUnlock()
	return procStatHistory[0].Ctxt
}

/*
	檢測 cpu 資料是否已經準備完畢(至少需要兩個周期的曆史資料,才能計算內插補點)
 */
func CpuPrepared() bool {
	psLock.RLock()
	defer psLock.RUnlock()
	return procStatHistory[1] != nil
}

/*
	擷取 cpu 的 MetricValue
 */
func CpuMetrics() []*model.MetricValue {
	// 檢查曆史資料是否已經有了兩個周期,足夠計算內插補點
	if !CpuPrepared() {
		return []*model.MetricValue{}
	}

	// 擷取各項 cpu 監控名額
	cpuIdleVal := CpuIdle()
	idle := GaugeValue("cpu.idle", cpuIdleVal)
	busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
	user := GaugeValue("cpu.user", CpuUser())
	nice := GaugeValue("cpu.nice", CpuNice())
	system := GaugeValue("cpu.system", CpuSystem())
	iowait := GaugeValue("cpu.iowait", CpuIowait())
	irq := GaugeValue("cpu.irq", CpuIrq())
	softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
	steal := GaugeValue("cpu.steal", CpuSteal())
	guest := GaugeValue("cpu.guest", CpuGuest())
	switches := CounterValue("cpu.switches", CurrentCpuSwitches())  // 利用了 counter 資料類型,之後會被計算為 speed
	return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
           

網卡的資訊類似cpu的擷取方式,也是調用了第三方庫,通過系統檔案/proc/net/dev和/sys/class/net/%s/speed擷取網卡資訊

這是我虛拟機的/proc/net/dev,一行對應一個網卡

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

左邊的8清單示接收,分别是系統啟動以來,接收的總位元組數、包數、錯誤包數、丢包數、FIFO緩沖區錯誤數、分組幀錯誤數、壓縮資料包數、多點傳播幀數;右邊的8清單示發送,大部分相同,不同的colls表示沖突數、carrier表示載波損耗的數量(說實話,已經完全無法了解了)

我虛拟機的/sys/class/net/eth0/speed,好吧就一個數,查了一下說是網卡最新的連接配接速度,差不多就是帶寬啦,機關是Mbits/sec

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

第三方庫還提供了一些處理過後的名額,比如出入流量占帶寬百分比等

/*
	網卡監控資料采集
 */
func NetMetrics() []*model.MetricValue {
	return CoreNetMetrics(g.Config().Collector.IfacePrefix)  // 擷取配置的要監控網卡的字首
}

/*
	網卡監控
 */
func CoreNetMetrics(ifacePrefix []string) []*model.MetricValue {

	netIfs, err := nux.NetIfs(ifacePrefix)  // 通過系統檔案 /proc/net/dev 和 /sys/class/net/%s/speed 擷取網卡資訊
	if err != nil {
		log.Println(err)
		return []*model.MetricValue{}
	}

	cnt := len(netIfs)
	ret := make([]*model.MetricValue, cnt*26)  // 網卡的資訊全放在一個一維數組裡

	for idx, netIf := range netIfs {
		iface := "iface=" + netIf.Iface
		ret[idx*26+0] = CounterValue("net.if.in.bytes", netIf.InBytes, iface)
		ret[idx*26+1] = CounterValue("net.if.in.packets", netIf.InPackages, iface)
		ret[idx*26+2] = CounterValue("net.if.in.errors", netIf.InErrors, iface)
		ret[idx*26+3] = CounterValue("net.if.in.dropped", netIf.InDropped, iface)
		ret[idx*26+4] = CounterValue("net.if.in.fifo.errs", netIf.InFifoErrs, iface)
		ret[idx*26+5] = CounterValue("net.if.in.frame.errs", netIf.InFrameErrs, iface)
		ret[idx*26+6] = CounterValue("net.if.in.compressed", netIf.InCompressed, iface)
		ret[idx*26+7] = CounterValue("net.if.in.multicast", netIf.InMulticast, iface)
		ret[idx*26+8] = CounterValue("net.if.out.bytes", netIf.OutBytes, iface)
		ret[idx*26+9] = CounterValue("net.if.out.packets", netIf.OutPackages, iface)
		ret[idx*26+10] = CounterValue("net.if.out.errors", netIf.OutErrors, iface)
		ret[idx*26+11] = CounterValue("net.if.out.dropped", netIf.OutDropped, iface)
		ret[idx*26+12] = CounterValue("net.if.out.fifo.errs", netIf.OutFifoErrs, iface)
		ret[idx*26+13] = CounterValue("net.if.out.collisions", netIf.OutCollisions, iface)
		ret[idx*26+14] = CounterValue("net.if.out.carrier.errs", netIf.OutCarrierErrs, iface)
		ret[idx*26+15] = CounterValue("net.if.out.compressed", netIf.OutCompressed, iface)
		ret[idx*26+16] = CounterValue("net.if.total.bytes", netIf.TotalBytes, iface)  // 第三方庫計算的,發送和接收的和
		ret[idx*26+17] = CounterValue("net.if.total.packets", netIf.TotalPackages, iface)
		ret[idx*26+18] = CounterValue("net.if.total.errors", netIf.TotalErrors, iface)
		ret[idx*26+19] = CounterValue("net.if.total.dropped", netIf.TotalDropped, iface)
		ret[idx*26+20] = GaugeValue("net.if.speed.bits", netIf.SpeedBits, iface)  // 網卡最新的連接配接速度,即帶寬,機關是 Mbits/sec
		ret[idx*26+21] = CounterValue("net.if.in.percent", netIf.InPercent, iface)  // 入流量占帶寬百分比
		ret[idx*26+22] = CounterValue("net.if.out.percent", netIf.OutPercent, iface)  // 出流量占帶寬百分比
		ret[idx*26+23] = CounterValue("net.if.in.bits", netIf.InBytes*8, iface)  // 計算 bit 數
		ret[idx*26+24] = CounterValue("net.if.out.bits", netIf.OutBytes*8, iface)
		ret[idx*26+25] = CounterValue("net.if.total.bits", netIf.TotalBytes*8, iface)
	}
	return ret
}
           

kernel的資訊采集很簡單,也是調第三方庫,打開的系統檔案基本都很簡單,最複雜的就是/proc/sys/fs/file-nr,第一清單示系統已經配置設定并使用的檔案描述符數,第二清單示已經配置設定但并未使用的檔案描述符數,第三清單示系統支援的最大檔案描述符數

檔案描述符是一個程序打開檔案時建立的,同一個檔案可以有不同的檔案描述符,是以這幾項監控類似于打開檔案的數量

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務
/*
	核心監控資料采集
 */
func KernelMetrics() (L []*model.MetricValue) {

	maxFiles, err := nux.KernelMaxFiles()  // 通過 /proc/sys/fs/file-max 擷取系統允許的最大檔案描述符數
	if err != nil {
		log.Println(err)
		return
	}

	L = append(L, GaugeValue("kernel.maxfiles", maxFiles))

	maxProc, err := nux.KernelMaxProc()  // 通過 /proc/sys/kernel/pid_max 擷取系統允許最大程序數
	if err != nil {
		log.Println(err)
		return
	}

	L = append(L, GaugeValue("kernel.maxproc", maxProc))

	allocateFiles, err := nux.KernelAllocateFiles()  // 通過 /proc/sys/fs/file-nr 擷取系統目前使用的檔案描述符數
	if err != nil {
		log.Println(err)
		return
	}

	L = append(L, GaugeValue("kernel.files.allocated", allocateFiles))
	L = append(L, GaugeValue("kernel.files.left", maxFiles-allocateFiles))  // 系統還可以使用多少檔案描述符
	return
}
           

cpu負載是通過/proc/loadavg采集的,每列分别表示1、5、15分鐘内在運作隊列中等待或等待磁盤IO的程序、正在運作的程序數占程序總數百分比、最近運作的程序ID号

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

代碼很短

/*
	cpu 負載監控資料采集
 */
func LoadAvgMetrics() []*model.MetricValue {
	load, err := nux.LoadAvg()
	if err != nil {
		log.Println(err)
		return nil
	}

	return []*model.MetricValue{
		GaugeValue("load.1min", load.Avg1min),
		GaugeValue("load.5min", load.Avg5min),
		GaugeValue("load.15min", load.Avg15min),
	}

}
           

記憶體看的檔案是/proc/meminfo,一行是一個參數,采集的名額淺顯易懂,就不多講

/*
	memory 監控資料采集
 */
func MemMetrics() []*model.MetricValue {
	m, err := nux.MemInfo()
	if err != nil {
		log.Println(err)
		return nil
	}

	memFree := m.MemFree + m.Buffers + m.Cached  // 空閑記憶體總量等于空閑記憶體 + 給檔案的緩沖大小 + 高速緩沖存儲器大小
	if m.MemAvailable > 0 {  // 如果能直接從 /proc/meminfo 獲得空閑記憶體總量,使用檔案中的值
		memFree = m.MemAvailable
	}
	memUsed := m.MemTotal - memFree  // 計算使用量
	
	// 計算百分比
	pmemFree := 0.0
	pmemUsed := 0.0
	if m.MemTotal != 0 {
		pmemFree = float64(memFree) * 100.0 / float64(m.MemTotal)
		pmemUsed = float64(memUsed) * 100.0 / float64(m.MemTotal)
	}

	pswapFree := 0.0
	pswapUsed := 0.0
	if m.SwapTotal != 0 {
		pswapFree = float64(m.SwapFree) * 100.0 / float64(m.SwapTotal)
		pswapUsed = float64(m.SwapUsed) * 100.0 / float64(m.SwapTotal)
	}

	return []*model.MetricValue{
		GaugeValue("mem.memtotal", m.MemTotal),
		GaugeValue("mem.memused", memUsed),
		GaugeValue("mem.memfree", memFree),
		GaugeValue("mem.swaptotal", m.SwapTotal),
		GaugeValue("mem.swapused", m.SwapUsed),
		GaugeValue("mem.swapfree", m.SwapFree),
		GaugeValue("mem.memfree.percent", pmemFree),
		GaugeValue("mem.memused.percent", pmemUsed),
		GaugeValue("mem.swapfree.percent", pswapFree),
		GaugeValue("mem.swapused.percent", pswapUsed),
	}

}
           

磁盤IO監控有兩個函數,DiskIOMetrics和IOStatsMetrics,都在一個檔案裡,磁盤資訊是從/proc/diskstats這個檔案讀的,當然并不是所有磁盤的資訊都會讀出來,必須是sd、vd、xvd、fio、nvme開頭的裝置名、且長度符合規定的裝置才會被監控,詳見函數ShouldHandleDevice

/*
	讀完成次數
 */
func IOReadRequests(arr [2]*nux.DiskStats) uint64 {
	return arr[0].ReadRequests - arr[1].ReadRequests
}

/*
	合并讀完成次數
 */
func IOReadMerged(arr [2]*nux.DiskStats) uint64 {
	return arr[0].ReadMerged - arr[1].ReadMerged
}

/*
	讀扇區次數
 */
func IOReadSectors(arr [2]*nux.DiskStats) uint64 {
	return arr[0].ReadSectors - arr[1].ReadSectors
}

/*
	讀操作花費毫秒
 */
func IOMsecRead(arr [2]*nux.DiskStats) uint64 {
	return arr[0].MsecRead - arr[1].MsecRead
}

/*
	寫完成次數
 */
func IOWriteRequests(arr [2]*nux.DiskStats) uint64 {
	return arr[0].WriteRequests - arr[1].WriteRequests
}

/*
	合并寫完成次數
 */
func IOWriteMerged(arr [2]*nux.DiskStats) uint64 {
	return arr[0].WriteMerged - arr[1].WriteMerged
}

/*
	寫扇區次數
 */
func IOWriteSectors(arr [2]*nux.DiskStats) uint64 {
	return arr[0].WriteSectors - arr[1].WriteSectors
}

/*
	寫操作花費毫秒
 */
func IOMsecWrite(arr [2]*nux.DiskStats) uint64 {
	return arr[0].MsecWrite - arr[1].MsecWrite
}

/*
	io 操作花費毫秒
 */
func IOMsecTotal(arr [2]*nux.DiskStats) uint64 {
	return arr[0].MsecTotal - arr[1].MsecTotal
}

/*
	io 操作花費權重毫秒
 */
func IOMsecWeightedTotal(arr [2]*nux.DiskStats) uint64 {
	return arr[0].MsecWeightedTotal - arr[1].MsecWeightedTotal
}

/*
	計算兩次曆史資料的時間間隔有多少秒
 */
func TS(arr [2]*nux.DiskStats) uint64 {
	return uint64(arr[0].TS.Sub(arr[1].TS).Nanoseconds() / 1000000)
}

/*
	根據裝置名取出對應的一組曆史資料,傳遞給相應函數
 */
func IODelta(device string, f func([2]*nux.DiskStats) uint64) uint64 {
	val, ok := diskStatsMap[device]
	if !ok {
		return 0
	}

	if val[1] == nil {
		return 0
	}
	return f(val)
}

/*
	采集磁盤IO監控資料
 */
func DiskIOMetrics() (L []*model.MetricValue) {

	dsList, err := nux.ListDiskStats()
	if err != nil {
		log.Println(err)
		return
	}

	for _, ds := range dsList {
		if !ShouldHandleDevice(ds.Device) {  // 根據裝置名的字首判定是否要進行監控
			continue
		}

		device := "device=" + ds.Device

		L = append(L, CounterValue("disk.io.read_requests", ds.ReadRequests, device))
		L = append(L, CounterValue("disk.io.read_merged", ds.ReadMerged, device))
		L = append(L, CounterValue("disk.io.read_sectors", ds.ReadSectors, device))
		L = append(L, CounterValue("disk.io.msec_read", ds.MsecRead, device))
		L = append(L, CounterValue("disk.io.write_requests", ds.WriteRequests, device))
		L = append(L, CounterValue("disk.io.write_merged", ds.WriteMerged, device))
		L = append(L, CounterValue("disk.io.write_sectors", ds.WriteSectors, device))
		L = append(L, CounterValue("disk.io.msec_write", ds.MsecWrite, device))
		L = append(L, CounterValue("disk.io.ios_in_progress", ds.IosInProgress, device))
		L = append(L, CounterValue("disk.io.msec_total", ds.MsecTotal, device))
		L = append(L, CounterValue("disk.io.msec_weighted_total", ds.MsecWeightedTotal, device))
	}
	return
}

/*
	采集磁盤IO監控資料
 */
func IOStatsMetrics() (L []*model.MetricValue) {
	dsLock.RLock()
	defer dsLock.RUnlock()

	for device := range diskStatsMap {
		if !ShouldHandleDevice(device) { // 根據裝置名的字首判定是否要進行監控
			continue
		}

		tags := "device=" + device
		rio := IODelta(device, IOReadRequests)
		wio := IODelta(device, IOWriteRequests)
		delta_rsec := IODelta(device, IOReadSectors)
		delta_wsec := IODelta(device, IOWriteSectors)
		ruse := IODelta(device, IOMsecRead)
		wuse := IODelta(device, IOMsecWrite)
		use := IODelta(device, IOMsecTotal)
		n_io := rio + wio
		avgrq_sz := 0.0
		await := 0.0
		svctm := 0.0
		if n_io != 0 {
			avgrq_sz = float64(delta_rsec+delta_wsec) / float64(n_io)  // 平均每次 IO 操作讀寫扇區數
			await = float64(ruse+wuse) / float64(n_io) // 平均每次 IO 操作等待時間
			svctm = float64(use) / float64(n_io)  // 平均每次 IO 操作服務時間
		}

		duration := IODelta(device, TS)

		L = append(L, GaugeValue("disk.io.read_bytes", float64(delta_rsec)*512.0, tags))
		L = append(L, GaugeValue("disk.io.write_bytes", float64(delta_wsec)*512.0, tags))
		L = append(L, GaugeValue("disk.io.avgrq_sz", avgrq_sz, tags))
		L = append(L, GaugeValue("disk.io.avgqu-sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags))  // 平均 IO 隊列長度,即 IO 權重時間(以秒為機關)
		L = append(L, GaugeValue("disk.io.await", await, tags))
		L = append(L, GaugeValue("disk.io.svctm", svctm, tags))
		tmp := float64(use) * 100.0 / float64(duration)
		if tmp > 100.0 {
			tmp = 100.0
		}
		L = append(L, GaugeValue("disk.io.util", tmp, tags))
	}

	return
}

/*
	判斷該裝置是否要監控
 */
func ShouldHandleDevice(device string) bool {
	normal := len(device) == 3 && (strings.HasPrefix(device, "sd") || strings.HasPrefix(device, "vd"))
	aws := len(device) >= 4 && strings.HasPrefix(device, "xvd")
	flash := len(device) >= 4 && (strings.HasPrefix(device, "fio") || strings.HasPrefix(device, "nvme"))
	return normal || aws || flash
}
           

網絡監控是/proc/net/netstat和/proc/net/snmp這兩個檔案,NetstatMetrics和UdpMetrics兩個函數的代碼很簡單,就是用第三方庫讀取檔案,然後用counter類型把原始資料處理成speed展示出來。但是裡面的名額已經超出了我的了解範圍,這裡略過不看,對名額感興趣可以看這個文proc 檔案系統調節參數介紹

程序監控,會調用第三方庫去讀取/proc目錄下所有的數字目錄,/proc下一個數字目錄就表示對應pid号的程序資訊,這裡讀取兩個,一個是存儲程序資訊的status檔案,一個是存儲程序指令的cmdline檔案

/*
	程序監控資料采集
 */
func ProcMetrics() (L []*model.MetricValue) {

	reportProcs := g.ReportProcs()  // 讀取配置的要監控的程序
	sz := len(reportProcs)
	if sz == 0 {
		return
	}

	ps, err := nux.AllProcs()  // 讀取 /proc 下所有程序檔案的程序名、指令行
	if err != nil {
		log.Println(err)
		return
	}

	pslen := len(ps)

	for tags, m := range reportProcs {
		cnt := 0
		for i := 0; i < pslen; i++ {
			if is_a(ps[i], m) {  // 如果符合使用者設定的程序過濾規則
				cnt++  // 計數加一
			}
		}

		L = append(L, GaugeValue(g.PROC_NUM, cnt, tags))  // 符合使用者配置的程序過濾規則的程序數
	}

	return
}

/*
	根據使用者配置的程序名和指令行,判斷該程序是否比對
 */
func is_a(p *nux.Proc, m map[int]string) bool {
	// 判斷程序名或指令是否比對
	for key, val := range m {
		if key == 1 {
			// 判斷程序名是否相等
			if val != p.Name {
				return false
			}
		} else if key == 2 {
			// 判斷指令行是否包含指定字元串
			if !strings.Contains(p.Cmdline, val) {
				return false
			}
		}
	}
	return true
}
           

磁盤監控,根據agent的cfg.json裡配置的collect.mountPoint略有不同:如果完全沒有配置,open-falcon會采集所有磁盤的資料;如果配置了,隻會采集配置的這些磁盤的資料

采集資料就是調第三方庫讀取/proc/mounts下的挂載磁盤資訊,采集名額都很通俗易懂

/*
	磁盤監控資料采集
 */
func DeviceMetrics() (L []*model.MetricValue) {
	mountPoints, err := nux.ListMountPoint()  // 讀取 /proc/mounts 目錄下挂載的裝置資訊
	if err != nil {
		log.Error("collect device metrics fail:", err)
		return
	}

	var myMountPoints map[string]bool = make(map[string]bool)

	if len(g.Config().Collector.MountPoint) > 0 {
		for _, mp := range g.Config().Collector.MountPoint {  // 擷取配置的要監控的挂載裝置
			myMountPoints[mp] = true
		}
	}

	var diskTotal uint64 = 0
	var diskUsed uint64 = 0

	for idx := range mountPoints {
		fsSpec, fsFile, fsVfstype := mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2]  // 擷取每個裝置的分區定位、挂載目錄、磁盤類型
		if len(myMountPoints) > 0 { //如果配置了 cfg.json 的 collect.mountPoint ,則隻檢測配置中的磁盤
			if _, ok := myMountPoints[fsFile]; !ok {  // 跳過未配置的磁盤,debug 模式下,如果挂載裝置未配置監控,會記錄
				log.Debug("mount point not matched with config", fsFile, "ignored.")
				continue
			}
		}

		var du *nux.DeviceUsage
		du, err = nux.BuildDeviceUsage(fsSpec, fsFile, fsVfstype)  // 擷取指定挂載裝置的資訊
		if err != nil {
			log.Error(err)
			continue
		}

		if du.BlocksAll == 0 {
			continue
		}

		diskTotal += du.BlocksAll  // 累加總容量
		diskUsed += du.BlocksUsed  // 累加使用的總容量

		tags := fmt.Sprintf("mount=%s,fstype=%s", du.FsFile, du.FsVfstype)
		L = append(L, GaugeValue("df.bytes.total", du.BlocksAll, tags))
		L = append(L, GaugeValue("df.bytes.used", du.BlocksUsed, tags))
		L = append(L, GaugeValue("df.bytes.free", du.BlocksFree, tags))
		L = append(L, GaugeValue("df.bytes.used.percent", du.BlocksUsedPercent, tags))
		L = append(L, GaugeValue("df.bytes.free.percent", du.BlocksFreePercent, tags))

		if du.InodesAll == 0 {
			continue
		}

		L = append(L, GaugeValue("df.inodes.total", du.InodesAll, tags))
		L = append(L, GaugeValue("df.inodes.used", du.InodesUsed, tags))
		L = append(L, GaugeValue("df.inodes.free", du.InodesFree, tags))
		L = append(L, GaugeValue("df.inodes.used.percent", du.InodesUsedPercent, tags))
		L = append(L, GaugeValue("df.inodes.free.percent", du.InodesFreePercent, tags))

	}

	if len(L) > 0 && diskTotal > 0 {
		L = append(L, GaugeValue("df.statistics.total", float64(diskTotal)))
		L = append(L, GaugeValue("df.statistics.used", float64(diskUsed)))
		L = append(L, GaugeValue("df.statistics.used.percent", float64(diskUsed)*100.0/float64(diskTotal)))
	}

	return
}
           

端口監控,是通過系統指令檢視的

/*
	端口監控資料采集
 */
func PortMetrics() (L []*model.MetricValue) {

	reportPorts := g.ReportPorts()  // 擷取配置的要監控的端口号
	sz := len(reportPorts)
	if sz == 0 {
		return
	}

	allTcpPorts, err := nux.TcpPorts()  // 通過指令 ss -t -l -n 列出所有正在 tcp 通信的端口号
	if err != nil {
		log.Println(err)
		return
	}

	allUdpPorts, err := nux.UdpPorts()  // 通過指令 ss -u -a -n 列出所有正在 udp 通信的端口号
	if err != nil {
		log.Println(err)
		return
	}

	for i := 0; i < sz; i++ {
		tags := fmt.Sprintf("port=%d", reportPorts[i])
		if slice.ContainsInt64(allTcpPorts, reportPorts[i]) || slice.ContainsInt64(allUdpPorts, reportPorts[i]) {  // 如果正在 tcp 或 udp 通信的端口中包含了改端口,就認為端口存活
			L = append(L, GaugeValue(g.NET_PORT_LISTEN, 1, tags))
		} else {
			L = append(L, GaugeValue(g.NET_PORT_LISTEN, 0, tags))
		}
	}

	return
}
           

socket狀态監控是通過指令ss -s獲得的,采集的是TCP開頭的那一行括号中的名額

open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務
/*
	socket 狀态監控資料采集
 */
func SocketStatSummaryMetrics() (L []*model.MetricValue) {
	ssMap, err := nux.SocketStatSummary()  // 通過 ss -s 指令擷取 tcp 連接配接狀态
	if err != nil {
		log.Println(err)
		return
	}

	for k, v := range ssMap {
		L = append(L, GaugeValue("ss."+k, v))
	}

	return
}
           

目錄監控,是通過指令du -bs采集的,目錄很大的時候這個指令執行時間比較長,是以每個目錄都是開了一個goroutine去做的,并且有逾時響應的處理

/*
	目錄監控資料采集
 */
func DuMetrics() (L []*model.MetricValue) {
	paths := g.DuPaths()  // 擷取配置的要監控的目錄
	result := make(chan *model.MetricValue, len(paths))
	var wg sync.WaitGroup

	for _, path := range paths {
		wg.Add(1)
		go func(path string) {
			var err error
			defer func() {
				if err != nil {
					log.Println(err)
					result <- GaugeValue(g.DU_BS, -1, "path="+path)  // 如果采集某個目錄的監控名額出錯,則傳回 -1 作為監控值
				}
				wg.Done()
			}()

			cmd := exec.Command("du", "-bs", path)  // 執行指令 du -bs 擷取目錄大小(機關為位元組)
			var stdout bytes.Buffer
			cmd.Stdout = &stdout
			var stderr bytes.Buffer
			cmd.Stderr = &stderr
			err = cmd.Start()
			if err != nil {
				return
			}

			err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Second)  // 逾時處理
			if isTimeout {
				err = errors.New(fmt.Sprintf("exec cmd : du -bs %s timeout", path))
				return
			}

			errStr := stderr.String()  // 指令報錯處理
			if errStr != "" {
				err = errors.New(errStr)
				return
			}

			if err != nil {
				err = errors.New(fmt.Sprintf("du -bs %s failed: %s", path, err.Error()))
				return
			}

			arr := strings.Fields(stdout.String())
			if len(arr) < 2 {
				err = errors.New(fmt.Sprintf("du -bs %s failed: %s", path, "return fields < 2"))
				return
			}

			size, err := strconv.ParseUint(arr[0], 10, 64)  // 指令傳回的第一列為大小
			if err != nil {
				err = errors.New(fmt.Sprintf("cannot parse du -bs %s output", path))
				return
			}
			result <- GaugeValue(g.DU_BS, size, "path="+path)
		}(path)
	}
	wg.Wait()

	resultLen := len(result)
	for i := 0; i < resultLen; i++ {
		L = append(L, <-result)
	}
	return
}
           

url的監控就是curl一下

/*
	url 監控資料采集
 */
func UrlMetrics() (L []*model.MetricValue) {
	reportUrls := g.ReportUrls()  // 擷取要監控的 url
	sz := len(reportUrls)
	if sz == 0 {
		return
	}
	hostname, err := g.Hostname()
	if err != nil {
		hostname = "None"
	}
	for furl, timeout := range reportUrls {
		tags := fmt.Sprintf("url=%v,timeout=%v,src=%v", furl, timeout, hostname)  // 通過 curl 指令檢查 url 是否在規定時間内響應
		if ok, _ := probeUrl(furl, timeout); !ok {
			L = append(L, GaugeValue(g.URL_CHECK_HEALTH, 0, tags))
			continue
		}
		L = append(L, GaugeValue(g.URL_CHECK_HEALTH, 1, tags))
	}
	return
}

/*
	檢測 url 是否正常響應
 */
func probeUrl(furl string, timeout string) (bool, error) {
	bs, err := sys.CmdOutBytes("curl", "--max-filesize", "102400", "-I", "-m", timeout, "-o", "/dev/null", "-s", "-w", "%{http_code}", furl)
	if err != nil {
		log.Printf("probe url [%v] failed.the err is: [%v]\n", furl, err)
		return false, err
	}
	reader := bufio.NewReader(bytes.NewBuffer(bs))
	retcode, err := file.ReadLine(reader)  // 擷取響應碼
	if err != nil {
		log.Println("read retcode failed.err is:", err)
		return false, err
	}
	if strings.TrimSpace(string(retcode)) != "200" {  // 響應碼為 200  才認為正常
		log.Printf("return code [%v] is not 200.query url is [%v]", string(retcode), furl)
		return false, err
	}
	return true, err
}
           

我電腦沒有gpu,是以gpu那塊沒有仔細看,是用第三方庫gonvml采集的資料,做一些機關換算就可以了,名額都通俗易懂

3.7 http服務

雖然main函數調用的是Start(),但是這個函數隻是啟動了http,關鍵的代碼還是在http這個包的init()函數裡

先看Start(),非常簡單

/*
	開啟 http 服務
 */
func Start() {
	if !g.Config().Http.Enabled {  // 如果配置不開啟 http,傳回
		return
	}

	addr := g.Config().Http.Listen  // 擷取 http 服務啟動的位址
	if addr == "" {
		return
	}

	s := &http.Server{
		Addr:           addr,
		MaxHeaderBytes: 1 << 30,
	}

	log.Println("listening", addr)
	log.Fatalln(s.ListenAndServe())  // 開啟 http
}
           

初始化函數裡啟動了很多API,這些才是真正的後端代碼

/*
	初始化 API
 */
func init() {
	configAdminRoutes()
	configCpuRoutes()
	configDfRoutes()
	configHealthRoutes()
	configIoStatRoutes()
	configKernelRoutes()
	configMemoryRoutes()
	configPageRoutes()
	configPluginRoutes()
	configPushRoutes()
	configRunRoutes()
	configSystemRoutes()
}
           

這個是管理用的API,前面一直提到的可信任IP就是在這裡用的了,來自可信任IP的API請求可以完成某些重要操作,比如停止、加載配置

/*
	管理類 API
 */
func configAdminRoutes() {
	// 結束 agent 程式
	http.HandleFunc("/exit", func(w http.ResponseWriter, r *http.Request) {
		if g.IsTrustable(r.RemoteAddr) {  // 來自可信任 IP 的調用才能接受
			w.Write([]byte("exiting..."))  // 成功退出,傳回 exiting...
			go func() {
				time.Sleep(time.Second)
				os.Exit(0)
			}()
		} else {
			w.Write([]byte("no privilege"))  // 不是來自可信任 IP,傳回 no privilege
		}
	})

	// 重新讀取配置
	http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
		if g.IsTrustable(r.RemoteAddr) {
			g.ParseConfig(g.ConfigFile)
			RenderDataJson(w, g.Config())  // 傳回 config 内容
		} else {
			w.Write([]byte("no privilege"))
		}
	})

	// 傳回工作路徑
	http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
		RenderDataJson(w, file.SelfDir())
	})

	// 傳回可信任 IP
	http.HandleFunc("/ips", func(w http.ResponseWriter, r *http.Request) {
		RenderDataJson(w, g.TrustableIps())
	})
}
           

擷取cpu資訊的API,基本直接調用了采集監控資料的函數

/*
	CPU API
 */
func configCpuRoutes() {
	// 擷取 cpu 個數
	http.HandleFunc("/proc/cpu/num", func(w http.ResponseWriter, r *http.Request) {
		RenderDataJson(w, runtime.NumCPU())
	})

	// 擷取 cpu 主頻
	http.HandleFunc("/proc/cpu/mhz", func(w http.ResponseWriter, r *http.Request) {
		data, err := nux.CpuMHz()
		AutoRender(w, data, err)
	})

	// 以字元串形式傳回 cpu 使用情況
	http.HandleFunc("/page/cpu/usage", func(w http.ResponseWriter, r *http.Request) {
		if !funcs.CpuPrepared() {
			RenderMsgJson(w, "not prepared")
			return
		}

		idle := funcs.CpuIdle()
		busy := 100.0 - idle

		item := [10]string{
			fmt.Sprintf("%.1f%%", idle),
			fmt.Sprintf("%.1f%%", busy),
			fmt.Sprintf("%.1f%%", funcs.CpuUser()),
			fmt.Sprintf("%.1f%%", funcs.CpuNice()),
			fmt.Sprintf("%.1f%%", funcs.CpuSystem()),
			fmt.Sprintf("%.1f%%", funcs.CpuIowait()),
			fmt.Sprintf("%.1f%%", funcs.CpuIrq()),
			fmt.Sprintf("%.1f%%", funcs.CpuSoftIrq()),
			fmt.Sprintf("%.1f%%", funcs.CpuSteal()),
			fmt.Sprintf("%.1f%%", funcs.CpuGuest()),
		}

		RenderDataJson(w, [][10]string{item})
	})
	
	// 以 map 形式傳回 cpu 使用情況
	http.HandleFunc("/proc/cpu/usage", func(w http.ResponseWriter, r *http.Request) {
		if !funcs.CpuPrepared() {
			RenderMsgJson(w, "not prepared")
			return
		}

		idle := funcs.CpuIdle()
		busy := 100.0 - idle

		RenderDataJson(w, map[string]interface{}{
			"idle":    idle,
			"busy":    busy,
			"user":    funcs.CpuUser(),
			"nice":    funcs.CpuNice(),
			"system":  funcs.CpuSystem(),
			"iowait":  funcs.CpuIowait(),
			"irq":     funcs.CpuIrq(),
			"softirq": funcs.CpuSoftIrq(),
			"steal":   funcs.CpuSteal(),
			"guest":   funcs.CpuGuest(),
		})
	})
}
           

磁盤的API和監控的函數簡直一模一樣的邏輯

/*
	磁盤 API
 */
func configDfRoutes() {
	// 擷取磁盤和檔案系統對象的使用情況
	http.HandleFunc("/page/df", func(w http.ResponseWriter, r *http.Request) {
		mountPoints, err := nux.ListMountPoint()
		if err != nil {
			RenderMsgJson(w, err.Error())
			return
		}

		var ret [][]interface{} = make([][]interface{}, 0)
		for idx := range mountPoints {
			var du *nux.DeviceUsage
			du, err = nux.BuildDeviceUsage(mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2])
			if err == nil {
				ret = append(ret,
					[]interface{}{
						du.FsSpec,
						core.ReadableSize(float64(du.BlocksAll)),
						core.ReadableSize(float64(du.BlocksUsed)),
						core.ReadableSize(float64(du.BlocksFree)),
						fmt.Sprintf("%.1f%%", du.BlocksUsedPercent),
						du.FsFile,
						core.ReadableSize(float64(du.InodesAll)),
						core.ReadableSize(float64(du.InodesUsed)),
						core.ReadableSize(float64(du.InodesFree)),
						fmt.Sprintf("%.1f%%", du.InodesUsedPercent),
						du.FsVfstype,
					})
			}
		}

		RenderDataJson(w, ret)
	})
}
           

用戶端自身健康度檢測的API很簡單

/*
	用戶端健康情況 API
 */
func configHealthRoutes() {
	// 如果用戶端正常工作,傳回 ok
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	})
	
	// 傳回用戶端版本
	http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(g.VERSION))
	})
}
           

IO狀态的API,講道理,這幾個函數真的是一個人寫的嗎,有的直接調funcs裡面的監控采集函數,有的還要寫一遍同樣的邏輯

/*
	IO 狀态 API
 */
func configIoStatRoutes() {
	// 傳回 IO 情況
	http.HandleFunc("/page/diskio", func(w http.ResponseWriter, r *http.Request) {
		RenderDataJson(w, funcs.IOStatsForPage())
	})
}
           

核心資訊的API

/*
	核心資訊 API
 */
func configKernelRoutes() {
	// 傳回 hostname,擷取順序是 agent 配置、系統環境變量 FALCON_ENDPOINT、從系統擷取
	http.HandleFunc("/proc/kernel/hostname", func(w http.ResponseWriter, r *http.Request) {
		data, err := g.Hostname()
		AutoRender(w, data, err)
	})

	// 傳回最大程序數
	http.HandleFunc("/proc/kernel/maxproc", func(w http.ResponseWriter, r *http.Request) {
		data, err := nux.KernelMaxProc()
		AutoRender(w, data, err)
	})

	// 傳回最大檔案數
	http.HandleFunc("/proc/kernel/maxfiles", func(w http.ResponseWriter, r *http.Request) {
		data, err := nux.KernelMaxFiles()
		AutoRender(w, data, err)
	})

	// 傳回核心版本
	http.HandleFunc("/proc/kernel/version", func(w http.ResponseWriter, r *http.Request) {
		data, err := sys.CmdOutNoLn("uname", "-r")  // 執行指令 uname -r 擷取核心版本
		AutoRender(w, data, err)
	})

}
           

記憶體API也沒啥好說的,為啥傳回的形式還要搞的這麼多

/*
	記憶體 API
 */
func configMemoryRoutes() {
	// 以數組形式傳回記憶體使用率資訊
	http.HandleFunc("/page/memory", func(w http.ResponseWriter, r *http.Request) {
		mem, err := nux.MemInfo()
		if err != nil {
			RenderMsgJson(w, err.Error())
			return
		}

		memFree := mem.MemFree + mem.Buffers + mem.Cached
		memUsed := mem.MemTotal - memFree
		var t uint64 = 1024 * 1024
		RenderDataJson(w, []interface{}{mem.MemTotal / t, memUsed / t, memFree / t})
	})

	// 以 map 形式傳回記憶體使用率資訊
	http.HandleFunc("/proc/memory", func(w http.ResponseWriter, r *http.Request) {
		mem, err := nux.MemInfo()
		if err != nil {
			RenderMsgJson(w, err.Error())
			return
		}

		memFree := mem.MemFree + mem.Buffers + mem.Cached
		memUsed := mem.MemTotal - memFree

		RenderDataJson(w, map[string]interface{}{
			"total": mem.MemTotal,
			"free":  memFree,
			"used":  memUsed,
		})
	})
}
           

這個靜态資源API其實目前隻支援一個頁面,就是用戶端IP:port那個頁面,當然你也可以自己在agent/public目錄下,建立目錄放其他的index.html,也可以擷取到

/*
	靜态資源擷取 API
 */
func configPageRoutes() {

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if strings.HasSuffix(r.URL.Path, "/") {
			if !file.IsExist(filepath.Join(g.Root, "/public", r.URL.Path, "index.html")) {  // 通路 用戶端IP: port/ 即可擷取 index.html 
				http.NotFound(w, r)
				return
			}
		}
		http.FileServer(http.Dir(filepath.Join(g.Root, "/public"))).ServeHTTP(w, r)
	})

}
           
open-falcon源碼閱讀(二)——agent源碼閱讀1 概覽2 建立映射3 定時任務

plugin的API

/*
	plugin API
 */
func configPluginRoutes() {
	// 從 git 更新插件
	http.HandleFunc("/plugin/update", func(w http.ResponseWriter, r *http.Request) {
		if !g.Config().Plugin.Enabled {
			w.Write([]byte("plugin not enabled"))
			return
		}

		dir := g.Config().Plugin.Dir
		parentDir := file.Dir(dir)
		file.InsureDir(parentDir)  // 確定 plugin 父目錄存在,不存在建立

		if file.IsExist(dir) {  // 如果插件目錄已經存在,pull
			// git pull
			cmd := exec.Command("git", "pull")
			cmd.Dir = dir  // 在 plugin 目錄執行 pull
			err := cmd.Run()
			if err != nil {
				w.Write([]byte(fmt.Sprintf("git pull in dir:%s fail. error: %s", dir, err)))
				return
			}
		} else {  // 如果插件目錄不存在,clone
			// git clone
			cmd := exec.Command("git", "clone", g.Config().Plugin.Git, file.Basename(dir))
			cmd.Dir = parentDir  // 在父目錄執行 clone
			err := cmd.Run()
			if err != nil {
				w.Write([]byte(fmt.Sprintf("git clone in dir:%s fail. error: %s", parentDir, err)))
				return
			}
		}

		w.Write([]byte("success"))
	})

	// plugin 強制 reset
	http.HandleFunc("/plugin/reset", func(w http.ResponseWriter, r *http.Request) {
		if !g.Config().Plugin.Enabled {
			w.Write([]byte("plugin not enabled"))
			return
		}

		dir := g.Config().Plugin.Dir

		// git reset
		if file.IsExist(dir) {
			cmd := exec.Command("git", "reset", "--hard")
			cmd.Dir = dir
			err := cmd.Run()
			if err != nil {
				w.Write([]byte(fmt.Sprintf("git reset --hard in dir:%s fail. error: %s", dir, err)))
				return
			}
		}
		w.Write([]byte("success"))
	})

	// 檢視目前生效的 plugin
	http.HandleFunc("/plugins", func(w http.ResponseWriter, r *http.Request) {
		//TODO: not thread safe
		RenderDataJson(w, plugins.Plugins)
	})
}
           

push的API是最常用的,每次調用push他都會直接發給transfer,我記得美團對open-falcon的改進有一條就是增加了一個緩存,避免不同業務頻繁調用push接口的造成的發送性能下降、資料丢失等問題

/*
	push API
 */
func configPushRoutes() {
	http.HandleFunc("/v1/push", func(w http.ResponseWriter, req *http.Request) {
		if req.ContentLength == 0 {
			http.Error(w, "body is blank", http.StatusBadRequest)
			return
		}

		decoder := json.NewDecoder(req.Body) 
		var metrics []*model.MetricValue
		err := decoder.Decode(&metrics)  // push 的資訊轉為 metric
		if err != nil {
			http.Error(w, "connot decode body", http.StatusBadRequest)
			return
		}

		g.SendToTransfer(metrics)  // 發送給 transfer
		w.Write([]byte("success"))
	})
}
           

run的API可以在機器上執行一些指令,這個非常危險,是以配置的backdoor這項要求打開,請求的IP位址也要是可信任IP才行

/*
	執行指令 API
 */
func configRunRoutes() {
	http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
		if !g.Config().Http.Backdoor {  // 如果不允許 backdoor,報錯傳回
			w.Write([]byte("/run disabled"))
			return
		}

		if g.IsTrustable(r.RemoteAddr) {  // 檢測是否是可信任 IP
			if r.ContentLength == 0 {
				http.Error(w, "body is blank", http.StatusBadRequest)
				return
			}

			bs, err := ioutil.ReadAll(r.Body)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}

			body := string(bs)
			out, err := sys.CmdOutBytes("sh", "-c", body)  // 執行指令
			if err != nil {
				w.Write([]byte("exec fail: " + err.Error()))
				return
			}

			w.Write(out)  // 傳回指令輸出
		} else {
			w.Write([]byte("no privilege"))
		}
	})
}
           

系統資訊API,可以查詢系統時間、系統啟動時間、cpu負載

/*
	系統資訊 API
 */
func configSystemRoutes() {

	// 擷取目前時間
	http.HandleFunc("/system/date", func(w http.ResponseWriter, req *http.Request) {
		RenderDataJson(w, time.Now().Format("2006-01-02 15:04:05"))
	})

	// 以字元串形式傳回系統啟動時間
	http.HandleFunc("/page/system/uptime", func(w http.ResponseWriter, req *http.Request) {
		days, hours, mins, err := nux.SystemUptime()
		AutoRender(w, fmt.Sprintf("%d days %d hours %d minutes", days, hours, mins), err)
	})

	// 以 map 形式傳回系統啟動時間
	http.HandleFunc("/proc/system/uptime", func(w http.ResponseWriter, req *http.Request) {
		days, hours, mins, err := nux.SystemUptime()
		if err != nil {
			RenderMsgJson(w, err.Error())
			return
		}

		RenderDataJson(w, map[string]interface{}{
			"days":  days,
			"hours": hours,
			"mins":  mins,
		})
	})
	
	// 以數組形式傳回 cpu load
	http.HandleFunc("/page/system/loadavg", func(w http.ResponseWriter, req *http.Request) {
		cpuNum := runtime.NumCPU()
		load, err := nux.LoadAvg()
		if err != nil {
			RenderMsgJson(w, err.Error())
			return
		}

		ret := [3][2]interface{}{
			{load.Avg1min, int64(load.Avg1min * 100.0 / float64(cpuNum))},
			{load.Avg5min, int64(load.Avg5min * 100.0 / float64(cpuNum))},
			{load.Avg15min, int64(load.Avg15min * 100.0 / float64(cpuNum))},
		}
		RenderDataJson(w, ret)
	})

	// 以對象形式傳回 cpu load
	http.HandleFunc("/proc/system/loadavg", func(w http.ResponseWriter, req *http.Request) {
		data, err := nux.LoadAvg()
		AutoRender(w, data, err)
	})

}