天天看點

通過Python将監控資料由influxdb寫入到MySQL

一.項目背景

我們知道InfluxDB是最受歡迎的時序資料庫(TSDB)。InfluxDB具有 持續高并發寫入、無更新;資料壓縮存儲;低查詢延時 的特點。從下面這個權威的統計圖中,就可以看出InfluxDB的熱度。

通過Python将監控資料由influxdb寫入到MySQL

 InfluxDB可以作為 性能監控、應用程式名額、物聯網傳感器資料和實時分析等的後端存儲。

我們的DB性能監控體系是基于Telegraf+InfluxDB+Grafana元件搭建,如下圖所示。

通過Python将監控資料由influxdb寫入到MySQL

 但是這個體系沒有和既有的CMDB打通,例如,不清楚公司目前有多少台資料庫執行個體已部署了監控?是不是有部分執行個體的監控漏掉了?而目前公司CMDB的資訊都儲存在了MySQL資料庫中,是以,需要先實作 Influxdb 與 MySQL DB 的資料互通互聯 。此功能的實作時借助Python完成的。

在此項目中,為便于說明示範,抽象簡化後,需求概況為:将InfluxDB中儲存的各個伺服器的IP查詢出來儲存到指定的MySQL資料庫中。進一步分解任務,因為measurement(表)為disk 存儲有 Server host的資料,根據其命名規則,可host逆向拼湊出Server IP資料。

是以,此需求簡化為:從InfluxDB的disk【measurement、表】中找出host【tag】對應的value,加工處理後,儲存到MySQL。

二.安裝運作環境遇到的錯誤

1.TypeError: Struct() 錯誤

調試時,報如下錯誤,查找資料發現,和python版本有關。

錯誤資訊如下:

...... 
File "/usr/lib/python2.7/site-packages/influxdb/client.py", line 527, in query
    expected_response_code=expected_response_code
  File "/usr/lib/python2.7/site-packages/influxdb/client.py", line 361, in request
    raw=False)
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 129, in unpackb
    ret = unpacker._unpack()
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 671, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 671, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 671, in _unpack
    ret[key] = self._unpack(EX_CONSTRUCT)
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
  File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 697, in _unpack
    return self._ext_hook(n, bytes(obj))
  File "/usr/lib/python2.7/site-packages/influxdb/client.py", line 1247, in _msgpack_parse_hook
    (epoch_s, epoch_ns) = struct.unpack(">QI", data)
TypeError: Struct() argument 1 must be string, not unicode      

報錯的python版本為Python 2.7.5,檢視資料,建議更新到2.7.7以上。為規避這個錯誤,我們将版本更新到了Python 3.6.8

2.更新安裝Python 3.6.8

安裝執行make install時報錯,錯誤資訊如下:

zipimport.ZipImportError: can’t decompress data; zlib not available
make: * [install] Error 1      

原因是缺少了zlib的解壓縮類庫,

解決方案,執行以下指令

yum -y install zlib*      

 3.引入influxdb插件報錯

運作報錯,提示資訊如下:

..........   
 from influxdb import InfluxDBClient
ModuleNotFoundError: No module named 'influxdb'      

解決方案:

git clone https://github.com/influxdb/influxdb-python.git

cd influxdb-python

pip3 install -r requirements.txt

python3 setup.py install      

成功安裝的記錄如下:

。。。。。。。。。。。。
Using /usr/local/lib/python3.6/site-packages
Searching for urllib3==1.25.6
Best match: urllib3 1.25.6
Adding urllib3 1.25.6 to easy-install.pth file

Using /usr/lib/python3.6/site-packages
Finished processing dependencies for influxdb==5.3.1      

驗證是否成功安裝,打開python輸入

from influxdb import client as influxdb      

如果沒有錯誤資訊,則表示安裝成功

4.Python3 環境執行mysql報錯

... 
import MySQLdb
ModuleNotFoundError: No module named 'MySQLdb'      

環境測試

通過Python将監控資料由influxdb寫入到MySQL

原因分析: Python 2安裝的是mysql-python,而Python 3應該安裝mysqlclient。

是以需要:

pip3 install mysqlclient

但是報錯,錯誤資訊如下:

