天天看點

zabbix 線路品質監控自定義python子產品(Mysql版),多線程(後來發現使用協程更好)降低系統消耗

之前零零碎碎寫了一些zabbix 線路監控的腳本,工作中agent較多,每條線路監控需求不一緻,比較雜亂,現在整理成一個py子產品,集合之前的所有功能

環境

  python3.6以上版本,pip3(pip 9.0.1以上版本),mysql,pymysql庫

  使用zabbix自定義腳本擷取線路時延丢包率不做介紹,參考上一篇zabbix文章

如果系統目前python版本是python3.5,更新3.6時有兩個注意事項

  1 先更新python至3.6再更新pip3否則會導緻pip3無法正常使用

  2 python3.5更新到3.6後需要把

lsb_release.py

 檔案複制到python3.6的lib裡,否則pip3無法正常使用

  3 上兩步完成後再進行pip3更新

darkping封包件如下

  -----bin----程式入口,接收參數調用views

    -----views-----邏輯函數,計算并傳回,清除資料庫曆史資料

      -----mtr.sh----shell腳本,供views調用

      -----start-sql----根據ipinfo表變化動态建立線程,資料寫入zabbixvalue表

        ------tcping----使用socket計算tcp時延丢包工具

  -----log----日志檔案

  -----models-----資料庫相關

  -----settings----配置檔案,sql語句,檔案路徑,重要參數等

 邏輯

  zabbix前端添加item

  bin接收zabbix item傳過來的參數,格式化後調用view.dark_zabbix() ,函數把ipinfo資訊寫入資料庫,檢查start-sql腳本是否執行,如未執行就觸發反之從zabbixvalue表中擷取item所需要的參數,經過計算後傳回,并進行判斷,如果時延相對于上次探測結果增大一定門檻值或丢包超過設定門檻值就調用mtr腳本并儲存至日志

  start-sql輪詢ipinfo表中資料動态建立線程,調用測試指令把資料寫入zabbixvalue表中

   zabbix前端删除item

  views函數會檢查ipinfo中30分鐘未更新資料,并進行删除

  start-sql根據ipinfo表變化重新建立線程

---------------------------------------------- 2021.6.9号更新---------------------------------------------------------------------------------------

資料量大了後發現使用多線程,線程間來回搶占cpu導緻cpu消耗增大,後來改為協程,cpu消耗對比原來降低一半以上

-----------------------------------------------------------------------------------------------------------------------------------------------------------

bin代碼

#!/usr/bin/env python3
#-*-coding:utf-8-*-
#----------------------------------------------------------zabbixping腳本----------------------------------------------------
import argparse
from views import dark_zabbix


if __name__ == "__main__":
    # Command-line front end invoked by the Zabbix UserParameter entries:
    #   -t target ip, -i probe interval, -I item name (restime/pkloss),
    #   -p port, -T probe type.
    cli = argparse.ArgumentParser(description='icmp for monitor')
    option_table = (
        ('-t', 'tip', None),
        ('-i', 'interval', '1'),
        ('-I', 'item', None),
        ('-p', 'port', '0'),
        ('-T', 'type', 'icmp'),
    )
    for flag, dest, default in option_table:
        cli.add_argument(flag, action='store', dest=dest, default=default)
    opts = cli.parse_args()
    # The single printed value is what the agent reports back for the item.
    print(dark_zabbix(opts.tip, opts.item, float(opts.interval), int(opts.port), opts.type))

views代碼

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import log,models,time,subprocess,re
from models import db
from settings import dbinfo

