天天看点

测试方案-agent具体测试点

二、测试方案:

1) 单机方案+多数据中心:

测试模块 测试所在环节 测试对象 依赖 测试方法 预期结果 备注

数据采集展现    Agent上报-正常    Cmdb-agent    无    制造正常模拟数据上报    上报到redis    
    Agent上报-异常    Cmdb-agent    无    制造异常情况,查看是否上报以及是否异常        
    Proxy-转发    Proxy    Cmdb-agent    1.查看是否选择一致的redis
2. 查询是否正常        
    Redis写入     Redis    Agent、proxy    1.    节点写入是否一致
2.    批量并发        


命令下发    客户端异常                    
    正常命令                    
    危险命令                    
    采集器升级                    
    采集器卸载                    
 KV 存储    扩容-自动发现                    
    故障                   

2) 分布式集群方案:

三、More

1)异常告警

利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本) - 坚固66 - 博客园 (cnblogs.com)

go实现的一个监控日志报警程序_嘿,逮到一只大码农-CSDN博客_go 监控程序
 go实现进程监控并启动【附源码】_luoguo_51CTO博客
 Python监控进程状态并实现告警_russellye_51CTO博客
 用Python实现自动化监控远程服务器_代码帮-CSDN博客_python 监控服务器
 Python自动化获取Linux服务状态与端口_博客网友陈浩的博客-CSDN博客
           

2)指标完善

3)限流熔断-(cmdb-agent)

4) 可配置

  1. 命令可配置
  2. 采集时长可配置
  3. 告警地址可配置

具体测试点

前置条件:

  1. 测试数据10万+(带redis-proxy节点信息)
  2. redis-cli客户端3个节点
  3. harbor镜像&对应网络权限

case 1-多节点上报

golang 模拟并发-数据库中随机选择数据进行上报

cpu监测

case 2-proxy转发

golang-编写脚本模拟proxy转发,校验数据是否写入redis正常

随机请求cmdb查询数据,检验是否正常

case3-正常命令下发

redis中写入是否正常

是否正确的agent节点

是否执行命令

case4-高危命令下发

redis中是否写入

是否命中正常的agent节点

是否拒绝命令

case5-正常命令下发,agent异常

是否进行自愈和自升级

是否有熔断节流

case6-模拟客户端升级(至少双节点)

升级后是否可以正常上报数据

agent版本是否有变更

case7-模拟客户端自卸载

卸载后是否数据清除(redis && cmdb)

case8-redis故障(单机)

是否进行了转移

是否有监控预警

测试数据生成脚本

from faker import Faker
import unittest
from faker.providers import BaseProvider
import faker_microservice
import pymysql

# 自定义项目名(项目名格式x-y)
class MyProvider(BaseProvider):
    def app_name(self):
        return f"{fake.pystr(6)}-{fake.pystr(5)}"


fake = Faker()
# then add new provider to faker instance
fake.add_provider(MyProvider)
fake.add_provider(faker_microservice.Provider)


class Create_data(object):

    def make_sigle_data(self):
        """
        cpu,app,pid,path,port,name(主机名),ip
        :return:
        """
        app_name = fake.microservice()
        data_dict = {
            "name": fake.pystr(),
            "ip": fake.ipv4(),
            "cpu": fake.random_int(min=1,max=32),
            "app": app_name,
            "pid": fake.random_int(max=1000),
            "port": fake.random_int(min=1,max=65535),
            "path": f"/web/apps/{app_name}",

        }
        return data_dict

    def get_dataN(self, n):
        data = [self.make_sigle_data() for i in range(n)]
        return data

    def convert_sql(self, table_name, data_dict=dict()):
        field_columns = ",".join('`{}`'.format(k) for k in data_dict.keys())
        val_columns = ",".join('%({})s'.format(k) for k in data_dict.keys())
        coverted_sql = f"insert into {table_name}({field_columns}) values({val_columns})"
        return coverted_sql

    def write_batch(self, db_dict, n):
        db = pymysql.connect(**db_dict)
        cursor = db.cursor()
        data_lst = self.get_dataN(n)
        # 每次2000条写入:
        if n<= 2000:
            cursor.executemany(self.convert_sql(data_lst[0]), data_lst)
        else:
            tmp = n/2000
            for i in range(0, tmp):
                start = i*2000
                end = (i+1)*2000 if (i+1)*2000<n else n
                cursor.executemany(self.convert_sql(data_lst[0]), data_lst[start:end])
        db.commit()
        cursor.close()
        db.close()


class Test_faker(unittest.TestCase):
    def test_make(self):
        self.assertTrue(Create_data().make_sigle_data())


if __name__ == '__main__':
    # unittest.main
    db_config = {"host": "127.0.0.1", "port": 3306, "user": "root", "password": "case123","database":"test"}
    Create_data().write_batch(db_config, 10000)