Traceback (most recent call last):
  File "/usr/local/bin/pip3", line 11, in <module>
    load_entry_point('pip==1.5.4', 'console_scripts', 'pip3')()
  File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 476, in load_entry_point
  File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 2700, in load_entry_point
  File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 2318, in load
  File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 2324, in resolve
  File "/usr/local/lib/python3.6/site-packages/pip-1.5.4-py3.6.egg/pip/__init__.py", line 9, in <module>
    from pip.log import logger
  File "/usr/local/lib/python3.6/site-packages/pip-1.5.4-py3.6.egg/pip/log.py", line 9, in <module>
    from pip._vendor import colorama, pkg_resources
  File "/usr/local/lib/python3.6/site-packages/pip-1.5.4-py3.6.egg/pip/_vendor/pkg_resources.py", line 1423, in <module>
    register_loader_type(importlib_bootstrap.SourceFileLoader, DefaultProvider)
AttributeError: module 'importlib._bootstrap' has no attribute 'SourceFileLoader'      

原因分析:

pip-1.5.4,遠低于pip目前的版本,

解決方案:

下載下傳新的版本安裝更新pip,下載下傳網址https://pypi.org/project/pip/#files

例如下載下傳了pip-21.1.1.tar.gz,安裝步驟如下:

step 1
tar -zxvf pip-21.1.1.tar.gz
step 2
cd pip-21.1.1
step 3
python3 setup.py build
step 4
python3 setup.py install      

再次安裝 mysqlclient

pip3 install mysqlclient      

安裝過程不再報錯,驗證安裝OK。

通過Python将監控資料由influxdb寫入到MySQL

 三.部分代碼說明

1.對象類型及屬性檢視--print(type(?))和print(dir(?))

因為我們平常對influxdb使用的相對較少,不像關系型資料庫那麼熟練,通過python檢視influxdb資料,比較陌生,不知道傳回值對象的類型是什麼或者怎麼操作。這時候可以通過print(type(?)) 和print(dir(?))來檢視。

如下圖,假如response是influxdb的query傳回值。

通過Python将監控資料由influxdb寫入到MySQL

 print顯示的傳回資訊如下:

通過Python将監控資料由influxdb寫入到MySQL

注意 上面有一個 get_points 方法,不知道你找到了嗎? 這個需要特别注意,後面我們就會講到。

2. Getting all points

Using query() that returns data in 'influxdb.resultset.ResultSet' format.This is the sample output of the query():

Result: ResultSet({'(u'cpu', None)': [{u'usage_guest_nice': 0, u'usage_user': 0.90783871790308868, u'usage_nice': 0, u'usage_steal': 0, u'usage_iowait': 0.056348610076366427, u'host': u'xxx.xxx.hostname.com', u'usage_guest': 0, u'usage_idle': 98.184322579062794, u'usage_softirq': 0.0062609566755314457, u'time': u'2016-06-26T16:25:00Z', u'usage_irq': 0, u'cpu': u'cpu-total', u'usage_system': 0.84522915123660536}]})      

Using 

rs.get_points()

 will return a generator for all the points in the ResultSet.

you can Filtering by measurement

rs = cli.query("SELECT * from cpu") 
cpu_points = list(rs.get_points(measurement='cpu'))      

or you can Filtering by tags

rs = cli.query("SELECT * from cpu")
cpu_influxdb_com_points = list(rs.get_points(tags={"host_name": "influxdb.com"}))      

or you can Filtering by measurement and tags

rs = cli.query("SELECT * from cpu")
points = list(rs.get_points(measurement='cpu', tags={'host_name': 'influxdb.com'}))      

3.telegraf模闆中關于host的命名

我們知道telegraf 模闆中有host參數(預設在/etc/telegraf.conf設定),在grafana界面上可以根據這個host參數進行刷選,進一步定位到想要檢視的 Server 或 DB 執行個體。因為公司有多個項目組,每個項目組負責不同的系統,有各自的DB Server 、執行個體。為了區分這個Server究竟屬于那個項目組(Team),是以,我們在定義Host時,不是簡單的指派Server IP,而是 産品線 + Server IP的後兩位。如此,也友善 監控、研發、運維的同學快速找到Server,判斷相應的業務項目組。

例如 訂單中心 所在的 DB Server 為 18.19.20.21 其host為 order_20_21;CRM 所在的 DB Server 為 18.19.22.23 其host為crm_22_23;ERP所在的DB Server 為 18.19.24.25其host為erp_24_25;app所在的DB Server 為 18.19.34.35其host為app_34_35;等等。