# Register (or refresh) a monitored target in the ipinfo table.
def insertdb(ip,i,port,t_ype):
    """Insert the target into ``ipinfo`` or refresh its timestamp if present.

    Args:
        ip: target address as given on the command line.
        i: probe interval, stored in the ``i`` column on first insert.
        port: target port.
        t_ype: probe type string.

    The row is looked up with ``sql_sqlstatues``; when one exists only its
    ``time`` column is updated to the current epoch second, otherwise a new
    row is inserted. The connection is always closed before returning.
    """
    dbvalues = dbinfo()
    my_sql = db(dbvalues.dbinfo())
    ctime = int(time.time())
    try:
        existing = my_sql.db_readone(dbvalues.sql_sqlstatues(ip,t_ype,port,'ipinfo'))
        if existing is not None:
            # Known target: just mark it as still wanted by an item.
            my_sql.db_write(dbvalues.sql_updatetoipinfo(ip,port,t_ype,ctime))
        else:
            my_sql.db_write(dbvalues.sql_inserttoipinfo(ip,i,port,t_ype,ctime))
    finally:
        # Previously the connection was closed only on the insert path.
        my_sql.db_close()
# Drop ipinfo rows that no item has refreshed in the last 30 minutes.
def clear():
    """Delete ``ipinfo`` rows whose ``time`` is more than 1800 seconds old.

    The connection is closed even when the delete statement fails.
    """
    cutoff = int(time.time()) - 1800
    dbvalues = dbinfo()
    my_sql = db(dbvalues.dbinfo())
    try:
        my_sql.db_write(dbvalues.sql_clearipinfo(cutoff))
    finally:
        my_sql.db_close()


