天天看點

測試方案-agent具體測試點

二、測試方案:

1) 單機方案+多資料中心:

測試子產品 測試所在環節 測試對象 依賴 測試方法 預期結果 備注

資料采集展現    Agent上報-正常    Cmdb-agent    無    制造正常模拟資料上報    上報到redis    
    Agent上報-異常    Cmdb-agent    無    制造異常情況,檢視是否上報以及是否異常        
    Proxy-轉發    Proxy    Cmdb-agent    1.檢視是否選擇一緻的redis
2. 查詢是否正常        
    Redis寫入     Redis    Agent、proxy    1.    節點寫入是否一緻
2.    批量并發        


指令下發    用戶端異常                    
    正常指令                    
    危險指令                    
    采集器更新                    
    采集器解除安裝                    
 KV 存儲    擴容-自動發現                    
    故障                   

2) 分布式叢集方案:

三、More

1)異常告警

利用redis的訂閱和釋出來實作實時監控的一個DEMO(Python版本) - 堅固66 - 部落格園 (cnblogs.com)

go實作的一個監控日志報警程式_嘿,逮到一隻大碼農-CSDN部落格_go 監控程式
 go實作程序監控并啟動【附源碼】_luoguo_51CTO部落格
 Python監控程序狀态并實作告警_russellye_51CTO部落格
 用Python實作自動化監控遠端伺服器_代碼幫-CSDN部落格_python 監控伺服器
 Python自動化擷取Linux服務狀态與端口_部落格網友陳浩的部落格-CSDN部落格
           

2)名額完善

3)限流熔斷-(cmdb-agent)

4) 可配置

  1. 指令可配置
  2. 采集時長可配置
  3. 告警位址可配置

具體測試點

前置條件:

  1. 測試資料10萬+(帶redis-proxy節點資訊)
  2. redis-cli用戶端3個節點
  3. harbor鏡像&對應網絡權限

case 1-多節點上報

golang 模拟并發-資料庫中随機選擇資料進行上報

cpu監測

case 2-proxy轉發

golang-編寫腳本模拟proxy轉發,校驗資料是否寫入redis正常

随機請求cmdb查詢資料,檢驗是否正常

case3-正常指令下發

redis中寫入是否正常

是否正确的agent節點

是否執行指令

case4-高危指令下發

redis中是否寫入

是否命中正常的agent節點

是否拒絕指令

case5-正常指令下發,agent異常

是否進行自愈和自更新

是否有熔斷節流

case6-模拟用戶端更新(至少雙節點)

更新後是否可以正常上報資料

agent版本是否有變更

case7-模拟用戶端自解除安裝

解除安裝後是否資料清除(redis && cmdb)

case8-redis故障(單機)

是否進行了轉移

是否有監控預警

測試資料生成腳本

from faker import Faker
import unittest
from faker.providers import BaseProvider
import faker_microservice
import pymysql

# 自定義項目名(項目名格式x-y)
class MyProvider(BaseProvider):
    def app_name(self):
        return f"{fake.pystr(6)}-{fake.pystr(5)}"


fake = Faker()
# then add new provider to faker instance
fake.add_provider(MyProvider)
fake.add_provider(faker_microservice.Provider)


class Create_data(object):

    def make_sigle_data(self):
        """
        cpu,app,pid,path,port,name(主機名),ip
        :return:
        """
        app_name = fake.microservice()
        data_dict = {
            "name": fake.pystr(),
            "ip": fake.ipv4(),
            "cpu": fake.random_int(min=1,max=32),
            "app": app_name,
            "pid": fake.random_int(max=1000),
            "port": fake.random_int(min=1,max=65535),
            "path": f"/web/apps/{app_name}",

        }
        return data_dict

    def get_dataN(self, n):
        data = [self.make_sigle_data() for i in range(n)]
        return data

    def convert_sql(self, table_name, data_dict=dict()):
        field_columns = ",".join('`{}`'.format(k) for k in data_dict.keys())
        val_columns = ",".join('%({})s'.format(k) for k in data_dict.keys())
        coverted_sql = f"insert into {table_name}({field_columns}) values({val_columns})"
        return coverted_sql

    def write_batch(self, db_dict, n):
        db = pymysql.connect(**db_dict)
        cursor = db.cursor()
        data_lst = self.get_dataN(n)
        # 每次2000條寫入:
        if n<= 2000:
            cursor.executemany(self.convert_sql(data_lst[0]), data_lst)
        else:
            tmp = n/2000
            for i in range(0, tmp):
                start = i*2000
                end = (i+1)*2000 if (i+1)*2000<n else n
                cursor.executemany(self.convert_sql(data_lst[0]), data_lst[start:end])
        db.commit()
        cursor.close()
        db.close()


class Test_faker(unittest.TestCase):
    def test_make(self):
        self.assertTrue(Create_data().make_sigle_data())


if __name__ == '__main__':
    # unittest.main
    db_config = {"host": "127.0.0.1", "port": 3306, "user": "root", "password": "case123","database":"test"}
    Create_data().write_batch(db_config, 10000)