四 主要代碼

 1.在MySQL執行個體上建立儲存Server資訊的表

CREATE TABLE `monitor_serverdb` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `ip_address` varchar(255) NOT NULL DEFAULT '',
  `datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '資料行建立時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4;      

2.連接配接MySQL的python檔案db_conn.py

代碼如下:

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import MySQLdb

# 打開資料庫連接配接
db = MySQLdb.connect("server DB執行個體IP","DB 使用者名","DB PWD","DB Name",charset='utf8mb4',port=資料庫端口号)      

3.連接配接InfluxDB的python檔案collect_dbhost_telegraf_info.py

主要代碼:

#!/usr/bin/python
# -*- coding: UTF-8 -*-



from influxdb import InfluxDBClient

import pytz
import time
import dateutil.parser
import datetime


class DBApi(object):
    """
    通過infludb擷取資料
    """

    def __init__(self, ip, port):
        """
        初始化資料
        :param ip:influxdb位址
        :param port: 端口
        """
        self.db_name = 'telegraf'
        self.use_cpu_table = 'cpu' # cpu使用率表
        self.phy_mem_table = 'mem'# 實體記憶體表
        self.traffic_table = 'net'# 接收流量表
        self.disk_table = 'disk'# 磁盤表
        self.client = InfluxDBClient(ip, port, 'DB UID', 'DB PWD', self.db_name)  # 連接配接influxdb資料庫
        print ('test link influxdb')

    def get_telegraf_list(self):
        """
        :param host: 查詢的主機host (telegraf 配置參數中的host欄位)
        """

        print ('step 1 check get deployment')
        response = {}
        telegraf_list = self.client.query('SHOW TAG VALUES FROM disk WITH KEY = "host";')

        return telegraf_list      

4.執行檔案collect_monitordb_info.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-


import os
import time
from collect_dbhost_telegraf_info.py import DBApi
## get mysqldb connection
import db_conn
mysqldb = db_conn.db
# use cursor
cursor = mysqldb.cursor()

###資料收集前,清除之前收集的資料
sql_delete = "delete from monitor_serverdb "
cursor.execute(sql_delete)
mysqldb.commit()

# 連接配接 influxdb
# INFLUXDB_IP influxdb所在主機
# INFLUXDB_PROT influxdb端口
db = DBApi(ip='influxdb 所在主機', port='端口号')
###print(db)
response = db.get_telegraf_list()
#print (response)
#print(type(response))
#print(dir(response))
disk_points = list(response.get_points(measurement='disk'))
#print(disk_points)
#print(dir(disk_points))
for disk_check in disk_points:
  ##print(disk_check)
  for host_key in disk_check.keys():
    if host_key == "value":
      ##print(disk_check[host_key])
      ##基于host的命名進行切割,分割符為_,傳回值為清單
      diskhost_split = disk_check[host_key].split('_')
      ##将清單中的後兩個元素提取出來,組成server IP,因為集團IP前兩位一樣,是以如此拼湊。
      ##print(type(diskhost_split))
      ##print(diskhost_split)
      ##print(diskhost_split[-2:-3:-1][0])
      disk_ip = '110.' + '120.' + diskhost_split[-2:-3:-1][0] + '.' + diskhost_split[-1:-2:-1][0]
      print(disk_ip)
      sql_insert = "insert into monitor_serverdb(ip_address) " \
                      "values('%s')" % \
                      (disk_ip)
      cursor.execute(sql_insert)
      mysqldb.commit()      

5.執行指令如下

python3 collect_monitordb_info.py      

 五.參考資料

1.https://stackoverflow.com/questions/38040644/processing-influx-db-output-of-influxdb-resultset-resultset/38055771

2.python 擷取指定字元前面或後面的所有字元

https://www.cnblogs.com/syw20170419/p/10969191.html

3.https://www.cnblogs.com/jadexia/p/7797791.html

4.https://blog.csdn.net/Linking_sea/article/details/112690038

5.InfluxDB 入門

https://www.jianshu.com/p/f0905f36e9c3

6.https://grafana.com/grafana/

7.https://github.com/dbarun/mysql_archiver#readme

繼續閱讀