項目位址
代碼位址 https://github.com/ning1875/falcon-plus/tree/master/modules/agent
前言
在我們日常運維/運維開發工作中各種系統主要分為兩大流派
本文主要讨論下有agent側一些注意事項
用戶端服務端的C/S架構
優點
- c/s架構相比于基于ssh的并發和吞吐量要高的多
-
利用agent可做的事情很多以及更精準的控制
缺點
- 功能更新需要更新agent
-
agent如果保活是個頭疼的問題
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-Qzot3oVq-1610097723633)(/img/bVbKj0z)]
- 機器上agent過多如何管理又是個問題
agentless架構
特點無侵入性agent:典型應用就是基于ssh ansible
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-VdfGeqWA-1610097723636)(/img/bVbKil0)]
優點
- 無agent不需要關心保活和覆寫問題
-
功能更新主要在server端實作
缺點
- 基于ssh的功能/性能較差
經典client案例
配置管理/批量操作
-
Saltstack Minion
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-XAeGOnjp-1610097723638)(/img/bVbKile)]
-
Puppet Agent
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-vM21D68d-1610097723641)(/img/bVbKikY)]
監控
-
prometheus 各種各樣的exporter: node_exporter
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-lowhD1lc-1610097723643)(/img/bVbKilr)]
-
open-falcon falcon-agent
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-1NzKiK0i-1610097723645)(/img/bVbKils)]
-
Zabbix Agent
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-DxKWzVyk-1610097723647)(/img/bVbKilC)]
C/S架構中agent側注意事項
agent資源消耗
代碼應當簡潔,避免過多資源消耗
agent資源監控可以使用prometheus的 client_golang ,預設會export 程序的cpu_use 、fd、mem等資訊幫助我們定位資源消耗
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 38913.32
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 6.815744e+06
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 15
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.4659584e+07
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.59350253732e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.201352704e+09
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes -1
agent如何更新管理
舉例:現在要更新agent版本 from v1.0 to v1.1
思路一:使用管理工具管理
如ansible-playbook 可以參考我之前的文章 使用ansible-playbook實作dnsdist快速劫持工具
我們可以使用下面python代碼将跑playbook封裝成一個方法,使用的時候隻需要傳入
ip清單,yaml,和額外的變量dict
即可
咳咳:這個方案典型問題就是受限于單個ansible性能問題(很多小夥伴都被折磨過吧),當然可以将大量的ip清單分片分發給多個ansible-server執行,再将結果merge一下
t = PlaybookApi([ip], yaml_path, {"conf_dir": conf_dir, "bk_file_name": bk_file_name})
t.run()
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.utils.vars import load_extra_vars
from ansible.utils.vars import load_options_vars
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
class ResultsCollector(CallbackBase):
def __init__(self, *args, **kwargs):
super(ResultsCollector, self).__init__(*args, **kwargs)
self.host_ok = {}
self.host_unreachable = {}
self.host_failed = {}
def v2_runner_on_unreachable(self, result):
self.host_unreachable[result._host.get_name()] = result
def v2_runner_on_ok(self, result, *args, **kwargs):
self.host_ok[result._host.get_name()] = result
def v2_runner_on_failed(self, result, *args, **kwargs):
self.host_failed[result._host.get_name()] = result
# class PlaybookApi(PlaybookExecutor):
class PlaybookApi(PlaybookExecutor):
def __init__(self, host_list, yaml_path, extra_vars):
self.host_list = host_list
self.yaml_path = yaml_path
# self.kcache_path = kcache_path
self.callback = ResultsCollector()
self.extra_vars = extra_vars
self.IpmiPlay()
super(PlaybookApi, self).__init__(playbooks=[self.yaml_path], inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader, options=self.options, passwords={})
self._tqm._stdout_callback = self.callback
def IpmiPlay(self):
Options = namedtuple('Options',
['listtags', 'listtasks', 'listhosts', 'syntax', 'connection', 'module_path', 'forks',
'remote_user', 'private_key_file', 'ssh_common_args', 'ssh_extra_args',
'sftp_extra_args', 'scp_extra_args', 'become',
'become_method',
'become_user',
'verbosity', 'check', 'extra_vars'])
self.options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False, connection='ssh',
module_path=None,
forks=10, remote_user='',
private_key_file=None,
ssh_common_args='',
ssh_extra_args='',
sftp_extra_args='',
scp_extra_args='',
become=True,
become_method='sudo',
become_user='root',
verbosity=3,
check=False,
extra_vars={})
self.loader = DataLoader()
# create the variable manager, which will be shared throughout
# the code, ensuring a consistent view of global variables
variable_manager = VariableManager()
variable_manager.extra_vars = load_extra_vars(loader=self.loader, options=self.options)
variable_manager.options_vars = load_options_vars(self.options)
self.variable_manager = variable_manager
# create the inventory, and filter it based on the subset specified (if any)
self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=self.host_list)
self.variable_manager.set_inventory(self.inventory)
self.variable_manager.extra_vars = self.extra_vars
def get_result(self):
# print("calling in get_result")
self.results_raw = {'success': {}, 'failed': {}, "unreachable": {}}
for host, result in self.callback.host_ok.items():
self.results_raw['success'][host] = result
for host, result in self.callback.host_failed.items():
self.results_raw['failed'][host] = result
for host, result in self.callback.host_unreachable.items():
self.results_raw['unreachable'][host] = result._result['msg']
return self.results_raw
if __name__ == '__main__':
h = ["127.0.0.1"]
yaml = "systemd_stop.yaml"
api = PlaybookApi(h, yaml, {"app": "falcon-judge"})
api.run()
res = api.get_result()
for k, v in res.items():
for kk, vv in v.items():
print(kk, vv._result)
思路二: 代碼中實作自更新
以falcon-agent代碼為例,代碼位址 https://github.com/ning1875/falcon-plus/tree/master/modules/agent 整體實作流程:
ps:原諒我那蜘蛛爬的字吧
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-IbMfRU1L-1610097723649)(/img/bVbKj3J)]
實作分析
- 機器上一般會選用一種daemontools作為服務托管工具:如 supervisor和systemd,而systemd更普遍些
- 在機器上跑的服務最小可由三個檔案組成: 一個二進制可執行檔案、一個配置檔案、一個service檔案
- 是以服務的自更新就是這三個檔案的更新
- 檔案更新完後如何重新開機服務呢:以systemd為例隻需要發送term信号給自身程序即可,即kill 程序pid
pid := os.Getpid() thisPro, _ := os.FindProcess(pid) thisPro.Signal(os.Kill)
- agent如何管理版本: 在const中指定
// changelog: // 3.1.3: code refactor // 3.1.4: bugfix ignore configuration // 5.0.0: 支援通過配置控制是否開啟/run接口;收集udp流量資料;du某個目錄的大小 // 5.1.0: 同步插件的時候不再使用checksum機制 // 5.1.1: 修複往多個transfer發送資料的時候crash的問題 // 5.1.2: ignore mount point when blocks=0 // 6.0.0: agent自更新,新增一些監控項 // 6.0.1: agent collect level // 6.0.2: 添加單核監控開關預設不打開,單核監控tag變更為core=core0x ,添加mem.available.percent // 6.0.3: 增加sys.uptime // 6.0.4: 修複cpu.iowait>100的bug // 6.0.5: 添加程序采集監控,間隔30s // 6.0.6: 調整内置的采集func間隔 disk io相關和tcp 10s-->30s,agent_version 整數代表目前版本,去掉動态監控方法 // 6.0.7: ntp 支援chronyc ,服務監控rpc call 間隔調整為一分鐘 // 6.0.8: 修改監控項抓取時間間隔, 10s隻保留cpu,解決斷點問題 // 6.0.9: 修複dfa dfb塊裝置采集,修複不同版本ss-s的bug // 6.1.0: 修複機器上主機名被改case,使ip轉化為nxx-xx-xx的形式 const ( VERSION = "6.1.0" COLLECT_INTERVAL = time.Second URL_CHECK_HEALTH = "url.check.health" NET_PORT_LISTEN = "net.port.listen" DU_BS = "du.bs" PROC_NUM = "proc.num" UPTIME = "sys.uptime" )
- 服務端如何開啟更新開關:在hbs http接口開啟
- 用戶端如何check要不要更新:隻需check版本是否一緻和是否在更新過程中
- 服務端如何做到并發控制: 隻需檢查redis更新隊列在更新的數量和預設的門檻值對比
- 管理者如何發起更新:隻需要給hbs發起http請求打開更新開關
curl -X POST http://127.0.0.1:6031/agent/upgrade -d '{"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e"}'
- 管理者
-
如何提高更新并發:其實就是如何讓下載下傳并發更高,即采用大帶寬nginx或cdn或者給不同批次agent發送不同下載下傳cdn位址
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-1REatGsU-1610097723650)(/img/bVbKj90)]
缺點
- 整體實作還比較粗糙沒有覆寫灰階和復原(隻能再發上一個版本)
- 過程中沒有直覺的更新進度展示(隻能通過hbs接口擷取agent版本做求和)
過程說明:
http-req --->hbs --->開啟更新開關--->檢查agent心跳資訊中版本号,并檢查目前hbs更新隊列--->發送更新指令給agent ---> agent通過 更新指令中的url位址和目标版本号下載下傳新的二進制(會有備份和復原邏輯)--->agent check沒有問題後擷取自身的pid向自己發送kill信号 --->agent退出然後會被systemd拉起打到更新的目的--->新的心跳資訊中版本checkok不會繼續更新
更新舉例說明:
1. falcon-agent新加了采集名額,測試OK後在代碼中打上新的版本号比如6.0.0(現有是6.0.1)
2. 然後将新版 放到下載下傳伺服器的路徑下 wget http://${your_cdn_addr}/file/open-falcon/bin_6.0.1
3. 然後向hbs 發送更新的http請求(這裡有個保護機制:隻能在hbs本機發起)
4. 然後通過hbs 的http接口查詢目前心跳上來的agent的版本檢視更新進度 ,curl -s http://localhost:6031/agentversions |python -m "json.tool"
5. 同時需要連接配接的redis叢集觀察 agent_upgrade_set 這個set的值,redis-cli -h ip -p port -c smembers agent_upgrade_set & scard agent_upgrade_set
6. 目前看并發2000可以把一台下載下傳的nginx萬兆網卡流量打滿。1.24GB/s
## falcon-agent 自更新指令
curl -X POST http://127.0.0.1:6031/agent/upgrade -d '{"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e"}'
curl -X GET http://127.0.0.1:6031/agent/upgrade/nowargs
{"msg":"success","data":{"type":0,"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.1","binfile_md5":"35ac8534c0b31237e844ef8ee2bb9b9e","cfgfile_md5":""}}
curl http://127.0.0.1:6031/agentversions
{"msg":"success","data":{"n3-021-225":"6.0.1"}}
curl -X DELETE http://127.0.0.1:6031/agent/upgrade
{"msg":"success","data":"取消更新成功"}
uri:
url: http://127.0.0.1:6031/agent/upgrade
method: POST
body: {"wgeturl":"http://${your_cdn_addr}/file/open-falcon","version":"6.0.2","binfile_md5":"f5c597f15e379a77d1e2ceeec7bd99a8"}
status_code: 200
body_format: json