天天看點

open-falcon api

import json
import logging
import requests
from urllib import parse

fmt = "%(asctime)s - %(levelname)s - %(filename)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s"
logging.basicConfig(level=logging.WARN, format=fmt)

FALCON_BASE_URL = "FALCON_BASE_URL"
USER = "USER"
PASSWORD = "PASSWORD"
BASE_TEMPLATE_NAME = "-base"


class FalconApi():
    def __init__(self, base_url, name, password):
        """初始化登陸,擷取 sig"""
        self.base_url = base_url
        path = "/api/v1/user/login"
        url = parse.urljoin(self.base_url, path)
        payload = {"name": name, "password": password}
        response = requests.post(url, json=payload)
        if response.status_code == 200:
            logging.debug("login success")
            response = response.json()
            self.name = response["name"]
            self.admin = response["admin"]
            self.sig = response["sig"]
            self.headers = {"Apitoken": json.dumps({"name": self.name, "sig": self.sig}), }

        else:
            logging.error("login error code: {}, message: {}".format(response.status_code, response.text))
            raise Exception("code: {}, message: {}".format(response.status_code, response.text))

    def post(self, url, jsondata, *args, **kwargs):
        response = requests.post(url, headers=self.headers, json=jsondata, *args, **kwargs)
        return response

    def get(self, url, *args, **kwargs):
        return requests.get(url, headers=self.headers, *args, **kwargs)

    def delete(self, url, jsondata=None, *args, **kwargs):
        return requests.delete(url, headers=self.headers, json=jsondata, *args, **kwargs)

    def put(self, url, jsondata, *args, **kwargs):
        return requests.put(url, headers=self.headers, json=jsondata, *args, **kwargs)

    def patch(self, url, jsondata, *args, **kwargs):
        return requests.patch(url, headers=self.headers, json=jsondata, *args, **kwargs)

    def create_hostgroup(self, hostgroup_name):
        """建立 hostgroup"""
        path = "/api/v1/hostgroup"
        url = parse.urljoin(self.base_url, path)
        payload = {"name": hostgroup_name}
        logging.info("start create hostgroup, hostgroup name: {}".format(hostgroup_name))
        response = self.post(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("create hostgroup success, hostgroup name: {}".format(hostgroup_name))
            return response.json()
        else:
            logging.warning("create hostgroup error, response message: {}".format(response.text))
            return False

    def add_hosts_to_hostgroup(self, hostgroup_id, hosts: list):
        """添加主機清單到hostgroup中"""
        path = "/api/v1/hostgroup/host"
        url = parse.urljoin(self.base_url, path)
        payload = {"hostgroup_id": hostgroup_id, "hosts": hosts}
        logging.info("start add hosts to hostgroup, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
        response = self.post(url, jsondata=payload)
        if response.status_code == 200:
            logging.info(
                "start add hosts to hostgroup success, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
            return response.json()
        else:
            logging.info("add hosts to hostgroup error, response message: {}".format(response.text))
            return False

    def update_hosts_to_hostgroup(self, hostgroup_id, hosts: list, action):
        """更新hostgroup主機清單"""
        path = "/api/v1/hostgroup/{hostgroup_id}/host".format(hostgroup_id=hostgroup_id)
        url = parse.urljoin(self.base_url, path)
        payload = {"hosts": hosts, "action": action}
        logging.info("update hosts to hostgroup, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
        response = self.patch(url, jsondata=payload)
        if response.status_code == 200:
            logging.info(
                "update hosts to hostgroup success, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
            return response.json()
        else:
            logging.info("update to hostgroup error, response message: {}".format(response.text))
            return False

    def get_hostgroup_list(self):
        """擷取hostgroup list"""
        path = "/api/v1/hostgroup"
        url = parse.urljoin(self.base_url, path)
        logging.info("get hostgroup list")
        response = self.get(url)
        if response.status_code == 200:
            logging.info("get hostgroup list success")
            return response.json()
        else:
            logging.info("get hostgroup list error, response message: {}".format(response.text))
            return False

    def get_template_list(self):
        """擷取template清單"""
        path = "api/v1/template"
        url = parse.urljoin(self.base_url, path)
        logging.info("get template list")
        response = self.get(url)
        if response.status_code == 200:
            logging.info("get template list success")
            return response.json()
        else:
            logging.info("get template list error, response message: {}".format(response.text))
            return False

    def bind_template_to_hostgroup(self, hostgroup_id, template_id):
        """hostgroup綁定報警模版"""
        path = "/api/v1/hostgroup/template"
        url = parse.urljoin(self.base_url, path)
        logging.info("start bind template to hostgroup")
        payload = {"tpl_id": template_id, "grp_id": hostgroup_id}
        response = self.post(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("bind template to hostgroup success")
            return response.json()
        else:
            logging.warning("bind template to hostgroup, response message: {}".format(response.text))
            return False

    def get_endporint_list(self):
        """擷取endporint list"""
        path = "/api/v1/graph/endpoint?q=.*&limit=10000"
        url = parse.urljoin(self.base_url, path)
        logging.info("get endporint list")
        response = self.get(url)
        if response.status_code == 200:
            logging.info("get endporint list success")
            return response.json()
        else:
            logging.info("get endporint list error, response message: {}".format(response.text))
            return False

    def create_team(self, team_name, resume):
        """添加Team"""
        path = "/api/v1/team"
        url = parse.urljoin(self.base_url, path)
        payload = {"team_name": team_name, "resume": resume, "users": []}
        logging.info("start create team , team name: {}, resume: {}".format(team_name, resume))
        response = self.post(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("create team success , team name: {}, resume: {}".format(team_name, resume))
            return response.json()
        else:
            logging.info("create team error, response message: {}".format(response.text))
            return False

    def create_user(self, user_name, email, cname, phone):
        """添加user"""
        path = "/api/v1/user/create"
        url = parse.urljoin(self.base_url, path)
        payload = {"name": user_name, "password": "test", "email": email, "cnname": cname, "phone": phone}
        logging.info("start create user , user name: {}, resume: {}".format(user_name, cname))
        response = self.post(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("create user success , user name: {}, resume: {}".format(user_name, cname))
            return response.json()
        else:
            logging.warning("create user error, response message: {}".format(response.text).strip())
            return False

    def add_user_to_team(self, team_id, users: list):
        """添加使用者到team"""
        path = "/api/v1/team/user"
        url = parse.urljoin(self.base_url, path)
        payload = {"team_id": team_id, "users": users}
        logging.info("start add user to team , team id: {}, users: {}".format(team_id, users))
        response = self.post(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("add user to team success , team id: {}, resume: {}".format(team_id, users))
            return response.json()
        else:
            logging.warning("add user to teamerror, response message: {}".format(response.text))
            return False

    def get_team_info_by_name(self, team_name):
        """根據team name擷取team 資訊"""
        path = "/api/v1/team/name/{team_name}".format(team_name=team_name)
        url = parse.urljoin(self.base_url, path)
        logging.info("get team info by name")
        response = self.get(url)
        if response.status_code == 200:
            logging.info("get team info by name success")
            return response.json()
        else:
            logging.warning("get team info by name error, response message: {}".format(response.text))
            return False

    def get_user_list(self):
        """擷取使用者清單"""
        path = "/api/v1/user/users"
        url = parse.urljoin(self.base_url, path)
        logging.info("get user list")
        response = self.get(url)
        if response.status_code == 200:
            logging.info("get user list success")
            return response.json()
        else:
            logging.warning("get user list error, response message: {}".format(response.text))
            return False

    def delete_user(self, user_id: int):
        """删除使用者"""
        path = "/api/v1/admin/delete_user"
        url = parse.urljoin(self.base_url, path)
        payload = {"user_id": user_id}
        logging.info("delete user")
        response = self.delete(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("delete user success")
            return response.json()
        else:
            logging.warning("delete user error, response message: {}".format(response.text))
            return False

    def update_team(self, team_id, users: list):
        """更新組"""
        path = "/api/v1/team"
        url = parse.urljoin(self.base_url, path)
        payload = {"team_id": team_id, "users": users}
        logging.info("update team")
        response = self.put(url, jsondata=payload)
        if response.status_code == 200:
            logging.info("update team success")
            return response.json()
        else:
            logging.warning("update team error, response message: {}".format(response.text))
            return False

    def get_hostgroup_info_by_id(self, hostgroup_id):
        """擷取組資訊"""
        path = "/api/v1/hostgroup/{hostgroup_id}".format(hostgroup_id=hostgroup_id)
        url = parse.urljoin(self.base_url, path)
        logging.info("get hostgroup")
        response = self.get(url)
        if response.status_code == 200:
            logging.info("get hostgroup success")
            return response.json()
        else:
            logging.warning("get hostgroup error, response message: {}".format(response.text))
            return False

    def delete_hostgroup(self, hostgroup_id: int):
        """删除hostgroup"""
        path = "/api/v1/hostgroup/{hostgroup_id}".format(hostgroup_id=hostgroup_id)
        url = parse.urljoin(self.base_url, path)
        logging.info("delete hostgroup")
        response = self.delete(url, jsondata=None)
        if response.status_code == 200:
            logging.warning("delete hostgroup {} success".format(hostgroup_id))
            return response.json()
        else:
            logging.warning("delete hostgroup error, response message: {}".format(response.text))
            return False

           

繼續閱讀