import json
import logging
import requests
from urllib import parse
fmt = "%(asctime)s - %(levelname)s - %(filename)s - %(module)s - %(funcName)s - %(lineno)d - %(message)s"
logging.basicConfig(level=logging.WARN, format=fmt)
FALCON_BASE_URL = "FALCON_BASE_URL"
USER = "USER"
PASSWORD = "PASSWORD"
BASE_TEMPLATE_NAME = "-base"
class FalconApi():
def __init__(self, base_url, name, password):
"""初始化登陸,擷取 sig"""
self.base_url = base_url
path = "/api/v1/user/login"
url = parse.urljoin(self.base_url, path)
payload = {"name": name, "password": password}
response = requests.post(url, json=payload)
if response.status_code == 200:
logging.debug("login success")
response = response.json()
self.name = response["name"]
self.admin = response["admin"]
self.sig = response["sig"]
self.headers = {"Apitoken": json.dumps({"name": self.name, "sig": self.sig}), }
else:
logging.error("login error code: {}, message: {}".format(response.status_code, response.text))
raise Exception("code: {}, message: {}".format(response.status_code, response.text))
def post(self, url, jsondata, *args, **kwargs):
response = requests.post(url, headers=self.headers, json=jsondata, *args, **kwargs)
return response
def get(self, url, *args, **kwargs):
return requests.get(url, headers=self.headers, *args, **kwargs)
def delete(self, url, jsondata=None, *args, **kwargs):
return requests.delete(url, headers=self.headers, json=jsondata, *args, **kwargs)
def put(self, url, jsondata, *args, **kwargs):
return requests.put(url, headers=self.headers, json=jsondata, *args, **kwargs)
def patch(self, url, jsondata, *args, **kwargs):
return requests.patch(url, headers=self.headers, json=jsondata, *args, **kwargs)
def create_hostgroup(self, hostgroup_name):
"""建立 hostgroup"""
path = "/api/v1/hostgroup"
url = parse.urljoin(self.base_url, path)
payload = {"name": hostgroup_name}
logging.info("start create hostgroup, hostgroup name: {}".format(hostgroup_name))
response = self.post(url, jsondata=payload)
if response.status_code == 200:
logging.info("create hostgroup success, hostgroup name: {}".format(hostgroup_name))
return response.json()
else:
logging.warning("create hostgroup error, response message: {}".format(response.text))
return False
def add_hosts_to_hostgroup(self, hostgroup_id, hosts: list):
"""添加主機清單到hostgroup中"""
path = "/api/v1/hostgroup/host"
url = parse.urljoin(self.base_url, path)
payload = {"hostgroup_id": hostgroup_id, "hosts": hosts}
logging.info("start add hosts to hostgroup, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
response = self.post(url, jsondata=payload)
if response.status_code == 200:
logging.info(
"start add hosts to hostgroup success, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
return response.json()
else:
logging.info("add hosts to hostgroup error, response message: {}".format(response.text))
return False
def update_hosts_to_hostgroup(self, hostgroup_id, hosts: list, action):
"""更新hostgroup主機清單"""
path = "/api/v1/hostgroup/{hostgroup_id}/host".format(hostgroup_id=hostgroup_id)
url = parse.urljoin(self.base_url, path)
payload = {"hosts": hosts, "action": action}
logging.info("update hosts to hostgroup, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
response = self.patch(url, jsondata=payload)
if response.status_code == 200:
logging.info(
"update hosts to hostgroup success, hostgroup id: {}, hosts: {}".format(hostgroup_id, hosts))
return response.json()
else:
logging.info("update to hostgroup error, response message: {}".format(response.text))
return False
def get_hostgroup_list(self):
"""擷取hostgroup list"""
path = "/api/v1/hostgroup"
url = parse.urljoin(self.base_url, path)
logging.info("get hostgroup list")
response = self.get(url)
if response.status_code == 200:
logging.info("get hostgroup list success")
return response.json()
else:
logging.info("get hostgroup list error, response message: {}".format(response.text))
return False
def get_template_list(self):
"""擷取template清單"""
path = "api/v1/template"
url = parse.urljoin(self.base_url, path)
logging.info("get template list")
response = self.get(url)
if response.status_code == 200:
logging.info("get template list success")
return response.json()
else:
logging.info("get template list error, response message: {}".format(response.text))
return False
def bind_template_to_hostgroup(self, hostgroup_id, template_id):
"""hostgroup綁定報警模版"""
path = "/api/v1/hostgroup/template"
url = parse.urljoin(self.base_url, path)
logging.info("start bind template to hostgroup")
payload = {"tpl_id": template_id, "grp_id": hostgroup_id}
response = self.post(url, jsondata=payload)
if response.status_code == 200:
logging.info("bind template to hostgroup success")
return response.json()
else:
logging.warning("bind template to hostgroup, response message: {}".format(response.text))
return False
def get_endporint_list(self):
"""擷取endporint list"""
path = "/api/v1/graph/endpoint?q=.*&limit=10000"
url = parse.urljoin(self.base_url, path)
logging.info("get endporint list")
response = self.get(url)
if response.status_code == 200:
logging.info("get endporint list success")
return response.json()
else:
logging.info("get endporint list error, response message: {}".format(response.text))
return False
def create_team(self, team_name, resume):
"""添加Team"""
path = "/api/v1/team"
url = parse.urljoin(self.base_url, path)
payload = {"team_name": team_name, "resume": resume, "users": []}
logging.info("start create team , team name: {}, resume: {}".format(team_name, resume))
response = self.post(url, jsondata=payload)
if response.status_code == 200:
logging.info("create team success , team name: {}, resume: {}".format(team_name, resume))
return response.json()
else:
logging.info("create team error, response message: {}".format(response.text))
return False
def create_user(self, user_name, email, cname, phone):
"""添加user"""
path = "/api/v1/user/create"
url = parse.urljoin(self.base_url, path)
payload = {"name": user_name, "password": "test", "email": email, "cnname": cname, "phone": phone}
logging.info("start create user , user name: {}, resume: {}".format(user_name, cname))
response = self.post(url, jsondata=payload)
if response.status_code == 200:
logging.info("create user success , user name: {}, resume: {}".format(user_name, cname))
return response.json()
else:
logging.warning("create user error, response message: {}".format(response.text).strip())
return False
def add_user_to_team(self, team_id, users: list):
"""添加使用者到team"""
path = "/api/v1/team/user"
url = parse.urljoin(self.base_url, path)
payload = {"team_id": team_id, "users": users}
logging.info("start add user to team , team id: {}, users: {}".format(team_id, users))
response = self.post(url, jsondata=payload)
if response.status_code == 200:
logging.info("add user to team success , team id: {}, resume: {}".format(team_id, users))
return response.json()
else:
logging.warning("add user to teamerror, response message: {}".format(response.text))
return False
def get_team_info_by_name(self, team_name):
"""根據team name擷取team 資訊"""
path = "/api/v1/team/name/{team_name}".format(team_name=team_name)
url = parse.urljoin(self.base_url, path)
logging.info("get team info by name")
response = self.get(url)
if response.status_code == 200:
logging.info("get team info by name success")
return response.json()
else:
logging.warning("get team info by name error, response message: {}".format(response.text))
return False
def get_user_list(self):
"""擷取使用者清單"""
path = "/api/v1/user/users"
url = parse.urljoin(self.base_url, path)
logging.info("get user list")
response = self.get(url)
if response.status_code == 200:
logging.info("get user list success")
return response.json()
else:
logging.warning("get user list error, response message: {}".format(response.text))
return False
def delete_user(self, user_id: int):
"""删除使用者"""
path = "/api/v1/admin/delete_user"
url = parse.urljoin(self.base_url, path)
payload = {"user_id": user_id}
logging.info("delete user")
response = self.delete(url, jsondata=payload)
if response.status_code == 200:
logging.info("delete user success")
return response.json()
else:
logging.warning("delete user error, response message: {}".format(response.text))
return False
def update_team(self, team_id, users: list):
"""更新組"""
path = "/api/v1/team"
url = parse.urljoin(self.base_url, path)
payload = {"team_id": team_id, "users": users}
logging.info("update team")
response = self.put(url, jsondata=payload)
if response.status_code == 200:
logging.info("update team success")
return response.json()
else:
logging.warning("update team error, response message: {}".format(response.text))
return False
def get_hostgroup_info_by_id(self, hostgroup_id):
"""擷取組資訊"""
path = "/api/v1/hostgroup/{hostgroup_id}".format(hostgroup_id=hostgroup_id)
url = parse.urljoin(self.base_url, path)
logging.info("get hostgroup")
response = self.get(url)
if response.status_code == 200:
logging.info("get hostgroup success")
return response.json()
else:
logging.warning("get hostgroup error, response message: {}".format(response.text))
return False
def delete_hostgroup(self, hostgroup_id: int):
"""删除hostgroup"""
path = "/api/v1/hostgroup/{hostgroup_id}".format(hostgroup_id=hostgroup_id)
url = parse.urljoin(self.base_url, path)
logging.info("delete hostgroup")
response = self.delete(url, jsondata=None)
if response.status_code == 200:
logging.warning("delete hostgroup {} success".format(hostgroup_id))
return response.json()
else:
logging.warning("delete hostgroup error, response message: {}".format(response.text))
return False