def dark_zabbix(ip,item,i,port,t_ype):
    """Return the latency or packet-loss figure for one Zabbix item.

    Args:
        ip: target address.
        item: ``'restime'`` for average latency or ``'pkloss'`` for loss in
            percent; any other value yields ``None``.
        i: probe interval in seconds; ``int(20/i)`` recent samples are used.
        port: target port.
        t_ype: probe type string.

    Returns:
        ``0`` while fewer samples than required are stored, otherwise the
        rounded latency or loss percentage selected by ``item``.

    Side effects:
        Registers the target (``insertdb``), purges stale targets
        (``clear``), launches the start-sql collector when ``ps`` shows no
        matching python3 process, and launches the mtr script when latency
        rose by more than 20 over the previous window or loss is strictly
        between 20 and 100 percent.
    """
    insertdb(ip,i,port,t_ype)
    clear()
    # Number of rows to read; at least one so that intervals above 20 seconds
    # do not lead to a division by zero further down.
    packet_count = max(1, int(20/i))
    dbvalues = dbinfo()
    logger = log.logger()
    sql_getvalue = dbvalues.sql_getvalue(ip,packet_count,t_ype,port)
    sql_mtr = dbvalues.sql_mtr(ip,packet_count,t_ype,port)

    my_sql = db(dbvalues.dbinfo())
    try:
        # Start the collector if it is not running (first use, or it died).
        sta = subprocess.Popen('ps aux | grep start-sql.py',shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()
        if not re.findall('python3',sta[0].decode('utf8')):
            cmd = dbvalues.cmd()
            subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            logger.debug(cmd)

        ret = my_sql.db_readall(sql_getvalue)
        if len(ret) < packet_count:
            # Not enough samples collected yet.
            return 0

        res_ret = sum(x['res'] for x in ret)
        pkloss_ret = sum(x['pkloss'] for x in ret)

        # Average latency over the probes that were answered; 0 when all lost.
        answered = packet_count - pkloss_ret
        restime = round(float(res_ret/answered),2) if answered else 0
        pkloss = round(float(pkloss_ret/packet_count*100),2)

        # Compare against the previous window to decide whether to run mtr.
        # Best effort: any failure here is logged and must not break the item.
        try:
            history_restime = round(float(my_sql.db_readone(sql_mtr)['avg(a.res)']),2)
            if restime - history_restime > 20 or 100 > pkloss > 20:
                mtr = dbvalues.mtr(ip)
                subprocess.Popen(mtr,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        except Exception as a:
            logger.info(a)

        if item == 'restime':
            return restime
        if item == 'pkloss':
            return pkloss
        return None
    finally:
        # Close on every exit path, including the early "not enough data" one.
        my_sql.db_close()

start-sql代碼

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import subprocess,re,time,pymysql,argparse,threading
from settings import dbinfo
from models import db
import log
import gevent
from gevent import monkey





# Module-level logger obtained from the project's log helper; used by value().
logger = log.logger()
def ping(cmd):
    """Run one probe command and pull the round-trip time out of its output.

    Args:
        cmd: shell command line whose stdout contains ``time=<number>`` when
            the probe was answered.

    Returns:
        ``(rtt, pkloss)``: on success ``rtt`` is the matched number as a
        string and ``pkloss`` is 0; when no ``time=`` value is found the
        result is ``(0, 1)``.
    """
    output = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8')
    # Raw string: the pattern previously relied on invalid escape sequences.
    match = re.search(r'time=(\d+\.?\d*)', output)
    if match is None:
        return (0, 1)
    return (match.group(1), 0)
def value(i,ipaddress,port,t_ype):
    """Probe one target in a loop and store every sample in ``zabbixvalue``.

    Args:
        i: probe interval in seconds (converted with ``float``).
        ipaddress: target address.
        port: target port, used by the tcp/udp probes.
        t_ype: ``'icmp'``, ``'tcp'`` or ``'udp'``.

    Raises:
        ValueError: if ``t_ype`` is not one of the supported probe types.

    The loop ends once the number of rows in ``ipinfo`` differs from the
    number seen at start-up, so the caller can rebuild its workers. Only the
    row count is compared. The database connection is closed on exit.
    """
    dbvalues = dbinfo()
    path = dbvalues.base_dir()
    if t_ype == 'icmp':
        cmd = 'ping  -c 1 -W 1 %s'%ipaddress
    elif t_ype == 'tcp':
        cmd = path + '/tcping.py %s %s 1'%(ipaddress,port)
    elif t_ype == 'udp':
        cmd = path + '/udpping.py %s %s 1'%(ipaddress,port)
    else:
        # Previously this fell through and failed later on an unbound name.
        raise ValueError('unsupported probe type: %r' % (t_ype,))
    i = float(i)

    logger.debug('初始化%s'%ipaddress)
    mysql = db(dbvalues.dbinfo())
    try:
        # Row count of ipinfo when this worker started.
        count = mysql.db_readone('select count(nid) from ipinfo;')['count(nid)']
        flag = True
        while flag:
            new_count = mysql.db_readone('select count(nid) from ipinfo;')['count(nid)']
            # A target was added or removed: finish this round, then stop.
            if new_count != count:
                flag = False
            start = time.time()
            res,pkloss = ping(cmd)
            t_time = int(time.time())
            # Samples older than 30 minutes (1800 s) are purged.
            ctime = t_time - 1800
            mysql.db_delete(dbvalues.sql_clearhistory(ctime))
            mysql.db_write(dbvalues.sql_insert(res,pkloss,ipaddress,t_time,t_ype,port))
            usetime = time.time() - start
            # Never pass a negative duration to sleep.
            time.sleep(max(0, i - usetime))
    finally:
        mysql.db_close()
    return

if __name__ == "__main__":
    # Patch the standard library once, before any worker greenlet exists,
    # instead of re-running the patch on every pass of the loop.
    monkey.patch_all()
    while True:
        dbvalues = dbinfo()
        mysql = db(dbvalues.dbinfo())
        ipinfo = mysql.db_readall('select * from ipinfo;')
        mysql.db_close()

        if not ipinfo:
            # Nothing to monitor yet: wait instead of re-querying in a tight loop.
            time.sleep(1)
            continue

        # One greenlet per target; value() returns when the ipinfo row count
        # changes, after which the worker set is rebuilt from scratch.
        li = [
            gevent.spawn(value, x['i'], x['ipaddress'], x['port'], x['type'])
            for x in ipinfo
        ]
        gevent.joinall(li)

# if __name__  == "__main__":
#     while True:
#         dbvalues = dbinfo()
#         mysql = db(dbvalues.dbinfo())
#         li = []
#         ipinfo=mysql.db_readall('select * from ipinfo;')
#         mysql.db_close()
#         #建立線程
#         for x in ipinfo:
#             t1 = threading.Thread(target=value,args=(x['i'],x['ipaddress'],x['port'],x['type']))
#             t1.start()
#             li.append(t1)
#         #所有線程不結束不進行下次循環
#         for t in li:
#             t.join()
          

tcping代碼

#!/usr/bin/env python3
"""
TCP Ping Test (defaults to port 80, 10000 packets)
Usage: ./tcpping.py host [port] [maxCount]
- Ctrl-C Exits with Results
"""

import sys
import socket
import time
import signal
from timeit import default_timer as timer

# Target host and port; the port stays 80 unless given on the command line.
host = None
port = 80

# Number of connection attempts to make and how many have been made so far.
maxCount = 10000
count = 0


# try:
#     sip = sys.argv[1]
# except IndexError:
#     print("Usage: tcpping.py host [port] [maxCount]")
#     sys.exit(1)

# argv[1]: host (required).
try:
    host = sys.argv[1]
except IndexError:
    print("Usage: tcpping.py host [port] [maxCount]")
    sys.exit(1)


# argv[2]: port (optional).
try:
    port = int(sys.argv[2])
except ValueError:
    # Report the offending port argument itself; indexing argv[3] here raised
    # IndexError whenever no maxCount was supplied.
    print("Error: Port Must be Integer:", sys.argv[2])
    sys.exit(1)
except IndexError:
    pass


# argv[3]: maximum number of attempts (optional).
try:
    maxCount = int(sys.argv[3])
except ValueError:
    print("Error: Max Count Value Must be Integer", sys.argv[3])
    sys.exit(1)
except IndexError:
    pass



# Counters for successful and failed connection attempts.
passed = 0
failed = 0


def getResults():
    """Print the total/pass/fail counters and the failure percentage."""
    if failed == 0:
        loss_text = str(0)
    else:
        loss_text = "%.2f" % (failed / count * 100)

    summary = "\nTCP Ping Results: Connections (Total/Pass/Fail): [{:}/{:}/{:}] (Failed: {:}%)".format(count, passed, failed, loss_text)
    print(summary)

def signal_handler(signal, frame):
    """SIGINT handler: print the summary so far, then exit with status 0."""
    getResults()
    raise SystemExit(0)


signal.signal(signal.SIGINT, signal_handler)


# One TCP connection attempt per iteration, paced one second apart.
while count < maxCount:

    count += 1
    success = False

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Each attempt waits at most one second.
    s.settimeout(1)

    s_start = timer()

    try:
        # s.bind((sip,0))
        s.connect((host, int(port)))
        s.shutdown(socket.SHUT_RD)
        success = True
    except socket.timeout:
        print("Connection timed out!")
        failed += 1
    except OSError as e:
        print("OS Error:", e)
        failed += 1

    s_stop = timer()
    # Release the descriptor right away; it was previously never closed, so a
    # long run kept one socket open per attempt.
    s.close()
    s_runtime = "%.2f" % (1000 * (s_stop - s_start))

    if success:
        print("Connected to %s[%s]: tcp_seq=%s time=%s ms" % (host, port, (count-1), s_runtime))
        passed += 1

    # No pause after the final attempt.
    if count < maxCount:
        time.sleep(1)


getResults()

log代碼

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import logging,time
from settings import dbinfo
def logger():
    """Return the root logger, writing DEBUG and above to today's log file.

    The file path comes from ``dbinfo().log_dir()``. A file handler for that
    path is attached only once, so repeated calls in the same process do not
    produce duplicate log lines or extra open file handles.
    """
    import os

    log_name = dbinfo().log_dir()
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its file in baseFilename.
    target = os.path.abspath(log_name)
    attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in root.handlers
    )
    if not attached:
        fh = logging.FileHandler(log_name)
        fh.setFormatter(logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
        root.addHandler(fh)
    return root

models代碼

#!/usr/bin/env python3
#-*-coding:utf-8-*-
#-----------------------------------------------------建立db類--------------------------------------------------------
import pymysql,settings
class db:
    """Small wrapper around one pymysql connection and a dict cursor.

    Every query method takes the SQL text plus an optional ``args`` value
    that is handed to the cursor's ``execute`` for parameter substitution.
    Callers that pass only a finished SQL string keep working unchanged.
    """

    def __init__(self,conninfo):
        """Open the connection described by ``conninfo``.

        ``conninfo`` is a mapping with the keys ``host``, ``port``, ``user``,
        ``passwd``, ``db`` and ``charset``.
        """
        self.host = conninfo['host']
        self.port = conninfo['port']
        self.user = conninfo['user']
        self.passwd = conninfo['passwd']
        self.db = conninfo['db']
        self.ch = conninfo['charset']
        self.conn = pymysql.connect(host=self.host,port = self.port,user = self.user,passwd = self.passwd, db=self.db,charset=self.ch)
        # Rows are returned as dicts keyed by column name.
        self.coursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def db_readone(self,sql,args=None):
        """Execute ``sql`` and return the first row (or ``None``)."""
        self.coursor.execute(sql,args)
        return self.coursor.fetchone()

    def db_readall(self,sql,args=None):
        """Execute ``sql`` and return all rows."""
        self.coursor.execute(sql,args)
        return self.coursor.fetchall()

    def db_write(self,sql,args=None):
        """Execute a modifying statement and commit it."""
        self.coursor.execute(sql,args)
        self.conn.commit()

    def db_delete(self,sql,args=None):
        """Execute a delete statement and commit it."""
        self.coursor.execute(sql,args)
        self.conn.commit()

    def db_close(self):
        """Close the cursor and then the connection."""
        try:
            self.coursor.close()
        finally:
            # The connection is released even if closing the cursor fails.
            self.conn.close()

settings代碼

#!/usr/bin/env python3
#-*-coding:utf-8-*-
#---------------------------------------------配置檔案,定義sql語句,路徑等------------------------------------------------
import os,time

class dbinfo:
    """Central configuration: paths, DB connection settings and SQL builders.

    Every ``sql_*`` method returns a finished SQL statement as a string.
    String values are escaped before being placed inside double quotes, and
    exact-match lookups use ``=`` throughout.
    """

    def __init__(self):
        # Directory that contains this settings file; other paths hang off it.
        self.dir = os.path.dirname(os.path.abspath(__file__))

    @staticmethod
    def _esc(value):
        """Backslash-escape backslashes and double quotes in ``value``.

        Keeps a value from terminating the double-quoted SQL literal it is
        placed in. NOTE(review): assumes the server uses the default
        backslash-escape handling; confirm the configured sql_mode.
        """
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    def base_dir(self):
        """Return the directory containing this file."""
        return self.dir

    def log_dir(self):
        """Return the path of today's log file under ``<base>/log/``."""
        log_dir = self.dir + '/log/' + time.strftime('%Y-%m-%d',time.localtime()) + '.log'
        return log_dir

    def mtr(self,ip):
        """Return the shell command that runs mtr.sh for ``ip``.

        The report path is ``<base>/log/<ip>-<date>.log``. Both arguments
        are shell-quoted; ordinary addresses and paths come out unchanged.
        """
        import shlex

        mtr_dir = self.dir+'/log/'+ip+'-'+time.strftime('%Y-%m-%d',time.localtime()) + '.log'
        cmd = self.dir + '/mtr.sh'+' '+shlex.quote(ip)+' '+shlex.quote(mtr_dir)
        return cmd

    def dbinfo(self):
        """Return the connection settings mapping expected by ``db``."""
        # NOTE(review): credentials for the root account are hard-coded here;
        # consider a dedicated account and loading the password from outside.
        dbinfo = {'host':'127.0.0.1','port':3306,'user':'root','passwd':'darkcs', 'db':'pingvalues','charset':'utf8'}
        return dbinfo

    def sql_sqlstatues(self,ip,t_ype,port,tables):
        """Select the newest row for (ip, type, port) from table ``tables``."""
        sql = 'select time,ipaddress from %s where ipaddress = "%s" and type = "%s"  and port = %s order by nid  desc limit 1;'%(tables,self._esc(ip),self._esc(t_ype),port)
        return sql

    def sql_getvalue(self,ip,packet_count,t_ype,port):
        """Select the newest ``packet_count`` samples for (ip, type, port)."""
        sql = 'select res,pkloss,ipaddress,time  from zabbixvalue where ipaddress = "%s" and type = "%s" and port = %s order by nid  desc limit %s;'%(self._esc(ip),self._esc(t_ype),port,packet_count)
        return sql

    def cmd(self):
        """Return the shell command that starts start-sql.py in the background."""
        cmd = 'nohup' + ' '+self.dir + '/start-sql.py  >/dev/null 2>&1 &'
        return cmd

    def sql_clearhistory(self,ctime):
        """Delete samples with ``time`` before ``ctime``."""
        sql = 'delete from zabbixvalue where time<%s;'%ctime
        return sql

    def sql_insert(self,res,pkloss,ipaddress,t_time,t_ype,port):
        """Insert one sample row into ``zabbixvalue``."""
        sql = 'insert into zabbixvalue(res,pkloss,ipaddress,time,type,port) values(%s,%s,"%s",%s,"%s",%s)'%(res,pkloss,self._esc(ipaddress),t_time,self._esc(t_ype),port)
        return sql

    def sql_mtr(self,ip,packet_count,t_ype,port):
        """Average ``res`` over the window before the newest ``packet_count`` rows.

        Uses ``=`` for the exact-match filters, like ``sql_getvalue``.
        """
        sql = 'select avg(a.res) from (select res from zabbixvalue where ipaddress = "%s" and type = "%s" and port = %s order by nid  desc limit %s,%s) as a;'%(self._esc(ip),self._esc(t_ype),port,packet_count,packet_count)
        return sql

    def sql_inserttoipinfo(self,ip,i,port,t_ype,ctime):
        """Insert a new monitored target into ``ipinfo``."""
        sql = 'insert into ipinfo(ipaddress,port,type,time,i) values("%s",%s,"%s","%s",%s);'%(self._esc(ip),port,self._esc(t_ype),ctime,i)
        return sql

    def sql_updatetoipinfo(self,ip,port,t_ype,ctime):
        """Set ``time`` to ``ctime`` for the matching ``ipinfo`` row(s)."""
        sql = 'update ipinfo set time = "%s" where ipaddress = "%s" and type = "%s"  and port = %s'%(ctime,self._esc(ip),self._esc(t_ype),port)
        return sql

    def sql_clearipinfo(self,ctime):
        """Delete ``ipinfo`` rows with ``time`` before ``ctime``."""
        sql = 'delete from ipinfo where time < %s'%ctime
        return sql

mtr shell腳本

#!/usr/bin/env bash
# Append an mtr report for the given target to a log file.
# Usage: mtr.sh <ip> <logfile>
IP=$1
dir=$2
# Quote both expansions so unusual characters cannot split the arguments.
mtr -r -n -c 30 -w -b "$IP" >> "$dir"

mysql

mysql> desc ipinfo;
+-----------+-------------+------+-----+---------+----------------+
| Field     | Type        | Null | Key | Default | Extra          |
+-----------+-------------+------+-----+---------+----------------+
| nid       | int(11)     | NO   | PRI | NULL    | auto_increment |
| ipaddress | varchar(64) | YES  |     | NULL    |                |
| port      | int(11)     | YES  |     | NULL    |                |
| type      | varchar(64) | YES  |     | NULL    |                |
| time      | varchar(64) | YES  |     | NULL    |                |
| i         | float       | YES  |     | NULL    |                |
+-----------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)

mysql> desc zabbixvalue;
+-----------+-------------+------+-----+---------+----------------+
| Field     | Type        | Null | Key | Default | Extra          |
+-----------+-------------+------+-----+---------+----------------+
| nid       | int(11)     | NO   | PRI | NULL    | auto_increment |
| res       | float       | YES  |     | NULL    |                |
| pkloss    | int(11)     | YES  |     | NULL    |                |
| ipaddress | varchar(64) | YES  |     | NULL    |                |
| time      | int(11)     | YES  |     | NULL    |                |
| type      | varchar(64) | YES  |     | NULL    |                |
| port      | int(11)     | YES  |     | NULL    |                |
+-----------+-------------+------+-----+---------+----------------+
7 rows in set (0.00 sec)

      

建立 ipaddress type port三列聯合索引,避免資料量過大導緻全表掃描造成的系統cpu滿負載

mysql> show index from zabbixvalue;

+-------------+------------+-----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+

| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |

| zabbixvalue | 0 | PRIMARY | 1 | nid | A | 223725 | NULL | NULL | | BTREE | | |

| zabbixvalue | 1 | ip_info | 1 | ipaddress | A | 1 | NULL | NULL | YES | BTREE | | |

| zabbixvalue | 1 | ip_info | 2 | type | A | 1 | NULL | NULL | YES | BTREE | | |

| zabbixvalue | 1 | ip_info | 3 | port | A | 1 | NULL | NULL | YES | BTREE | | |

| zabbixvalue | 1 | time_info | 1 | time | A | 90354 | NULL | NULL | YES | BTREE | | |

5 rows in set (0.00 sec)

zabbix_agentd.conf

UserParameter=dark_ping_restime[*],/etc/zabbix/darkping/bin.py     -t $1 -I restime
UserParameter=dark_ping_pkloss[*],/etc/zabbix/darkping/bin.py     -t $1 -I pkloss


UserParameter=dark_tcpping_restime[*],/etc/zabbix/darkping/bin.py     -t $1 -p $2 -T tcp -I restime
UserParameter=dark_tcpping_pkloss[*],/etc/zabbix/darkping/bin.py     -t $1 -p $2 -T tcp -I pkloss      

Item 配置略

2021.6.8 補一個udpping工具

client端

#!/usr/bin/env python
#-*-coding:utf-8-*-
import socket
import sys
import time
import string
import random
import signal
import os

# Wait between probes, also used as the reply timeout; unit is milliseconds.
INTERVAL = 1000  #unit ms
# Payload length in bytes.
LEN =64 
# Destination address and port; both are overwritten from the command line.
IP=""
PORT=0

# Running totals: probes sent, replies received, and RTT sum/min/max in ms.
count=0
count_of_received=0
rtt_sum=0.0
rtt_min=99999999.0
rtt_max=0.0


# def signal_handler(signal, frame):
def signal_handler(*args, **kwargs):
    """Print the ping statistics gathered so far and terminate the process.

    Installed as the SIGINT handler and also called directly after the last
    probe. Arguments are accepted and ignored so both uses work.
    """
    if count!=0 and count_of_received!=0:
        print('')
        print('--- ping statistics ---')
    if count!=0:
        print('%d packets transmitted, %d received, %.2f%% packet loss'%(count,count_of_received, (count-count_of_received)*100.0/count))
    if count_of_received!=0:
        print('rtt min/avg/max = %.2f/%.2f/%.2f ms'%(rtt_min,rtt_sum/count_of_received,rtt_max))
    # os._exit() does not flush stdio buffers, so flush explicitly or the
    # statistics above are lost when stdout is a pipe.
    sys.stdout.flush()
    os._exit(0)



def random_string(length):
    """Return ``length`` randomly chosen ASCII letters and digits as a str."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
# Expected argv: [script, dest_ip, dest_port, count] plus an optional
# options string such as "LEN=400;INTERVAL=2000".
if len(sys.argv) != 4 and len(sys.argv)!=5 :
    print(""" usage:""")
    print("""   this_program <dest_ip> <dest_port> <count>""")
    print("""   this_program <dest_ip> <dest_port> <count> "<options>" """)

    print()
    print(""" options:""")
    print("""   LEN         the length of payload, unit:byte""")
    print("""   INTERVAL    the seconds waited between sending each packet, as well as the timeout for reply packet, unit: ms""")

    print()
    print(" examples:")
    print('   ./udpping.py 8.8.8.8 4000 10 "LEN=400;INTERVAL=2000"')
#    print("   ./udping.py fe80::5400:ff:aabb:ccdd 4000")
    print()

    sys.exit()

# socket.gethostbyname resolves to an IPv4 address string only.
IP=socket.gethostbyname(sys.argv[1])
PORT=int(sys.argv[2])
# Number of probes to send before printing the statistics.
monitorcount = int(sys.argv[3])
is_ipv6=0

# NOTE(review): since IP comes from gethostbyname (IPv4 only) this branch
# looks unreachable; IPv6 support would need getaddrinfo. Confirm before use.
if IP.find(":")!=-1:
    is_ipv6=1


if len(sys.argv)==5:
    # SECURITY NOTE(review): the options string is executed as Python code so
    # it can assign LEN / INTERVAL. Never pass untrusted input here; a
    # key=value parser restricted to those two names would be safer.
    exec(sys.argv[4])

if LEN<5:
    print("LEN must be >=5")
    sys.exit()
if INTERVAL<50:
    print("INTERVAL must be >=50")
    sys.exit()

signal.signal(signal.SIGINT, signal_handler)

if not is_ipv6:
    sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
else:
    sock = socket.socket(socket.AF_INET6,socket.SOCK_DGRAM)

print("udping %s via port %d with %d bytes of payload"% (IP,PORT,LEN))
sys.stdout.flush()

# Send one datagram per iteration and wait up to INTERVAL ms for its echo.
while count<monitorcount:
    # Encode once; the same bytes are sent and compared with the reply.
    payload = random_string(LEN).encode()
    sock.sendto(payload, (IP, PORT))
    # Monotonic clock: the measured RTT is unaffected by wall-clock changes.
    time_of_send = time.monotonic()
    deadline = time_of_send + INTERVAL/1000.0
    received=0
    rtt=0.0

    while True:
        timeout = deadline - time.monotonic()
        if timeout <0:
            break
        sock.settimeout(timeout)
        try:
            recv_data,addr = sock.recvfrom(65536)
            # Accept only an exact echo coming back from the probed endpoint.
            if recv_data == payload and addr[0]==IP and addr[1]==PORT:
                rtt=((time.monotonic()-time_of_send)*1000)
                print("Reply from",IP,"seq=%d"%count, "time=%.2f"%(rtt),"ms")
                sys.stdout.flush()
                received=1
                break
        except socket.timeout:
            break
        except OSError:
            # Other socket errors: keep waiting until the deadline passes.
            pass
    count+= 1
    if received==1:
        count_of_received+=1
        rtt_sum+=rtt
        rtt_max=max(rtt_max,rtt)
        rtt_min=min(rtt_min,rtt)
    else:
        print("Request timed out")
        sys.stdout.flush()

    # Pace the probes: sleep out whatever is left of the interval.
    time_remaining = deadline - time.monotonic()
    if(time_remaining>0):
        time.sleep(time_remaining)
# Print the statistics and exit.
signal_handler()

server端

#!/usr/bin/env python3
"""Minimal UDP echo server: sends every received datagram back to its sender."""
import socket

# Placeholder listen address and port; adjust to the interface and port the
# udpping client targets. The original left these as 'ipaddress' / port.
BIND_ADDR = '0.0.0.0'
BIND_PORT = 4000


def main():
    """Bind once, then echo datagrams until the process is stopped."""
    sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
    sock.bind((BIND_ADDR,BIND_PORT))
    try:
        while True:
            data,addr = sock.recvfrom(65535)
            sock.sendto(data,addr)
    finally:
        sock.close()


if __name__ == "__main__":
    main()

 也可以使用socat,實際測試使用socat會引入額外開銷,時延不準确

socat -v UDP-LISTEN:4000,fork PIPE      

queue版

https://www.cnblogs.com/darkchen/p/15524856.html

以驅魔為理想,為生計而奔波