二、測試方案:
1) 單機方案+多資料中心:
測試子產品 測試所在環節 測試對象 依賴 測試方法 預期結果 備注
資料采集展現 Agent上報-正常 Cmdb-agent 無 制造正常模拟資料上報 上報到redis
Agent上報-異常 Cmdb-agent 無 制造異常情況,檢視是否上報以及是否異常
Proxy-轉發 Proxy Cmdb-agent 1.檢視是否選擇一緻的redis
2. 查詢是否正常
Redis寫入 Redis Agent、proxy 1. 節點寫入是否一緻
2. 批量并發
指令下發 用戶端異常
正常指令
危險指令
采集器更新
采集器解除安裝
KV 存儲 擴容-自動發現
故障
2) 分布式叢集方案:
三、More
1)異常告警
利用redis的訂閱和釋出來實作實時監控的一個DEMO(Python版本) - 堅固66 - 部落格園 (cnblogs.com)
go實作的一個監控日志報警程式_嘿,逮到一隻大碼農-CSDN部落格_go 監控程式
go實作程序監控并啟動【附源碼】_luoguo_51CTO部落格
Python監控程序狀态并實作告警_russellye_51CTO部落格
用Python實作自動化監控遠端伺服器_代碼幫-CSDN部落格_python 監控伺服器
Python自動化擷取Linux服務狀态與端口_部落格網友陳浩的部落格-CSDN部落格
2)名額完善
3)限流熔斷-(cmdb-agent)
4) 可配置
- 指令可配置
- 采集時長可配置
- 告警位址可配置
具體測試點
前置條件:
- 測試資料10萬+(帶redis-proxy節點資訊)
- redis-cli用戶端3個節點
- harbor鏡像&對應網絡權限
case 1-多節點上報
golang 模拟并發-資料庫中随機選擇資料進行上報
cpu監測
case 2-proxy轉發
golang-編寫腳本模拟proxy轉發,校驗資料是否寫入redis正常
随機請求cmdb查詢資料,檢驗是否正常
case3-正常指令下發
redis中寫入是否正常
是否正确的agent節點
是否執行指令
case4-高危指令下發
redis中是否寫入
是否命中正常的agent節點
是否拒絕指令
case5-正常指令下發,agent異常
是否進行自愈和自更新
是否有熔斷節流
case6-模拟用戶端更新(至少雙節點)
更新後是否可以正常上報資料
agent版本是否有變更
case7-模拟用戶端自解除安裝
解除安裝後是否資料清除(redis && cmdb)
case8-redis故障(單機)
是否進行了轉移
是否有監控預警
測試資料生成腳本
from faker import Faker
import unittest
from faker.providers import BaseProvider
import faker_microservice
import pymysql
# 自定義項目名(項目名格式x-y)
class MyProvider(BaseProvider):
def app_name(self):
return f"{fake.pystr(6)}-{fake.pystr(5)}"
fake = Faker()
# then add new provider to faker instance
fake.add_provider(MyProvider)
fake.add_provider(faker_microservice.Provider)
class Create_data(object):
def make_sigle_data(self):
"""
cpu,app,pid,path,port,name(主機名),ip
:return:
"""
app_name = fake.microservice()
data_dict = {
"name": fake.pystr(),
"ip": fake.ipv4(),
"cpu": fake.random_int(min=1,max=32),
"app": app_name,
"pid": fake.random_int(max=1000),
"port": fake.random_int(min=1,max=65535),
"path": f"/web/apps/{app_name}",
}
return data_dict
def get_dataN(self, n):
data = [self.make_sigle_data() for i in range(n)]
return data
def convert_sql(self, table_name, data_dict=dict()):
field_columns = ",".join('`{}`'.format(k) for k in data_dict.keys())
val_columns = ",".join('%({})s'.format(k) for k in data_dict.keys())
coverted_sql = f"insert into {table_name}({field_columns}) values({val_columns})"
return coverted_sql
def write_batch(self, db_dict, n):
db = pymysql.connect(**db_dict)
cursor = db.cursor()
data_lst = self.get_dataN(n)
# 每次2000條寫入:
if n<= 2000:
cursor.executemany(self.convert_sql(data_lst[0]), data_lst)
else:
tmp = n/2000
for i in range(0, tmp):
start = i*2000
end = (i+1)*2000 if (i+1)*2000<n else n
cursor.executemany(self.convert_sql(data_lst[0]), data_lst[start:end])
db.commit()
cursor.close()
db.close()
class Test_faker(unittest.TestCase):
def test_make(self):
self.assertTrue(Create_data().make_sigle_data())
if __name__ == '__main__':
# unittest.main
db_config = {"host": "127.0.0.1", "port": 3306, "user": "root", "password": "case123","database":"test"}
Create_data().write_batch(db_config, 10000)